ReceiverAPI: a dedicated Executor receives the data and then hands it to other Executors for computation. The problem is that the receiving Executor and the computing Executors may run at different speeds; in particular, when the receiver ingests data faster than the computing Executors can process it, the computing nodes can run out of memory.
DirectAPI: the computing Executors consume the Kafka data themselves, so each one controls its own consumption rate.
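The memory-overflow risk in Receiver mode is usually mitigated with Spark's backpressure and rate-limit settings, which cap how fast data is ingested. A minimal sketch of such a configuration; the numeric limits are illustrative assumptions, not values from this tutorial:

```scala
import org.apache.spark.SparkConf

// Rate-limit settings that cap ingestion speed so slow computing Executors
// are not overwhelmed; the numbers below are illustrative assumptions.
val rateLimitedConf: SparkConf = new SparkConf()
  .setAppName("RateLimitedStreaming")
  .setMaster("local[*]")
  // Let Spark adapt the ingestion rate to the observed processing speed (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")
  // Upper bound per receiver (records/second), used by the Receiver API
  .set("spark.streaming.receiver.maxRate", "1000")
  // Upper bound per Kafka partition (records/second), used by the Direct API
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```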
## Kafka 0-8 Receiver mode
1) Requirement: use Spark Streaming to read data from Kafka, run a simple computation on it, and print the result to the console.
2) Add the dependency
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
```
3) Write the code
In the 0-8 Receiver mode, offsets are maintained in ZooKeeper. If the program is stopped, data keeps being produced, and the program is restarted, consumption resumes from where it left off. The stored offsets can be checked with `get /consumers/bigdata/offsets/<topic name>/<partition number>` (a programmatic check is sketched after the producer command below).
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark04_ReceiverAPI {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //3. Use the Receiver API to read Kafka data and create a DStream
    //Note that the connection string here is the ZooKeeper quorum, not the Kafka brokers
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
      "hadoop202:2181,hadoop203:2181,hadoop204:2181",
      "bigdata",
      //the value is the number of receiver threads for the topic
      Map("mybak" -> 2))
    //4. Compute the word count and print it
    val lineDStream: DStream[String] = kafkaDStream.map(_._2)
    val word: DStream[String] = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
    wordToCountDStream.print()
    //5. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
```
Start the Kafka cluster, then launch a console producer and type some test messages:
```
[wt@hadoop02 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop02:9092 --topic mybak
```
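As noted above, the offsets committed in Receiver mode can be inspected in ZooKeeper under `/consumers/<group>/offsets/<topic>/<partition>`. A minimal sketch of doing the same check from Scala, assuming the ZooKeeper client (pulled in transitively by the Kafka 0-8 dependency) is on the classpath; the group `bigdata`, topic `mybak`, and partition `0` mirror the example above:

```scala
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object CheckZkOffset {
  def main(args: Array[String]): Unit = {
    // Connect to the ZooKeeper quorum used by the Receiver example
    val zk = new ZooKeeper("hadoop202:2181,hadoop203:2181,hadoop204:2181", 30000,
      new Watcher {
        override def process(event: WatchedEvent): Unit = {} // no-op watcher
      })
    // Offsets committed by the 0-8 consumer live under /consumers/<group>/offsets/<topic>/<partition>
    val data: Array[Byte] = zk.getData("/consumers/bigdata/offsets/mybak/0", false, null)
    println(new String(data)) // the stored offset, kept as a plain string
    zk.close()
  }
}
```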
## Kafka 0-8 Direct mode
1) Requirement: use Spark Streaming to read data from Kafka, run a simple computation on it, and print the result to the console.
2) Add the dependency
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
```
3) Write the code (automatic offset management, version 1)
Offsets are maintained in the checkpoint, but the way the StreamingContext is obtained needs to change for them to be used on restart; as written below, this version can lose messages.
```scala
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark05_DirectAPI_Auto01 {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    //3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    //4. Use the Direct API with automatically managed offsets to read Kafka data and create a DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))
    //5. Compute the word count and print it
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    //6. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
```
4) Write the code (automatic offset management, version 2)
Offsets are maintained in the checkpoint, and the StreamingContext is obtained with getActiveOrCreate.
Drawbacks of this approach:
- the checkpoint produces a large number of small files
- the checkpoint records the timestamp of the last batch, so on restart every batch interval that was missed is executed again
```scala
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark06_DirectAPI_Auto02 {
  def main(args: Array[String]): Unit = {
    //Recover the StreamingContext from the checkpoint if one exists, otherwise create a new one
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def getStreamingContext: StreamingContext = {
    //1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
    //3. Prepare the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    //4. Use the Direct API with automatically managed offsets to read Kafka data and create a DStream
    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams,
      Set("mybak"))
    //5. Compute the word count and print it
    kafkaDStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    //6. Return the StreamingContext
    ssc
  }
}
```
5) Write the code (manual offset management)
```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark07_DirectAPI_Handler {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    //3. Create the Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
    )
    //4. Specify the offsets to resume from (the positions reached by the previous run)
    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
      TopicAndPartition("mybak", 0) -> 13L,
      TopicAndPartition("mybak", 1) -> 10L
    )
    //5. Use the Direct API with manually managed offsets to consume the data
    val kafkaDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (m: MessageAndMetadata[String, String]) => m.message())
    //6. Define an empty array to hold the offset ranges of each batch
    var offsetRanges = Array.empty[OffsetRange]
    //7. Capture the offsets of the current batch and print them
    kafkaDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.fromOffset}-${o.untilOffset}")
      }
    }
    //8. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
```
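The handler example above only prints the offset ranges. With manual offset management you would normally persist them together with the batch results in a transactional store, so that both commit or neither does. A minimal sketch of that idea, assuming a hypothetical MySQL table `kafka_offsets(topic, partition, offset)` and a hypothetical `saveResult` helper; the JDBC URL and credentials are placeholders:

```scala
import java.sql.DriverManager

import org.apache.spark.streaming.kafka.OffsetRange

object OffsetStore {
  // Hypothetical helper: persist the batch output and the consumed offsets in one transaction,
  // so that on failure neither is committed and the batch can be replayed safely.
  def saveBatchWithOffsets(result: Array[(String, Int)], offsetRanges: Array[OffsetRange]): Unit = {
    // Placeholder JDBC URL and credentials
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop202:3306/streaming", "user", "password")
    try {
      conn.setAutoCommit(false)
      // Hypothetical table kafka_offsets(topic, partition, offset)
      val upsertOffset = conn.prepareStatement(
        "REPLACE INTO kafka_offsets(`topic`, `partition`, `offset`) VALUES (?, ?, ?)")
      for (o <- offsetRanges) {
        upsertOffset.setString(1, o.topic)
        upsertOffset.setInt(2, o.partition)
        upsertOffset.setLong(3, o.untilOffset)
        upsertOffset.executeUpdate()
      }
      // saveResult(conn, result)  // hypothetical: write the word counts on the same connection
      conn.commit()                // offsets and results become visible atomically
    } catch {
      case e: Exception =>
        conn.rollback()            // neither offsets nor results are persisted
        throw e
    } finally {
      conn.close()
    }
  }
}
```

Inside `foreachRDD` you would collect the batch result and call `OffsetStore.saveBatchWithOffsets(result, offsetRanges)` instead of just printing the ranges.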
## Kafka 0-10 Direct mode
1) Requirement: use Spark Streaming to read data from Kafka, run a simple computation on it, and print the result to the console.
2) Add the dependencies; to avoid conflicts with the 0-8 version, create a new module for this demo
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
```
3) Write the code
```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark01_DirectAPI010 {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))
    //3. Build the Kafka parameters
    val kafkaParams: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    //4. Consume Kafka data and create the stream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams))
    //5. Compute the word count and print it
    kafkaDStream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    //6. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
```
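In the 0-10 API the consumed offsets can also be committed back to Kafka explicitly once a batch has been processed, instead of relying on auto-commit. A minimal sketch of that pattern, reusing the `kafkaDStream` from the code above (it would be placed before `ssc.start()`, and assumes `enable.auto.commit` is set to `false` in the Kafka parameters):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Commit the offsets of each batch back to Kafka after the batch has been processed.
kafkaDStream.foreachRDD { rdd =>
  // The offset ranges must be read from the original Kafka RDD, before any shuffle
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here (e.g. the word count above) ...

  // Asynchronously commit these offsets to the __consumer_offsets topic
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```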
## Summary of Kafka consumption modes
0-8 ReceiverAPI:
- a dedicated Executor reads the data, so read and processing speeds are not matched
- data is transferred across machines, and a WAL (write-ahead log) is required for reliability
- the Executor reads data with multiple threads; to increase read parallelism, multiple streams must be unioned
- offsets are stored in ZooKeeper
0-8 DirectAPI:
- the computing Executors read the data and do the computation themselves
- increase the number of Executors to increase consumption parallelism
- offset storage:
  a) checkpoint (create the StreamingContext with getActiveOrCreate)
  b) manual management (in a transactional store)
  c) the offsets must be captured in the first operator applied to the stream: offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
0-10 DirectAPI:
- the computing Executors read the data and do the computation themselves
- increase the number of Executors to increase consumption parallelism
- offset storage:
  a) the __consumer_offsets internal topic
  b) manual management (in a transactional store); a sketch of resuming from externally stored offsets follows below
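A minimal sketch of the resume step for manually managed offsets in the 0-10 API: the offsets read back from the external store are handed to the consumer strategy so each partition starts where the previous run committed. The `readStoredOffsets` helper and its values are hypothetical placeholders for whatever store is used, and `ssc`/`kafkaParams` are assumed to be the ones from the 0-10 example above:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical lookup of the last committed offsets from an external store
def readStoredOffsets(): Map[TopicPartition, Long] = Map(
  new TopicPartition("test", 0) -> 13L,
  new TopicPartition("test", 1) -> 10L
)

// Subscribe to the topic, but start each partition at its stored offset
val resumedDStream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,                                  // the StreamingContext from the 0-10 example
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams, readStoredOffsets())
  )
```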