Twitter 流媒体的 Kafka Consumer 弃用错误

2024-01-03

我一直在研究 Kafka Twitter 流式提要数据。

我正在关注以下链接中的示例:http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

我可以使用生产者代码并且工作正常。能够获取 Twitter feed 并将其发送给 Kafka Producer。

我无法使用消费者代码,因为它已被许多 API 抛出为已弃用的错误。

这是消费者代码:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

它抛出错误:ConsumerConnector、ConsumerIterator、KafkaStream 已弃用。

ConsumerConfig 不可见。

此示例代码是否有固定版本(twitter 的 Kafka 消费者)?


您正在遵循的教程非常旧,它使用已弃用的旧 Scala Kafka 客户端,请参阅http://kafka.apache.org/documentation/#legacyapis http://kafka.apache.org/documentation/#legacyapis

已弃用的类有:

  • kafka.consumer.* and kafka.javaapi.consumer而是使用较新的 Java Consumerorg.apache.kafka.clients.consumer.*

  • kafka.producer.* and kafka.javaapi.producer而是使用下面较新的 Java Producerorg.apache.kafka.clients.producer.*

除了使用已弃用的类之外​​,您的代码大部分都是正确的,我只需要修复一些导入。请参阅下面的固定版本。使用它,我能够使用我为一个名为的主题生成的消息twittertopic.

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

虽然可以使用上面的代码,但 Kafka 的下一个主要版本可能会删除当前已弃用的类,因此您不应该使用这些代码编写新逻辑。

相反,您应该开始使用 Java 客户端,您可以使用 Github 上提供的示例:https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

使用新的 Java Consumer,您的逻辑将如下所示:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Twitter 流媒体的 Kafka Consumer 弃用错误 的相关文章

随机推荐

  • 使用数据表运行 100,000 次 Fisher 精确检验比应用慢

    早上好 我正在尝试使用 R 非常快速地对模拟遗传数据运行 100 000 次 Fisher 精确测试 最好在 30 秒内完成 因为我需要排列病例对照标签并迭代该过程 1 000 次 因此它会运行一夜 我尝试使用融化 整齐的数据上的数据表 其
  • 一个项目可以有多个起源吗?

    一个项目在 Git 中可以有两个 或多个 起源 吗 我想将一个项目推向两者github https github com and a Heroku https www heroku com server 具体来说 添加github仓库时出现
  • beginReceivingRemoteControlEvents 不触发 Apple Music 事件

    我正在从我的应用程序播放 Apple Music Apple Music 播放器代码如下 void submitAppleMusicTrackWithProductID NSString productID productID in US
  • 这个语法在 Javascript 中的含义是什么

    我正在 javascript 中寻找 TWILIO 的 API 我发现了类似的东西 const connect createLocalTracks Twilio Video navigator mediaDevices enumerateD
  • 没有异常处理的力量?

    在 Eclipse 中进行 Java 编程 我习惯于处理异常 在使用 VisualStudio 的 C 中 似乎我不能在方法上说 抛出异常 经过大量编码后 我发现了很多异常 并且必须在测试过程中发现它们时捕获它们 我想被迫处理它们 以便 V
  • 如何对 foreach 循环中迭代的元素进行分段

    我需要循环遍历整个用户列表 但需要一次获取 20 个 foreach var student in Class Students Take 20 Console WriteLine You belong to Group groupNumb
  • 无法自动装配方法

    我收到这个错误 org springframework beans factory BeanCreationException Could not autowire method 这是我的spring的xml配置
  • UIPickerView Swift 上奇怪的自定义背景颜色

    将自定义 UIColor 分配给 UIPickerViews 的背景时 我得到了奇怪的颜色 我为 textViews 和 pickerViews 创建了颜色 如下所示 let myTextViewBackgroundColor UIColo
  • 当node.js宕机时,如何让它自动恢复?

    由于节点基本上是一个进程 因此当出现严重错误时 整个应用程序就会崩溃 我现在有几个基于 Express 构建的应用程序 并且我正在使用一些手动方法来防止延长停机时间 process on uncaughtException 和自定义心跳监视
  • Mysql find_in_set 斜杠( / ) 分隔符

    我的值为 1 2 3 4 5 2 3 6 我想找到值 2 所以结果必须是 1 2 2 3 6 我不想使用 LIKE 运算符 有没有办法在FIND IN SET函数中设置分隔符 您可以使用like or find in set 这是一种方法
  • 如何编写 javascript 来重新排序 pdf 文档的页面?

    我有一个双面文档作为两个单独的 pdf 文件 一个文档的正面页面和第二个文档的背面页面 front pdf rear pdf 我还将它们合并为一个包含所有页面的文档 但所有正面页面都在背面页面之前 页面排序的形式为 1 3 5 7 n 2
  • Spring 启动和 SQLite

    我正在尝试将 SQLite 与 Spring Boot 应用程序一起使用 我知道 Spring Boot 对 MongoDB 等提供了出色的支持 但我找不到将 Spring Boot 与 SQLite 结合使用的方法 有什么建议从哪里或如何
  • android中的sqlite示例程序[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我是数据库概念的新手 特别是我需要相关的数据库概念 我想要一个在 android 上使用 sqlite 数据库的示例 我浏览了 andro
  • 替换php数组中的所有键

    这是我的数组 apple some code beta other code cat other code 2 如何将所有 e 字母替换为 在键名称中并保留值 这样我就会得到类似的东西 appl some code b ta other c
  • 当我尝试添加文本剪辑时,出现有关 ImageMagick With Python/MoviePy 的错误

    我正在使用 python 3 8 5 以及最新版本的 imagemagick 和 moviepy 错误 与代码 Traceback most recent call last File C Users edgib102 AppData Lo
  • JVM 内存使用失控

    我有一个 Tomcat Web 应用程序 它代表客户端执行一些内存和 CPU 密集型任务 这是正常现象 也是所需的功能 然而 当我运行 Tomcat 时 内存使用量会随着时间的推移飙升至 4 0GB 以上 此时我通常会终止该进程 因为它会扰
  • 为什么 Clojure 变量 arity args 根据使用而获得不同的类型?

    在回答中另一个问题 https stackoverflow com questions 26039461 does clojure have the c sharp equivalent of yield 26040454 comment4
  • 在 Windows 10 中使用 PS 将程序固定到开始菜单

    我正在尝试将程序固定到 Windows 10 中的开始菜单 shell New Object ComObject Shell Application Folder shell NameSpace C Test exe Folder Pars
  • Firestore 查询子集合

    我以为我读到您可以使用新的 Firebase Firestore 查询子集合 但我没有看到任何示例 例如 我通过以下方式设置 Firestore Dances collection 舞蹈名称 Songs collection songNam
  • Twitter 流媒体的 Kafka Consumer 弃用错误

    我一直在研究 Kafka Twitter 流式提要数据 我正在关注以下链接中的示例 http www hahaskills com tutorials kafka Twitter doc html http www hahaskills c