java操作kafka读写操作

2023-11-11

1,前提,kafka 的 server.properties里面开通了

listeners=PLAINTEXT://192.168.137.141:9092
advertised.listeners=PLAINTEXT://192.168.137.141:9092

2,防火墙

systemctl status firewalld
systemctl stop firewalld

3,java代码

生产者

import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer123 {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Producer123 aa1=new Producer123();
		for(int i=0;i<50;i++) {
			aa1.send1();
			System.out.println(i);
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	public void send1() {
		String out = "";
		String topic, msg;
		Properties properties = new Properties();
		KafkaProducer<String, String> kafkaProducer;
		topic = "CHINA";
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		kafkaProducer = new KafkaProducer<String, String>(properties);
		Date dNow = new Date( );
		SimpleDateFormat ft = new SimpleDateFormat ("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
		msg = "现在是" + ft.format(dNow);
		System.out.println(msg);
		ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
		kafkaProducer.send(producerRecord);
		kafkaProducer.close();

//	return out;
	}

}

4,消费者

import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main1 {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		System.out.println("hello");
		Main1 m1=new Main1();
		m1.consumer1();

	}
	public String consumer1() {
		System.out.println("starting......");
		String out = "";
		String topic;
		Properties properties = new Properties();
		KafkaConsumer<String, String> kafkaConsumer;
		topic = "CHINA";
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.141:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		kafkaConsumer = new KafkaConsumer<String, String>(properties);
		kafkaConsumer.subscribe(Collections.singletonList(topic));
		int i=0;
		while(i<10){
			i++;
			System.out.println("aaa:"+String.valueOf(i));
		    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);//3000代表每3秒读一次
		    System.out.println(consumerRecords.count());
		    if(consumerRecords.count() > 0){
		        for (ConsumerRecord<String, String> consumerRecord : consumerRecords){
		            out = consumerRecord.value();
		            System.out.println("receive data: "+out);
		        }
//		        break;
		    }
		}
		return out;	
	}

}

5,依赖项,版本最好和kafka服务端保持一致

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test-kafka</groupId>
  <artifactId>test-kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>test-kafka</name>
  <description>test-kafka</description>
  <dependencies>
	  	<dependency>
		    <groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
		</dependency>
  </dependencies>
  <build>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
					       <source>1.8</source>
                           <target>1.8</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java操作kafka读写操作 的相关文章

