从 kafka-Spark-Streaming 读取数据时获取空集

2024-03-03

大家好,我是 Spark Streaming 的新手。我正在尝试读取 xml 文件并将其发送到 kafka 主题。这是我的 Kafka 代码,它将数据发送到 Kafka-console-consumer。

Code:

package org.apache.kafka.Kafka_Producer;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutionException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("unused")
public class KafkaProducer { 
   private static String sCurrentLine;
   public static void main(String args[]) throws InterruptedException, ExecutionException{ 
       try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt")))
       {
           while ((sCurrentLine = br.readLine()) != null) {
               System.out.println(sCurrentLine);
               kafka(sCurrentLine);
           }
       } catch (FileNotFoundException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();}
   }
   public static void kafka(String sCurrentLine)  {
       Properties props = new Properties();
       props.put("metadata.broker.list", "localhost:9092");
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       props.put("partitioner.class","kafka.producer.DefaultPartitioner");
       props.put("request.required.acks", "1");
       ProducerConfig config = new ProducerConfig(props);
       Producer<String, String> producer = new Producer<String, String>(config);
       producer.send(new KeyedMessage<String, String>("sample",sCurrentLine));
       producer.close();
   }
}

我可以在 Kafka-Console-Consumer 中接收数据。在下面的屏幕截图中,您可以看到我发送到该主题的数据。

现在我需要使用 Spark-Streaming 将发送到 kafka-console-consumer 的数据进行流式传输。这是代码。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

   public static void main(String[] args) {

       SparkConf conf = new SparkConf()
               .setAppName("kafka-sandbox")
               .setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);
       JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

       Map<String, String> kafkaParams = new HashMap<>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");
       Set<String> topics = Collections.singleton("sample");

       JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
       directKafkaStream.foreachRDD(rdd -> {
       System.out.println("--- New RDD with " + rdd.partitions().size()
           + " partitions and " + rdd.count() + " records");
       rdd.foreach(record -> System.out.println(record._2));
       });
       ssc.start();
       ssc.awaitTermination();
   }
}

在提交我的作业时得到空集,如下所示:

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar

下面您可以看到数据接收情况的屏幕截图:

使用以下版本:

火花 - 2.0.0

动物园管理员-3.4.6

卡夫卡 - 0.8.2.1

有什么建议请,


最后在网上冲浪后我找到了这些解决方案。

不要同时使用“Spark-Submit”和“SetMaster”。

  • 如果您从 IDE 运行代码,请在代码中使用 SetMaster
  • 如果您通过“Spark-Submit”运行 jar,请勿将 setMaster 放入代码中

还有一件事首先运行/提交你的 Spark jar,然后将数据发送到 Kafka-Console-Consumer

