无法向 kafka 主题发送消息

2024-05-05

我正在使用 Kafka、Play 以及 Scala。 这是我的代码,我想在其中发送消息到kafka服务器,主题名称是“测试主题”。 尽管我没有在主题中看到我发送的消息,但我没有收到任何错误 这里有什么问题吗

 import kafka.producer.ProducerConfig
    import java.util.Properties
    import kafka.producer.Producer
    import scala.util.Random
    import kafka.producer.Producer
    import kafka.producer.Producer
    import kafka.producer.Producer
    import kafka.producer.KeyedMessage
    import java.util.Date

    object KafkaProducerLocal extends App {

      sendMessage

      def sendMessage {

        val topicName = "test-topic"
        try {
          val rnd = new Random()
          val props = new Properties()
          props.put("metadata.broker.list", "localhost:9092") //kafka 
          props.put("zk.connect", "localhost:2181");  //zookeeper
          props.put("serializer.class", "kafka.serializer.StringEncoder")
          props.put("producer.type", "async")


          val config = new ProducerConfig(props)
          val producer = new Producer[String, String](config)
          val t = System.currentTimeMillis()
          for (nEvents <- Range(0, 10)) {
            val ip = "192.168.2." + rnd.nextInt(255);
            val data = new KeyedMessage[String, String](topicName, ip, "Swapnil Test Data" + nEvents);
            producer.send(data);
          }

          producer.close();
        } catch {
          case t: Throwable => t.printStackTrace()
        }
      }

    }

你的代码没有任何问题。

  • 检查您的 log4j 属性以查看日志
  • 您运行的kafka版本与您的客户端版本相同。
  • 首先创建一个主题link http://kafka.apache.org/documentation.html#quickstart_createtopic
  • 检查服务器是否正常工作,主题是否已创建,是否可以通过控制台生产者和消费者发送和接收消息example http://kafka.apache.org/documentation.html#quickstart_send

应用日志

2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Verifying properties
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property metadata.broker.list is overridden to localhost:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property producer.type is overridden to async
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder
2016-04-19 01:12:34 WARN  kafka.utils.Logging$class:83 - Property zk.connect is not valid
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/org/slf4j/slf4j-log4j12/1.7.12/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Shutting down producer
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Begin shutting down ProducerSendThread
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(topic-test)
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Connected to localhost:9092 for producing
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Disconnecting from localhost:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Connected to HMECL001076:9092 for producing
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Shutdown ProducerSendThread complete
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Closing all sync producers
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Disconnecting from HMECL001076:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Producer shutdown completed in 298 ms

控制台消费者输出

 /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-test --property group.id cs1 --from-beginning
Swapnil Test Data3
Swapnil Test Data9
Swapnil Test Data2
Swapnil Test Data5
Swapnil Test Data6
Swapnil Test Data8
Swapnil Test Data0
Swapnil Test Data1
Swapnil Test Data4
Swapnil Test Data7
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法向 kafka 主题发送消息 的相关文章

  • 使用 Spray-json 解析简单数组

    我正在尝试 但失败了 了解 Spray json 如何将 json feed 转换为对象 如果我有一个简单的 key gt value json feed 那么它似乎可以正常工作 但是我想要读取的数据出现在如下列表中 name John a
  • 通用特征的隐式转换

    我正在实现一个数据结构 并希望用户能够使用任何类型作为密钥 只要他提供一个合适的密钥类型来包装它 我有这个关键类型的特质 这个想法是进行从基类型到键类型的隐式转换 反之亦然 实际上 只使用基类型 该特征看起来像这样 trait Key T
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 是否有任何模拟器/工具可以生成流式传输消息?

    出于测试目的 我需要模拟客户端每秒生成 100 000 条消息并将它们发送到 kafka 主题 有没有任何工具或方法可以帮助我生成这些随机消息 有一个用于生成虚拟负载的内置工具 位于bin kafka producer perf test
  • 为什么《Scala 中的函数式编程》一书的“无异常处理错误”一章中没有提到“scala.util.Try”?

    在 Scala 中的函数式编程 一书中的 无异常处理错误 一章中 作者给出 从函数体抛出异常的问题 Use Option如果我们不关心实际的异常 Use Either如果我们关心实际的异常 But scala util Try没有提到 从我
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • 运行具有外部依赖项的 Scala 脚本

    我在 Users joe scala lib 下有以下 jar commons codec 1 4 jar httpclient 4 1 1 jar httpcore 4 1 jar commons logging 1 1 1 jar ht
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • Scala 特性:val/def 和 require

    下面的代码抛出IllegalArgumentException trait T val x Long require x gt 0 object T extends App val y new T val x 42L 而以下情况则不然 tr
  • 如果找不到元素,为什么 Scala 的索引方法返回 -1 而不是 None?

    我一直想知道为什么在 Scala 中使用各种索引方法来确定集合中元素的位置 例如List indexOf List indexWhere 返回 1指示集合中不存在给定元素 而不是更惯用的Option Int 回国有什么特别的好处吗 1代替N
  • 在 IntelliJ 中运行 Spark 字数统计

    我花了几个小时浏览 You Tube 视频和教程 试图了解如何在 Scala 中运行 Spark 字数统计程序 并将其转换为 jar 文件 我现在完全糊涂了 我运行了 Hello World 并且了解了如何在 Apache spark sp
  • 在 Scala 中调用 WebSocket 中的方法

    我是 scala Play 框架和 Akka 的新手 我的函数定义为 def socket WebSocket accept String String request gt ActorFlow actorRef out gt MyWebS
  • 如何访问 Scala XML 中的父元素

    The scala xml包表示带有标记树节点的 XML 但是这棵树在 Scala 2 7 中是单向的吗 因为似乎没有办法访问Elem给定的父级Elem 这似乎同样适用于父母Document 例如 在 XOM 中你有getParent an
  • 按字符分割字符串

    scala 有一个标准的分割字符串的方法StringOps split 但它的行为有点让我惊讶 演示一下 使用快捷便利功能 def sp str String str split toList 以下表达式全部计算结果为 true sp Li
  • 到底什么是单例类型?

    什么是单例类型 有什么应用和影响 我们非常欢迎示例 更欢迎外行术语 如果将类型视为一组值 则值的单例类型x是仅包含该值的类型 x 用法示例 模式匹配 case Foo type检查匹配的对象是否与Foo using eq where cas
  • Scala sbt 项目给出 NullPointerException?

    当我运行命令时sbt clean compile run在我的 sbt 项目中 它给出了空指针异常 这是控制台输出 info Loading project definition from home dnilesh workspace wi
  • java.lang.OutOfMemoryError:Scala 上超出了 GC 开销限制

    我是 Scala 开发人员 我在Routes它包含的文件1008行如果我添加另一行 则会抛出下面的错误 Uncaught error from thread sbt web scheduler 1 shutting down JVM sin

随机推荐