无法在 Kafka 中使用来自远程计算机的消息

2023-12-21

我在我的一台机器上创建了一个kafka主题,其IP为192.168.25.50,主题名称为test-poc。然后通过使用 kafka-console- Producer 我生成了如下消息

kafka-console-producer --broker-list localhost:9092 --topic test-poc

>test message1

>test message2

之后我在另一台机器上下载了kafka并尝试使用以下命令使用

kafka-console-consumer --bootstrap-server 192.168.25.50:9092 --topic test-poc --from-beginning 

其中 192.168.25.50 是运行 Kafka 生产者的服务器的 IP。

因此,执行上述命令后,我收到以下错误。

[2018-06-28 20:45:12,822] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 2147483647 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:12,934] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:13,038] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:13,191] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-28 20:45:13,395] WARN [Consumer clientId=consumer-1, groupId=console-consumer-33012] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

我的kafka的server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.

任何人都可以帮助解决这个问题吗?

注意:当我在同一台机器上运行生产者和消费者时,它工作正常。


默认情况下,代理将绑定到本地主机。如果你的机器ip是a.b.c.d它是一个虚拟机实例,那么您需要取消注释该行server.properties含有listeners=PLAINTEXT://:9092并把listeners=PLAINTEXT://a.b.c.d:9092

如果是 docker 容器,您可以尝试添加以下两行:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://a.b.c.d:9092
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法在 Kafka 中使用来自远程计算机的消息 的相关文章

  • KafkaConsumer.commitAsync() 行为的偏移量比以前更低

    kafka 将如何处理调用 KafkaConsumer commitAsync Map
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐

  • 使用 mbedtls 的 AES-CMAC:未定义的参考错误

    我尝试使用 mbedTLS 实现 AES CMAC 我收到一些错误 未定义的引用mbedtls cipher cmac starts 未定义的引用mbedtls cipher cmac update 未定义的引用mbedtls cipher
  • 在 VB.NET 中连接数组[重复]

    这个问题在这里已经有答案了 在 Visual Basic 中连接一个或多个数组 或 ArrayList 的最简单方法是什么 我正在使用 NET 3 5 如果这很重要的话 这是用 C 编写的 但你肯定能弄清楚 int a new int 1
  • 如何通过从文件读取坐标自动画线?

    我正在尝试在一端画一条带有箭头的线 另外 我需要对同一图中的多个箭头自动执行此操作 d3 csv data coordinates csv then function data d x1 d x1 d y1 d y1 d x2 d x2 d
  • iPhone MonoTouch - 获取捆绑包版本

    在 MonoTouch 中 我们如何获取当前运行的包的版本 我最接近的猜测是在某个地方 NSBundle MainBundle ObjectForInfoDictionary 使用以下代码获取捆绑包的当前版本 NSBundle MainBu
  • Google Data Studio Connector 检索数据的不同参数

    我正在尝试连接到需要不同参数来检索数据的数据存储 我尝试过数据连接器的不同多个实例 但数据工作室似乎很难存储不同的配置值 数据连接器控件当前不可用于社区连接器 请问最好的方法是什么 社区连接器当前不支持参数化 我们计划将来添加此功能 目前
  • 无法连接到 SQL Server 来调试 SQLCLR 存储过程

    我想在 SQL Server 中调试 SQLCLR 存储过程 我一直在尝试在 VS2015 Community 和最近安装的 VS2017 Community 版本中调试 SP 但没有成功 我非常确定问题出在连接到 SQL Server 上
  • 如何使用 RVM 并创建全局可用的 gem?

    我正在运行 Mac OSX 10 6 4 并安装了 RVM 到目前为止 它非常棒 我真的很喜欢它让我在同一台机器上管理多个版本的 Rails 和 ruby 的方式 而不会让人头疼 但是 我不想为每个设置安装某些宝石 例如乘客 有没有办法在宝
  • 当达到字符限制时,AngularJS 阻止在文本区域上输入

    当达到最大字符数时 如何阻止用户在文本区域中输入更多字符 我现在使用 ng keypress 但我不知道如何在达到限制时阻止输入 用户不应能够在该文本区域中输入或粘贴总共超过 1000 个字符 问题是如何停止输入 而不是如何计算输入长度 这
  • ArticlesController#show 中的 ActiveRecord::RecordNotFound 无法找到没有 ID 的文章

    我正在尝试向数据库提交一些数据 但当我尝试检索这些数据时 显示无法找到没有 ID ils 4 0 1 的文章 我正在使用 ruby 2 0 0 和 ra def show article Article find params id end
  • 阻止 iPhone 缩放表格? [复制]

    这个问题在这里已经有答案了 代码
  • Scala 转换为泛型类型

    我对泛型类型感到困惑 我预计2 asInstanceOf A 被强制转换为类型A 同时 它被投射到Int 除此之外 输入是java lang Long而输出是一个列表Int 根据定义输入和输出应该是相同的类型 这是为什么 def whate
  • 如何在C#中使用Webclient填写表单并提交

    我是在 C 中使用 WebClient HttpResponse 和 HttpRequest 库的新手 所以如果我的问题读起来令人困惑 请耐心等待 我需要构建一个基于 C 的 WinForm 它可以打开一个 URL 该 URL 受到基本授权
  • 0/1 重量不合理的背包

    考虑0 1背包问题 http en wikipedia org wiki Knapsack problem 标准动态规划算法仅适用于背包的容量和重量均为整数 有理数时 当容量 重量不合理时怎么办 问题是我们不能像处理整数权重那样记忆 因为我
  • plsql 远程调试断点不起作用

    我在调试 pl sql 代码时遇到了奇怪的情况 我将 sql Developer 设置为侦听调试连接 在java代码中 我使用以下代码附加调试器 CallableStatement cstmt null try cstmt getConne
  • Android – 如何加载共享库?

    我创建了最简单的 EXECUTABLE 和 SHARED LIBRARY 如果不更改 LD LIBRARY PATH 则不会加载 SHARED LIBRARY hello hello link image 1995 failed to li
  • cv::Mat 到 QImage 转换

    我发现了非常相似的主题 如何将 opencv cv Mat 转换为 qimage https stackoverflow com questions 5026965 how to convert an opencv cvmat to qim
  • bash脚本循环多个变量

    我正在尝试写类似以下内容 for i in a z j in 1 26 do echo dev sd i 1 disk j ext4 noatime 1 1 gt gt test done 当然 这不是正确的语法 有人可以帮助我使用正确的语
  • 在运行时从 iOS 上的本机方法创建委托

    这是一个MonoTouch 专用问题 我目前正在开发一个 OpenGL 的包装器 它与 OpenTK 等包装器有很大不同 该包装器用于实现更快的 OpenGL 开发 方法不是这样声明的 void glGenTextures Int32 n
  • 创建一个php函数来返回mysql结果

    我试图创建一个函数 它将返回一个 mysql 查询 然后我可以循环遍历并处理结果 但它似乎不起作用 我什至可能没有以正确的方式这样做 function GetAccounts username require dbconn php resu
  • 无法在 Kafka 中使用来自远程计算机的消息

    我在我的一台机器上创建了一个kafka主题 其IP为192 168 25 50 主题名称为test poc 然后通过使用 kafka console Producer 我生成了如下消息 kafka console producer brok