工作正常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 kafka-Spark-Streaming 读取数据时获取空集 的相关文章

  • Spark SQL 广播提示中间表

    我在使用广播提示时遇到问题 可能是缺乏 SQL 知识 我有一个查询 例如 SELECT broadcast a FROM a INNER JOIN b ON INNER JOIN c on 我想要做 SELECT broadcast a F
  • Spark RDD默认分区数

    版本 Spark 1 6 2 Scala 2 10 我正在执行以下命令spark shell 我试图查看 Spark 默认创建的分区数量 val rdd1 sc parallelize 1 to 10 println rdd1 getNum
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • Spark 和 Ipython 中将非数字特征编码为数字的问题

    我正在做一些我必须做出预测的事情numeric数据 每月员工支出 使用non numeric特征 我在用Spark MLlibs Random Forests algorthim 我有我的features数据在一个dataframe看起来像
  • Apache Kafka 中消费者消费消息的延迟

    我正在使用 Kafka 0 8 0 并尝试实现下面提到的场景 JCA API 充当生产者并将数据发送到 gt 消费者 gt HBase 一旦我使用 JCA 客户端获取数据 我就会将每条消息发送给消费者 例如 一旦生产者发送消息 no 1 我
  • 是否有用于事件驱动的 Kafka 消费者的 Python API?

    我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序 因此 我希望有一个 Kafka 消费者 当相关主题的流中存在新消息时 该消费者会被触发 并通过将消息推回到 Kafka 流来进行响应 我一直在寻找类似 Spring
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 是否有任何模拟器/工具可以生成流式传输消息?

    出于测试目的 我需要模拟客户端每秒生成 100 000 条消息并将它们发送到 kafka 主题 有没有任何工具或方法可以帮助我生成这些随机消息 有一个用于生成虚拟负载的内置工具 位于bin kafka producer perf test
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • 如何使用 SparkR 1.6.0 写入 JDBC 源?

    使用 SparkR 1 6 0 我可以使用以下代码从 JDBC 源读取数据 jdbc url lt jdbc mysql localhost 3306 dashboard user
  • 行类型 Spark 数据集的编码器

    我想写一个编码器Row https spark apache org docs 2 0 0 api java index html org apache spark sql Row html输入 DataSet 用于我正在执行的地图操作 本
  • 如何在 Spark Dataframe 中显示完整的列内容?

    我正在使用 Spark csv 将数据加载到 DataFrame 中 我想做一个简单的查询并显示内容 val df sqlContext read format com databricks spark csv option header
  • 如何使用 log4j 自定义附加程序在 HDFS 上创建日志?

    Overview 我们希望使用 log4j 记录 Spark 作业活动 并将日志文件写入 HDFS Java 8 Spark 2 4 6 Scala 2 1 2 Hadoop 3 2 1 我们无法找到本地 apache log4j 附加程序
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 如何在SparkR中进行map和reduce

    如何使用 SparkR 进行映射和归约操作 我能找到的只是有关 SQL 查询的内容 有没有办法使用 SQL 进行映射和减少 See 写入从 SparkR map 返回的 R 数据帧 https stackoverflow com quest
  • Spark 数据帧:根据另一列的值提取一列

    我有一个包含带有连接价目表的交易的数据框 paid currency EUR USD GBP 49 5 EUR 99 79 69 客户已支付 49 5 欧元 如 货币 列中所示 我现在想将支付的价格与价目表中的价格进行比较 因此 我需要根据
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐

  • 通过 PHP 使用 EPL 打印:存储图像

    背景信息 Mac OS X Lion 10 7 3 11D50b EPL http en wikipedia org wiki Eltron Programming Language http en wikipedia org wiki E
  • 向形状/线条添加渐变

    我正在尝试使用 VBA 将渐变添加到 Excel 中的线条形状 此功能可在Line Color下的部分Format Shape选项 尽管此功能存在于Format Shape选项 我无法在 VBA 中重现该功能 我的代码是 With Acti
  • 我可以通过 Lucene 在 Orchard 中搜索/索引自定义数据源吗?

    我目前正在开发一个网站 允许用户搜索自定义产品目录 我一直在寻找并希望利用 Orchard CMS 来帮助我开发这个网站 我目前已经经历了罗恩 彼得森的 YouTube 系列 http www youtube com watch v Iv7
  • 输入助手 valueBinding 已弃用 - 有什么替代方案?

    我有一些像这样的文本输入助手 input type text valueBinding name focus out focusOutName 我刚刚将 Ember 升级到 1 11 0 现在收到此弃用警告 弃用 您尝试通过将 valueB
  • 如何通过ajax将值传递给php变量

    这是我的 JavaScript 代码 function category row dataparam oper delete row row ajax type POST url multiupload php data dataparam
  • Helm Charts 中的秘密管理

    我正在尝试使用Helm charts在 Kubernetes 集群中安装应用程序 有人可以建议什么是更好的秘密管理解决方案吗 使用helm secrets是个好主意或者Hashicorp Vault Vault 在技术上非常棒 但它可能会成
  • 如何创建类似谷歌纵横的标记?

    在我的 HTML5 应用程序中 我使用 Google Map v3 并在地图上添加多个标记 放置新标记和更改图标很容易 但我希望能够构建像谷歌纬度中使用的标记一样的标记 这些标记设置有图标图像和漂亮的边框 关于如何做到这一点有什么想法吗 您
  • 使用 bootstrap 无法在 angularjs 中打开模式窗口

    这是我的 app js 文件 const app angular module CurseTransport ui router ui bootstrap ngMessages raceModule app config stateProv
  • Azure函数应用程序-在执行和间歇性运行的旧代码之间共享全局变量

    目前 我在 Azure 函数应用程序中面临两个问题 我已提供以下详细信息 1 全局变量内容在执行之间共享 我使用了并发字典 它是一个全局变量 私有的和静态的 该变量在队列触发器中使用 private static readonly Conc
  • iphone开发:验证来自https url的证书信息

    当用户使用网络浏览器 Safari Chrome等 连接到 https url 时 例如 https encrypted google com 则用户可以获得有关证书相关的信息到这样的 https url 也就是说 在连接到url http
  • 使用 HTML 表单和 PHP 更新 MySQL 数据库字段数据

    所以我试图使用 html 表单和一些 PHP 代码更新数据库字段 但我无法让它工作 它不会抛出任何错误 但不会更新该字段 我不确定它是否因为我也回显该字段网页 它似乎所做的就是打印失败消息 HTML
  • MSSQL - 将一个字段拆分为 3 个字段

    我有一个由 1 列组成的结果集 在本例中为 2 行 单列 ProductDescription 是一个 varchar 字段 其中包含 3 条信息 我没有设计它 我需要将这三条信息分成 3 个使用查询的附加字段 before Product
  • 在Bootstrap组件中单独加载

    我正在开发一个涉及许多开发人员的大型 Web 项目 我想精简 Bootstrap3 的包并仅保留我们正在使用的内容 基本上 这个想法是在页面加载到浏览器中时减少任何额外的开销 所以我可以通过两种方法来做到这一点 我也可以 a 从库中删除任何
  • 如何使用 lambda 表达式创建扩展方法

    目前我正在创建一个接受参数的扩展方法 使用下面的示例 如何使用 lambda 表达式对其进行转换 public static decimal ChangePercentage this IEnumerable
  • 根据前缀对目录中的文件进行分组

    我有一个包含图片的文件夹 文件夹 1 Files ABC 138923 ABC 3223 ABC 33489 ABC 3111 CBA 238923 CBA 1313 CBA 1313 DAC 38932 DAC 1111 DAC 1389
  • 使用 Microsoft Graph API 获取 SharePoint Online 团队网站

    我正在尝试访问组织的 SharePoint 团队网站 我使用 Microsoft Graph API 因为它是 Office 365 最完整的 API 我了解如何获取访问令牌以及如何使用它来发出请求 我知道它有效 因为我可以获得组列表 但是
  • 获取ejs模板中的url参数

    我试图根据 URL 参数创建一个 ejs 条件 例如 如果测试参数存在于 localhost 3000 page test 则显示一个 div 否则不显示它 我的 ejs 模板看起来像这样 div class row div div div
  • 如何捕获事件调度线程 (EDT) 异常?

    我正在使用一个名为MyExceptionHandler实现Thread UncaughtExceptionHandler处理我的项目中的正常异常 据我了解 这个类无法捕获 EDT 异常 所以我尝试在main 处理EDT异常的方法 publi
  • jQuery 测试 element1 是否是 element2 的后代

    有谁知道一种好方法来测试存储在 var 中的一个元素是否是另一个也存储在 var 中的元素的后代 我不需要element1 isChildOf selector 这很容易 I need element1 isChildOf element2
  • 从 kafka-Spark-Streaming 读取数据时获取空集

    大家好 我是 Spark Streaming 的新手 我正在尝试读取 xml 文件并将其发送到 kafka 主题 这是我的 Kafka 代码 它将数据发送到 Kafka console consumer Code package org a