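Using Kafka with Flink
Connecting to Kafka locally
Start by working with Kafka in local mode.
First deploy ZooKeeper and a Kafka cluster on a machine (Windows works, but Linux is preferable). Note that a single machine can also host a pseudo-cluster to simulate a real production environment (see: https://blog.csdn.net/weixin_40366684/article/details/106258816). Then start the ZooKeeper cluster: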
# bin/zkServer.sh start conf/zoo-1.cfg
# bin/zkServer.sh start conf/zoo-2.cfg
# bin/zkServer.sh start conf/zoo-3.cfg
Use the following command to check that the ZooKeeper cluster started successfully:
# bin/zkCli.sh
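Next, start the Kafka cluster. Kafka can be started in two ways, and running it in the foreground is the usual choice. In that mode, if you are connected remotely through Xshell, the tab that started Kafka cannot be closed and has to stay open.
Starting Kafka:
In the foreground: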
bin/kafka-server-start.sh config/server.properties
In the background:
nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvMWFNZXJuQ09oN3BnSlV5LnBuZw?x-oss-process=image/format,png#pic_center)
Once the Kafka cluster is up, the following commands can be used to work with topics.
Create a Kafka topic:
bin/kafka-topics.sh --create --zookeeper 10.45.xx.xx:2191 --replication-factor 1 --partitions 1 --topic test
List all Kafka topics:
bin/kafka-topics.sh --list --zookeeper 10.45.xx.xx:2191
Start a Kafka console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a Kafka console consumer:
bin/kafka-console-consumer.sh --zookeeper 10.45.xx.xx:2191 --topic test --from-beginning
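The command above is the usage for older versions. Newer Kafka versions no longer accept the --zookeeper option for the console consumer; use the following command instead: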
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
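Explanation:
--zookeeper: followed by the ZooKeeper address you configured
--broker-list: the address of the Kafka broker; the default port is 9092 and can be changed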
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvdDNld3pVbDV2bnlLbXNHLnBuZw?x-oss-process=image/format,png#pic_center)
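Connecting to Kafka remotely
For a remote connection, the simplest things to try are a producer and a consumer.
First, add the dependency to the Maven project. In the artifact below, the 2.11 suffix is the Scala binary version the connector is built for (not the Kafka version), and 1.10.0 is the Flink version: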
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
As an example, here are the complete dependencies of the Flink training project:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.7.19</slf4j.version>
<flink.version>1.10.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<junit.version>4.12</junit.version>
<assertj.version>3.11.1</assertj.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-uber_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
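In practice only a few core dependencies are needed. What matters most is settling on the versions you need and making sure they are compatible with each other. The following set also works: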
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
Producer:
package com.ververica.flinktraining.exercises.datastream_scala.connect

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal standalone producer: sends one string message to the TaxiRdie topic.
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Address of the remote broker (the IP is a placeholder).
    props.setProperty("bootstrap.servers", "47.107.X.X:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val kp = new KafkaProducer[String, String](props)
    // send() is asynchronous; close() waits for buffered records to be sent.
    kp.send(new ProducerRecord[String, String]("TaxiRdie", "XXXXXX"))
    kp.close()
    println("++++")
  }
}
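Next, in Xshell, run the console consumer for the TaxiRdie topic (the name was misspelled when the topic was created and was left that way; the topic name in the code must match the topic name in Kafka exactly):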
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TaxiRdie --from-beginning
![image-20200529141818753](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvSm0zVHVLUEJOeTdackEyLnBuZw?x-oss-process=image/format,png)
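Now run the producer program. Most likely it will not work: the program sends a message with the content "XXXXXX" to the TaxiRdie topic, but the consumer receives nothing, which means the message was never written.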
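One reason a failure like this can go unnoticed is that `KafkaProducer.send` is asynchronous: it returns a `Future`, and errors are reported only through that future (or a callback). The following is a minimal sketch, assuming the same kafka-clients dependency as above, that blocks on the result so that a connection or metadata problem becomes visible. The names `ProducerCheck` and `producerProps` and the timeout values are illustrative choices, not part of the original program.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerCheck {
  // Build producer settings; max.block.ms bounds how long send() may wait for metadata.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("max.block.ms", "10000") // assumed value, chosen for quick feedback
    props
  }

  def main(args: Array[String]): Unit = {
    // Placeholder address from the article; replace it with the real broker address.
    val producer = new KafkaProducer[String, String](producerProps("47.107.X.X:9092"))
    // Waiting on the future turns a silent failure into a visible exception.
    val result = Try(
      producer
        .send(new ProducerRecord[String, String]("TaxiRdie", "XXXXXX"))
        .get(15, TimeUnit.SECONDS)
    )
    result match {
      case Success(meta) =>
        println(s"written to ${meta.topic()}-${meta.partition()} at offset ${meta.offset()}")
      case Failure(e) =>
        println(s"send failed: $e")
    }
    producer.close()
  }
}
```

If the program prints a failure here while the console producer on the server works, the cause is more likely network reachability or broker configuration than the producer code itself.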
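The first thing to consider is version compatibility between Kafka and Flink. Flink provides a Kafka connector that can be used to produce to and consume from Kafka. Importantly, the Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics. Flink does not rely entirely on the offsets committed to Kafka; it tracks the offsets internally and includes them in its checkpoints. Reference: https://blog.csdn.net/duxu24/article/details/105569855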
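To make the idea of offsets kept in checkpoints concrete, here is a toy model in plain Scala. It is not Flink's implementation; `ToySource`, `snapshot` and `restore` are hypothetical names used only for illustration. The source keeps its own read position, a checkpoint copies that position, and recovery rewinds to the last checkpoint so that records read after it are replayed.

```scala
// Toy model only: a source that tracks its own offset, independent of any broker-side commit.
final class ToySource(records: Vector[String]) {
  private var offset: Int = 0

  // Read the next record, if any, and advance the internal offset.
  def poll(): Option[String] =
    if (offset < records.length) {
      val record = records(offset)
      offset += 1
      Some(record)
    } else None

  // A "checkpoint" stores the current offset as part of the job state.
  def snapshot(): Int = offset

  // Recovery rewinds the source to the offset stored in the checkpoint.
  def restore(checkpointedOffset: Int): Unit = offset = checkpointedOffset
}

object ToySourceDemo {
  def main(args: Array[String]): Unit = {
    val source = new ToySource(Vector("a", "b", "c", "d"))
    source.poll()                       // reads "a"
    source.poll()                       // reads "b"
    val checkpoint = source.snapshot()  // offset 2 is saved
    source.poll()                       // reads "c", then the job "fails"
    source.restore(checkpoint)
    println(source.poll())              // prints Some(c): "c" is replayed after recovery
  }
}
```

In this model the replay is harmless as long as any state derived from the records is rolled back to the same checkpoint, which is the intuition behind the exactly-once guarantee described above.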
The table below shows which Flink Kafka Consumer corresponds to which Kafka version:
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvYmVybjQ3dkpTT1lScXNsLnBuZw?x-oss-process=image/format,png#pic_center)
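Depending on the Kafka version, the consumer class is named FlinkKafkaConsumer08, FlinkKafkaConsumer09, and so on; for Kafka >= 1.0.0 it is simply FlinkKafkaConsumer. In addition, starting with Flink 1.9.0 the connector uses the Kafka 2.2.0 client.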
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
A complete code example. Consumer:
package org.ourhome.streamapi

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

object KafkaSource {
  private val KAFKA_TOPIC: String = "kafka_producer_test"

  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val runType: String = params.get("runtype")
    println("runType: " + runType)

    // Kafka connection settings; replace ip:port with the broker address.
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "ip:port")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 seconds in exactly-once mode, storing state on the local file system.
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///D:/Temp/checkpoint/flink/KafkaSource"))

    // Read the topic as plain strings, starting from the latest offsets.
    val dataSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[String](
      KAFKA_TOPIC,
      new SimpleStringSchema(),
      properties)
      .setStartFromLatest()

    // Word count over 5-second windows; print only words seen more than 5 times.
    env.addSource(dataSource)
      .flatMap(_.toLowerCase.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .filter(_._2 > 5)
      .print()
      .setParallelism(1)

    env.execute("Flink Streaming—————KafkaSource")
  }
}
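The operator chain in the consumer is easier to follow when applied to a single batch of lines. The sketch below reproduces the same steps (lowercase, split on spaces, count per word, keep counts above 5) with plain Scala collections, treating the input as the contents of one 5-second window. `WindowLogicSketch` and `countFrequentWords` are illustrative names, and no Flink runtime is involved.

```scala
object WindowLogicSketch {
  // Count words in one window's worth of lines and keep those seen more than `threshold` times.
  def countFrequentWords(lines: Seq[String], threshold: Int = 5): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split(" "))               // same tokenization as the flatMap step
      .groupBy(identity)                               // stands in for keyBy on the word
      .map { case (word, hits) => (word, hits.size) }  // stands in for sum(1) over the window
      .filter { case (_, count) => count > threshold } // same condition as filter(_._2 > 5)

  def main(args: Array[String]): Unit = {
    // Six lines containing both words, plus one extra "kafka".
    val window = Seq.fill(6)("Flink kafka") :+ "kafka"
    println(countFrequentWords(window))
  }
}
```

With this input, "flink" is counted 6 times and "kafka" 7 times, so both pass the filter; a word seen 5 times or fewer in the window would be dropped.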
Producer:
package org.ourhome.streamapi

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer}

object WriteIntoKafka {
  private val KAFKA_TOPIC: String = "kafka_producer_test"

  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val runType: String = params.get("runtype")
    println("runType: " + runType)

    // Kafka connection settings; replace ip:port with the broker address.
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "ip:port")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 seconds in exactly-once mode, storing state on the local file system.
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///D:/Temp/checkpoint/flink/KafkaSource"))

    // Source: read the input topic as plain strings, starting from the latest offsets.
    val dataSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[String](
      KAFKA_TOPIC,
      new SimpleStringSchema(),
      properties)
      .setStartFromLatest()
    val dataStream: DataStream[String] = env.addSource(dataSource)

    // Sink: "brokerList" and "topic" are placeholders for the broker address and the output topic.
    val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
      "brokerList",
      "topic",
      new SimpleStringSchema()
    )
    dataStream.addSink(kafkaSink)

    env.execute("Flink Streaming—————KafkaSource and KafkaSink")
  }
}
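With the API covered, another important question is whether the port on the cloud host is reachable at all. Use telnet to rule out a firewall problem; note that the port number is preceded by a space, not a colon:
telnet 47.107.X.X 9092
If the connection succeeds, the firewall is not blocking the port; if it fails, change the firewall settings in the cloud provider's console.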
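The same reachability check can be scripted. This is a minimal sketch using only `java.net.Socket` from the JDK; `PortCheck`, `canConnect` and the 3-second default timeout are illustrative choices. Like telnet, it only shows that a TCP connection can be opened, not that Kafka is configured correctly behind the port.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortCheck {
  // Returns true if a TCP connection to host:port can be opened within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    val connected = Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    Try(socket.close()) // always release the socket, ignoring errors on close
    connected
  }

  def main(args: Array[String]): Unit = {
    // Usage: PortCheck <host> <port>, for example the broker address and 9092.
    val host = args(0)
    val port = args(1).toInt
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```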
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkva2pxUU1tWlVkaHlQaWJDLnBuZw?x-oss-process=image/format,png#pic_center)
Once the connection works, that problem is ruled out. Next, check the Kafka configuration: open config/server.properties and change it as follows:
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvSWtQWkNLYTdYM0dORExwLnBuZw?x-oss-process=image/format,png#pic_center)
Now restart the Kafka cluster and run the producer code above again; this time the producer program succeeds:
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvamRTMmV5Y0g0dUlrZ3poLnBuZw?x-oss-process=image/format,png#pic_center)
Similarly, adapt the example consumer code above to consume the stream in real time with Flink:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

object KafkaSource {
  private val KAFKA_TOPIC: String = "TaxiRdie"

  def main(args: Array[String]): Unit = {
    // Connection settings for the remote broker (the IP is a placeholder).
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "47.107.X.X:9092")
    properties.setProperty("group.id", "TaxiRdie")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the topic as plain strings, starting from the latest offsets.
    val dataSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[String](
      KAFKA_TOPIC,
      new SimpleStringSchema(),
      properties)
      .setStartFromLatest()

    // Print every incoming message as it arrives.
    env.addSource(dataSource)
      .print()
      .setParallelism(1)

    env.execute("Flink Streaming—————KafkaSource")
  }
}
![Screenshot](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pLmxvbGkubmV0LzIwMjAvMDUvMjkvR0RodDZXS296NWtmclBVLnBuZw?x-oss-process=image/format,png)