卡夫卡连接|无法反序列化主题数据 |检索 id 的 Avro 键/值架构版本时出错 |未找到主题错误代码:40401

2024-02-23

首先感谢@OneCricketeer 迄今为止的支持。到目前为止我已经尝试了很多配置,我不知道还能尝试什么。

使用汇合connect-standalone worker.properties sink.properties访问外部流。

连接正在工作,我可以看到偏移量已加载:

INFO [my_mysql_sink|task-0] [Consumer clientId=connector-consumer-my_mysql_sink-0, groupId=connect-my_mysql_sink] 将分区 gamerboot.gamer.master.workouts.clubs.spieleranalysis-1 的偏移量设置为提交的偏移量 FetchPosition{偏移量=2225, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka8.pro.someurl.net:9093 (id: 8rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.内部结构。消费者协调员:844)

但后来当新消息进来时我收到一个错误:

错误 [my_mysql_sink|task-0] WorkerSinkTask{id=my_mysql_sink-0}转换主题中的消息键时出错'gamerboot.gamer.master.workouts.clubs.spieleranalysis'分区 1 位于偏移量 2225 和时间戳 1641459346507 处:无法反序列化主题数据gamerboot.gamer.master.workouts.clubs.spieleranalysisto Avro:
造成原因:org.apache.kafka.common.errors.SerializationException:检索 id 的 Avro 密钥架构版本时出错 422

造成原因:io.confluence.kafka.schemaregistry.client.rest.exceptions.RestClientException:未找到主题。;错误代码:40401

我不明白这一点。

工人属性:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

接收器属性

#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

#pk.mode=record_key
#pk.fields=

由于 mysql 中没有设置 pk,我想记录流中的所有内容。

正如它所说“检索 id 422 的 Avro 密钥架构版本时出错“我可以看到以下内容:

屏幕截图_主题_id https://i.stack.imgur.com/LmBC7.png

不要奇怪,因为它说的是 JSON,这只是我的 ChromePlugin 它将其解释为 json。 发现同样的价值。我还尝试了sink.properties中的每个组合,该组合已被注释掉。 我还能够卷曲键和值的最新模式(例如):

curl -s https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest|jq https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest%7Cjq

{
  "type": "record",
  "name": "ClubWorkoutKey",
  "namespace": "com.ad.gamerboot.kafka.models.workouts",
  "fields": [
    {
      "name": "playerId",
      "type": "string"
    },
    {
      "name": "tagId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

当我在 sink.properties 中为 key.converter 和 value.converter 输入 String Converter 时,情况就更进一步了。但我认为一定有什么问题,因为 Avro 是在这里传递的。对于 String,还有其他问题,我必须设置 pk 并打开删除等。

感谢你的支持。

*EDIT:

所以,给了我:

topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse

schema.url = https://schema-reg.pro.someurl.net

以及:架构 ID 网址:

 https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema

and:

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest

对我来说这就像一个谜题,我 20 天前开始使用 kafka。从那里我尝试了周围的网址,找到了我为主题发布的网址:

对于密钥:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest/

Schema: {"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey","version":1,"id":422,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKey\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}

对于价值观:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/

and https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/

架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version":1,"id":423,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutKickValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ballSpeed\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ballSpeedFloat\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"ballSpeedZone\",\"type\":{\"type\":\"enum\",\"name\":\"BallSpeedZone\",\"symbols\":[\"COLD\",\"MEDIUM\",\"HOT\",\"FIRE\",\"INVALID\"]}},{\"name\":\"confidence\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}

and: {"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue","version":1,"id":424,"schema":"{\"type\":\"record\",\"name\":\"ClubWorkoutPlayerMotionValue\",\"namespace\":\"com.ad.gamerboot.kafka.models.workouts\",\"fields\":[{\"name\":\"playerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"absoluteDistance\",\"type\":\"float\"},{\"name\":\"averageSpeed\",\"type\":\"float\"},{\"name\":\"peakSpeed\",\"type\":\"float\"},{\"name\":\"tagId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"installationId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"averageSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"AverageSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null,\"aliases\":[\"speedZone\"]},{\"name\":\"peakSpeedZone\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"PeakSpeedZone\",\"symbols\":[\"SPRINT\",\"HIGH_SPEED_RUN\",\"RUN\",\"JOG\",\"WALK\",\"STAND\",\"INVALID\"]}],\"default\":null},{\"name\":\"ingestionTime\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"}

MySQL 表:

+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field            | Type                                                                 | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid         | varchar(100)                                                         | YES  |     | NULL    |       |
| timestamp        | mediumtext                                                           | YES  |     | NULL    |       |
| absoluteDistance | float                                                                | YES  |     | NULL    |       |
| avarageSpeed     | float                                                                | YES  |     | NULL    |       |
| peakSpeed        | float                                                                | YES  |     | NULL    |       |
| tagId            | varchar(50)                                                          | YES  |     | NULL    |       |
| installationId   | varchar(100)                                                         | YES  |     | NULL    |       |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| peakSpeedZone    | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| ballSpeed        | int(11)                                                              | YES  |     | NULL    |       |
| ballSpeedFloat   | float                                                                | YES  |     | NULL    |       |
| ballSpeedZone    | enum('COLD','MEDIUM','HOT','FIRE','INVALID')                         | YES  |     | NULL    |       |
| confidence       | int(11)                                                              | YES  |     | NULL    |       |
| ingestionTime    | mediumtext                                                           | YES  |     | NULL    |       |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+

MySQL 中的预期数据:

+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid                             | timestamp     | absoluteDistance | avarageSpeed | peakSpeed | tagId          | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 |          5.76953 |       1.1543 |   1.22363 | 0104FLHBN009XD | null           | WALK             | WALK          |      NULL |           NULL | NULL          |       NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 |             NULL |         NULL |      NULL | 0104FLHBN009XD | NULL           | NULL             | NULL          |        37 |        37.0897 | COLD          |         77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+

来自 avro-console 的数据看起来像数据库条目:

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}

这是一个全新的实际融合安装。我几个小时前将 Avro 更新为:kafka-connect-avro-converter:7.0.1


公司更改了有关 RecordNameStrategy 的架构。现在一切正常。

Thanks

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

卡夫卡连接|无法反序列化主题数据 |检索 id 的 Avro 键/值架构版本时出错 |未找到主题错误代码:40401 的相关文章

  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • KafkaConsumer.commitAsync() 行为的偏移量比以前更低

    kafka 将如何处理调用 KafkaConsumer commitAsync Map
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

    如果我在 Producer 中将 Kafka 配置参数设置为 1 retries 3 2 max in flight requests per connection 5 那么一个分区内的消息很可能不按 send order 排列 Kafka
  • Mesos DCOS 未安装 Kafka

    我正在尝试在 Mesos 上安装 Kafka 看来安装已经成功了 vagrant DevNode dcos dcos package install kafka This will install Apache Kafka DCOS Ser
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • Avro 消息中的架构

    我看到 Avro 消息嵌入了架构 然后是二进制格式的数据 如果发送多条消息并且为每条消息创建新的 avro 文件 模式嵌入不是一种开销吗 那么 这是否意味着生产者批量处理消息然后写入总是很重要 因此写入一个 avro 文件的多条消息只携带一
  • 卡夫卡监听器中的钩子

    kafka 监听消息之前 之后是否有任何类型的钩子可用 使用案例 必须设置MDC关联id才能进行日志溯源 我在寻找什么 之前 之后回调方法 以便可以在进入时设置 MDC 关联 ID 并最终在退出时清除 MDC 编辑后的场景 我将关联 id
  • 是否有任何模拟器/工具可以生成流式传输消息?

    出于测试目的 我需要模拟客户端每秒生成 100 000 条消息并将它们发送到 kafka 主题 有没有任何工具或方法可以帮助我生成这些随机消息 有一个用于生成虚拟负载的内置工具 位于bin kafka producer perf test
  • AVRO - 具有联合记录类型支持的复杂记录

    我正在尝试使用联合数据类型支持的成员记录类型构建 AVRO 的复杂记录 namespace proj avro protocol app messages doc application messages types name record
  • 如何删除 Apache Kafka 中的多个主题

    假设我有许多具有相同前缀的主题 例如 giorgos topic1 giorgos topic2 giorgos topic3 用于删除单个主题的命令 例如giorgos topic1 如下 bin kafka topics sh zook
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐

  • jmeter无法记录浏览器操作

    我正在使用 apache jmeter 2 6 我想使用 HTTP 代理服务器记录浏览器操作 但动作并没有记录 我已经在线程组下定义了 HTTP 请求默认值 我为服务器名称指定了值 如下所示 http www xxxxx com 81 ht
  • 在 Codeigniter 中手动调用/调用挂钩

    我搜索了手动调用 调用钩子以及网上类似的东西 但找不到任何东西 codeigniter中有这样的东西吗 我下面有一个钩子 它会按预期触发 但以防万一没有触发 那么我想在代码中手动调用它 Thanks hook post controller
  • 使用 DotNetOpenAuth 创建 OAuth 2.0 服务提供者

    我正在构建一个 Web 应用程序 它将有一个 api 和一个使用 DotNetOpenAuth 的授权服务 我发现这个例子 http scatteredcode wordpress com 2011 12 01 dotnetopenauth
  • 类型错误:“NoneType”对象无法使用函数结果进行迭代

    我在 companyName monthAverage costPerTon totalCost displayCost companyName monthAverage costPerTon totalCost 行上收到 TypeErro
  • maxlength 属性的角度验证消息

    我在 Angular 中显示 maxlength 属性的错误消息时遇到一些问题 Problem 由于 maxlength 属性不允许的字符数超过指定数量 因此我无法显示错误消息 有什么方法可以关闭默认行为 允许用户输入更多字符 以显示我的错
  • Linux/X11下如何隐藏鼠标指针?

    如何在X11下隐藏鼠标指针 我想使用内置库来执行此操作 而不是使用 SDL SDL ShowCursor 0 或 glut glutSetCursor GLUT CURSOR NONE 之类的库 此外 无论指针位置如何 鼠标指针都应该隐藏
  • 在 Windows 中启动 Confluence Schema Registry

    我有Windows环境和我自己的一套kafka和zookeeper正在运行 为了使用自定义对象 我开始使用 Avro 但我需要启动注册表 下载 Confluence 平台并运行 bin schema registry start etc s
  • 哪种编程语言最能弥合伪代码和代码之间的差距? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 当我从现在开始编写代码时 我计划首先用漂亮 可读的伪代码来布局所有内容 然后围绕该结构实现程序 如果我将我目前掌握的语言从最容易翻译到最难翻译进
  • Git 告诉我拉,然后提交,然后拉?

    我正在尝试推送新的更改 但我有一个冲突的文件 尝试推送后 出现以下错误 Merge the remote changes e g git pull before pushing again See the Note about fast f
  • 脚本可以编辑受保护的范围或工作表吗?

    我正在使用一个脚本来复制和粘贴一些数据 这些数据由几个用户填充 而没有权限仅编辑一行 其他范围受到这些用户的保护 无法编辑 该脚本将数据从一张纸复制到另一张纸 并且两张纸都受到完全保护 除了可以编辑任何内容的两个编辑者之外 当没有人处理此电
  • 将嵌入式 Glassfish 与 Maven 结合使用

    有人知道嵌入式 Glassfish 吗 我想运行一些 EJB 测试 但我不想每次运行测试时都启动和停止嵌入的 glassfish 根据插件文档 我应该将其放入 POM 中
  • 在移动设备上部署 C++ QML 插件的正确方法是什么?

    我经常使用 Box2D QML 插件 看起来效果非常好 但是 我想在 Android SGS2 上部署我的示例应用程序 但我似乎无法让它工作 无论我尝试在 AVD 上还是在设备上运行它 它都不起作用 androiddeployqt 成功完成
  • 嵌套 div 上的 jQuery click()

    代码可能比我能更好地解释这一点 div class wrapper div class inner1 div div class inner2 div div 当我点击inner1div 它运行do something 与两个inner1d
  • 为什么我的 PDF 生成为空白?

    我正在使用 ItextSharp 和 c asp net MVC 生成 PDF 报告 但是 当我生成报告时 PDF 返回为空白 除了工作正常的标题之外 我会喜欢你的意见 生成报告的代码如下 using var writer PdfWrite
  • 在 Mac OS X 上以编程方式修改家长控制

    Mac OS 具有一组非常基本的家长控制选项 允许您限制用户对网站和应用程序的访问 以及设置的每日时间限制 我希望能够编写脚本来执行如下操作 允许上午 8 点至上午 9 30 访问特定网站 限制对游戏的访问 允许下午 5 点至下午 6 00
  • 如何使错误页面(http 500)在 IceFaces 中工作?

    使用 Icefaces 2 如果在标准 非icefaces h commandButton 上执行操作方法期间发生错误 则该按钮似乎没有任何操作 尽管在 web xml 中将其配置为显示错误页面 但不会显示错误页面 我可以通过将标签包围来使
  • Sprite/Texture Atlas:GDI+ Bitmap.MakeTransparent 用于 OpenTK 的颜色键

    我正在使用 C 和 OpenTK 编写精灵 纹理图集功能的支持类 到目前为止 大多数功能都运行良好 正交视图上的简单 2D 图块 我的问题与调用 GDI Bitmap MakeTransparent 方法设置颜色 洋红色 0xFFFF00F
  • JavaScript 中 Uint8Array 到图像

    我有一个名为的 Uint8ArrayframeBytes 我使用以下代码从该字节数组创建了 RGBA 值 for var i 0 i lt frameBytes length i imgData data 4 i frameBytes i
  • Safari 中的跨源视频

    有谁知道 Safari 是否支持crossoriginHTML5 上的属性
  • 卡夫卡连接|无法反序列化主题数据 |检索 id 的 Avro 键/值架构版本时出错 |未找到主题错误代码:40401

    首先感谢 OneCricketeer 迄今为止的支持 到目前为止我已经尝试了很多配置 我不知道还能尝试什么 使用汇合connect standalone worker properties sink properties访问外部流 连接正在