随机推荐

  • javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorExcepti

    问题现象 Java Spring应用发送数据报如下问题 AxisFault faultCode http schemas xmlsoap org soap envelope Server userException faultSubcode
  • 网络编程先导知识

    目录 1 什么是网络协议 2 什么是Socket Socket主要类型 3 C S和B S架构 4 网络字节序和主机字节序 5 局域网和广域网 6 IP地址和端口的概念 1 什么是网络协议 为了在计算机网络中做到有条不紊地交换数据 就必须遵
  • android.content包-----ClipboardManager

    ClipboardManager类介绍 Clipboardmanager类通过getSystemService String 方法进行实例化操作 ClipboardManger类的相关方法很简单 包含set和get剪切板的数据 剪切板的数据
  • Tesseract-OCR 中文识别(附上源码)

    简介 光学字符识别 OCR Optical Character Recognition 是指对文本资料进行扫描 然后对图像文件进行分析处理 获取文字及版面信息的过程 OCR技术非常专业 一般多是印刷 打印行业的从业人员使用 可以快速的将纸质
  • Arcmap卫星影像去黑边(彻底去除黑边)

    在处理栅格数据时 我们常常会遇到一个问题 下载下来的卫星影像数据在Arcmap等软件上会出现黑边问题 如图 出现黑边的原因是因为我们下载影像图层是按外接矩形下载的 所以下载时矩形内没图的地方会填充透明色 透明后下下来后就会用黑色代替 那么我
  • 计算机组成原理-复习题2

    二 简答题 43 请写出8位定点原码整数中能表示的最大正数 最小正数 最大负数和最小负数的机器数形式 并用十进制表示其数值范围 答 最大正数 01111111 最小正数 00000001 最大负数 10000001 最小负数 1111111
  • 【动手学】36 图片增广_代码

    matplotlib inline import torch import torchvision from torch import nn from d2l import torch as d2l d2l set figsize img
  • qt学习笔记7:控件、自定义控件封装

    系统提供的控件们 按钮组 QPushButton 常用按钮 QToolButton 工具按钮 用于显示图片 如果想显示文字 可以修改风格 RadioButton 单选框 如果想设置默认 ui gt 空间名 gt setChecked Che
  • js最简单的数组去重方法set数组去重

    js最简单的数组去重方法set数组去重 数组去重 function newArr arr return Array from new Set arr 对象数组去重 arr是数组参数 type是要根据去重的字段参数 function newA
  • C--游游的水果大礼包--牛客周赛 Round 3

    示例1 输入 3 4 1 2 输出 4 说明 组成两个二号水果大礼包 使用了2个苹果和4个桃子 总价值为4 示例2 输入 1 1 5 6 输出 0 说明 显然无法组合成任意一个大礼包 解析 数据量1e6 应该O n 遍历苹果数量 计算最大值
  • 【python基础】猜数字游戏

    前言 相信很多人都玩过猜数字游戏 游戏规则也十分简单 还记得小时候我经常和朋友一起玩 我们在桌上摆放一些1 100以内的数字卡片 一个人随机抽取一张卡片 这里不许偷看卡片内容 首先这个人随机猜一个数字 然后其他人会告诉你猜大了还是猜小了 先
  • Oracle数据库同时执行多条DML语句

    在sqlplus中 SQL gt begin 语句1 语句2 语句N end
  • 【Detectron2】使用 Detectron2 训练基于 coco 数据集的目标检测网络

    文章目录 一 安装 Detectron2 二 软连接 coco 数据集 三 训练 四 数据集相关参数 五 输出结果路径 六 COCO 数据集简介 七 模型相关参数 八 可视化结果 一 安装 Detectron2 初次接触 Detectron
  • TypeScript基础之元组(Tuple)

    元组 我们知道数组中元素的数据类型都一般是相同的 any 类型的数组可以不同 如果存储的元素数据类型不同 则需要使用元组 元组是一种数据类型 可以像任何其他变量一样使用 它表示值的异构集合 也可以作为函数调用中的参数传递 在抽象数学中 术语
  • [Android] 给图像添加相框、圆形圆角显示图片、图像合成知识

    前一篇文章讲述了Android触屏setOnTouchListener实现突破缩放 移动 绘制和添加水印 继续我的 随手拍 项目完成给图片添加相框 圆形圆角显示图片和图像合成的功能介绍 希望文章对大家有所帮助 一 打开图片和显示assets
  • 博途V16软件官方下载和安装

    获取博途软件的途径很多 且有各种版本和各种形式的安装包 不可否认这些都是同行们共同努力为我们获取的资源 但这些安装包大多以百度网盘的途径下载 由于安装包很大且百度网盘限速 就很不方便我们获取博途的安装包文件 为此 向大家介绍如何通过西门子官
  • vue scoped 解决样式不生效问题

    对于添加样式不能影响子组件样式的情况使用 gt gt gt
  • 精卫填海——大数据安全与隐私保护

    第一章 绪论 一 课程内容 1 大数据安全 如何在满足可用性的前提下实现大数据机密性 安全与效率之间的平衡一直信息安全领域关注的重要问题 在大数据场景下 数据的高速流动特性以及操作多样性使得数据的安全与效率之间的矛盾更加突出 如何实现大数据
  • LeetCode刷题-21. 合并两个有序链表(python)

    将两个有序链表合并为一个新的有序链表并返回 新链表是通过拼接给定的两个链表的所有节点组成的 示例 输入 1 gt 2 gt 4 1 gt 3 gt 4 输出 1 gt 1 gt 2 gt 3 gt 4 gt 4 方法一 递归 Definit
  • java操作kafka读写操作

    1 前提 kafka 的 server properties里面开通了 listeners PLAINTEXT 192 168 137 141 9092 advertised listeners PLAINTEXT 192 168 137