The tutorial you are following is very old and uses the old Scala Kafka clients, which have been deprecated; see http://kafka.apache.org/documentation/#legacyapis
The deprecated classes are:
kafka.consumer.* and kafka.javaapi.consumer: use the newer Java Consumer, org.apache.kafka.clients.consumer.*, instead
kafka.producer.* and kafka.javaapi.producer: use the newer Java Producer, org.apache.kafka.clients.producer.*, instead
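Apart from using deprecated classes, your code was mostly correct; I only had to fix a few imports. See the fixed version below. With it, I was able to consume messages that I had produced to a topic named twittertopic.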
package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        // The old consumer connects through ZooKeeper rather than directly to the brokers.
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        // Request a single stream (one consuming thread) for the topic.
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        // Typed loop variable (was a raw KafkaStream) avoids an unchecked conversion.
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // hasNext() blocks until a message arrives, because consumer.timeout.ms is not set.
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}
Although the code above works, the next major Kafka release will probably remove the currently deprecated classes, so you should not write new logic with them.
Instead, you should start using the Java clients. You can start from the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
With the new Java Consumer, your logic would look like this:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");
        // The Java consumer connects to the brokers directly; no ZooKeeper settings are needed.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // try-with-resources closes the consumer when the loop finishes.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            // Poll 1000 times, waiting up to one second per poll.
            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }
}
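
Comparing the two versions above, the configuration change comes down to dropping the zookeeper.* settings and adding bootstrap.servers plus the two deserializers. Here is a minimal sketch of that mapping using only java.util.Properties, so it runs without Kafka on the classpath. The class name ConsumerPropsMigration and the method migrate are hypothetical helpers of mine, not part of Kafka, and the sketch only covers the properties used in this answer; other legacy settings may need renaming or different values.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper (not part of Kafka): derives Java-consumer settings
// from the old Scala-consumer settings used in this answer.
final class ConsumerPropsMigration {

    static Properties migrate(Properties legacy, String bootstrapServers) {
        Properties props = new Properties();
        for (String name : legacy.stringPropertyNames()) {
            // The Java consumer does not connect to ZooKeeper, so drop zookeeper.* keys.
            if (!name.startsWith("zookeeper.")) {
                props.setProperty(name, legacy.getProperty(name));
            }
        }
        // Assumption: String keys and values, as in the example above.
        String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", stringDeserializer);
        props.setProperty("value.deserializer", stringDeserializer);
        return props;
    }

    public static void main(String[] args) {
        Properties legacy = new Properties();
        legacy.setProperty("zookeeper.connect", "localhost:2181");
        legacy.setProperty("group.id", "testgroup");
        legacy.setProperty("zookeeper.session.timeout.ms", "500");
        legacy.setProperty("zookeeper.sync.time.ms", "250");
        legacy.setProperty("auto.commit.interval.ms", "1000");

        Properties migrated = migrate(legacy, "localhost:9092");
        // Print in sorted key order so the output is stable.
        for (String name : new TreeSet<>(migrated.stringPropertyNames())) {
            System.out.println(name + "=" + migrated.getProperty(name));
        }
    }
}
```

The resulting Properties object can be passed to new KafkaConsumer<>(props) exactly as in the consumer above.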