kafka启动失败(版本0.8.0 beta1)

2024-04-23

我正在尝试在独立模式(在ec2上)上使用zookeeper版本(3.3.6)启动kafka服务。 所以我运行 1) sbt update 2) sbt package 3)sbt assembly-package-dependency 然后启动zookeeper服务,然后启动kafka服务器。但是,我收到以下错误消息: 对于卡夫卡服务器日志:

ERROR Error while electing or becoming leader on broker 0 (kafka.server.ZookeeperLeaderElector)
java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:84)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
    at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:35)
    at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:503)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:467)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:215)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:89)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

对于动物园管理员日志:

    2014-07-15 15:49:22,996 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x57 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,102 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x59 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,109 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,215 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest

对于 kafka 生产者日志:

[2014-07-15 15:49:23,107] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 23 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,107] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,112] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: edwintest (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,112] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,213] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 24 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,213] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,217] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,218] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,219] ERROR Failed to send requests for topics edwintest with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,219] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:254)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

my /etc/hosts config

127.0.0.1       ip-172-32-1-95 localhost.localdomain localhost
::1             localhost6.localdomain6 localhost6

我的 server.properties 文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/tmp/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false </code>

我的动物园管理员配置zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

我尝试删除/tmp/zookeeper或/tmp/kafka-logs下的kafka和zookeeper的所有信息,然后重新启动所有内容,但仍然收到相同的错误。


凉爽的!我猜您正在运行 kafka-console- Producer 来向主题“edwintest”发布消息。在运行生产者之前,使用此命令创建主题


./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic edwintest  

然后启动你的控制台制作器。希望这能解决您的问题。

[EDIT]显然,您必须确保正确更新您的 ec2 安全组,以便为​​生产者打开 zk 和 kafka 代理端口。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka启动失败(版本0.8.0 beta1) 的相关文章

  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • Kafka 中的内部和外部通信

    流动 本地 gt 代理 gt Kafka advertised listeners PLAINTEXT proxyhostname 8080 for external communication listeners PLAINTEXT 90
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • 即使在 Kafka 中进行轮询后,当前也不会发生分区分配

    我有 Java 8 应用程序与 Apache Kafka 2 11 0 10 1 0 一起使用 我需要使用seek特征为poll来自分区的旧消息 然而我遇到了一个例外No current assignment for partition每次
  • 如何删除 Apache Kafka 中的多个主题

    假设我有许多具有相同前缀的主题 例如 giorgos topic1 giorgos topic2 giorgos topic3 用于删除单个主题的命令 例如giorgos topic1 如下 bin kafka topics sh zook
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐

  • 为什么 pgAdmin 4 这么慢?

    postgreSQL 的 pgAdmin 4 GUI 非常慢 即使扩展服务器树或数据库树也需要花费太多时间 它们各自花费了近 30 秒的时间来展开 创建新数据库或表时它也会挂起 即使加载后 创建和保存新数据库也需要一分多钟的时间 几乎每次我
  • 回形针未保存,没有错误

    我被绊倒了 浏览了文档 教程等 但不确定我做错了什么 项目中的另一个模型是为 Paperclip 设置的 并且在测试时可以正常工作 它将附件文件信息保存和检索到数据库中 并将文件放入 public system 内的子文件夹中 我基本上将相
  • jQuery 剪贴板复制

    我需要剪贴板复制功能 即使我正在寻求帮助http plugins jquery com project copy http plugins jquery com project copy链接 但无法正常工作 li li
  • Spark:相当于数据帧中的 zipwithindex

    假设我有以下数据框 dummy data a 1 b 25 c 3 d 8 e 1 df sc parallelize dummy data toDF letter number 我想创建以下数据框 a 0 b 2 c 1 d 3 e 0
  • 从坐标中提取地址分量

    我正在尝试使用 R 进行反向地理编码 我首先使用了 ggmap 但无法让它与我的 API 密钥一起使用 现在我正在用 googleway 尝试 newframe c Front lat Front long Front lat Front
  • 共享苹果付费开发者帐户的选项?

    问题 这里正确的程序是什么 Do both the developer and account holder need paid Apple Developer accounts 正如在下面的上下文中所说 我的客户已经有一个 我还需要一个付
  • Kotlin 中 ArrayList() 和 mutableListOf() 之间的区别

    private val repositories mutableListOf
  • 在 ResourceDictionary 文件中使用 viewbox

    我有 ResourceFile1 xaml 文件 其内容
  • 在控制器中使用 Angular 的 $watch 是反模式吗?

    在我永无休止地追求以 正确的 角度方式做事的过程中 我阅读了很多有关如何让控制器观察角度服务中保存的模型变化的文章 一些网站 http www benlesh com 2013 08 angularjs watch digest and a
  • 如何在单页应用程序中实现 gmail 撰写窗口概念?

    我正在开发一个项目 用户可以更轻松地快速添加交易 我非常有兴趣做一些类似于 gmail 撰写弹出窗口在单页上所做的事情 我不知道如何实现这样的事情 请给我指导如何做这些事情 我有兴趣使用 AngularJS 构建它 P S 抱歉问了一个宽泛
  • 包 oracle.jdbc.driver 不存在

    以下代码出错 发生错误 1 import java sql public class DBConnect public static void main String a throws SQLException package oracle
  • 如何使用脚本。在 JADE 模板中

    我使用 JADE 模板使用 Express 框架创建了一个简单的节点应用程序 学习过程中一切都很顺利 直到我开始尝试运行一些客户端 js 但我不知道该怎么做 我需要在 app index js 中做一些事情来告诉节点它们吗 任何帮助将非常感
  • 从相关系数计算中删除异常值

    假设我们有两个数值向量x and y 之间的皮尔逊相关系数x and y是 谁 给的 坐标 x y 我怎样才能自动考虑仅一个子集x and y在计算中 比如90 最大化相关系数 If you really想要做到这一点 删除最大 绝对 残差
  • Boost 互斥范围锁

    我正在阅读 drdobbs com 上的 Boost Mutex 教程 并发现了这段代码 include
  • GNU gdb 如何显示源文件名和符号行

    当使用 GNU gdb 调试 c 进程时 list 命令将打印行但不告诉我文件名 设置断点可以显示我想要的所有行和文件信息 但我不想设置断点并且必须禁用或删除它 gdb b oyss funtion Breakpoint 13 at 0x8
  • 如何在 Google Chrome 扩展程序中创建侧边栏?

    我正在考虑在 Google Chrome 中创建一个侧边栏扩展 并读到有一个 API 调用 Google 禁用了它 那么也许有人知道如何创建并有例子吗 不幸的是 侧边栏 API 工作最近已停止 https bugs chromium org
  • Tkinter 按钮在禁用和更新后仍然响应点击

    我希望按钮启动命令 然后在执行时禁用并在执行完成后再次启用 当我单击该按钮时 它似乎被禁用并且命令被执行 但是 当我在禁用按钮时单击该按钮时 该命令会在第一次执行完成后第二次执行 似乎在第二次单击后 该按钮确实被禁用了 因为我可以在禁用它时
  • 在 NASM 中使用 istruc 时:“警告:尝试初始化 BSS 部分‘.bss’中的内存:忽略 [-w+other]”

    在搜索这个错误时我发现this https stackoverflow com questions 65731514 nasm attempt to initialize memory in bss section 77001709问题 但
  • 我需要删除分割块之间的一点空间

    我的两个分割块之间有一点空间 https i stack imgur com ysU0R png https i stack imgur com ysU0R png在这里你可以看到我的问题 我不明白为什么这些块会这样 body main w
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z