package Kafka010.Utils
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
/**
 * Created by Shi shuai RollerQing on 2019/12/24 20:19
 *
 * Minimal Kafka 0.10 producer demo: sends 100,000 string records
 * (key == value == loop index) to a single topic, throttled to roughly
 * ten messages per second so the stream is easy to observe from a consumer.
 */
object ProducerDemo {
  def main(args: Array[String]): Unit = {
    // Kafka connection parameters.
    val brokers = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val topic = "topicB"

    val prop = new Properties()
    // Use the ProducerConfig constants rather than raw strings to avoid
    // config-key typos (the previously commented-out "bootstraps" key was one).
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
    try {
      // send() is asynchronous: it only enqueues the record; the client
      // batches and transmits in a background I/O thread.
      for (i <- 1 to 100000) {
        val msg = new ProducerRecord[String, String](topic, i.toString, i.toString)
        producer.send(msg)
        println(s"i = $i")
        Thread.sleep(100) // throttle to ~10 msgs/sec so the demo is observable
      }
    } finally {
      // FIX: close() flushes any records still sitting in the client buffer
      // and releases sockets/threads. Without it, buffered messages can be
      // silently lost on JVM exit and the producer's resources leak.
      producer.close()
    }
  }
}
// Maven dependency note (must be a comment — raw XML here breaks Scala compilation):
//   <!-- spark-streaming-kafka-0-10 -->
//   <dependency>
//     <groupId>org.apache.spark</groupId>
//     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
//     <version>2.2.0</version>
//   </dependency>