pyspark 将 twitter json 流式传输到 DF

2024-05-18

我正在从事集成工作spark-streaming with twitter using pythonAPI。我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理。但根据我的用例,我需要所有字段twitter JSON并将其转换为数据框。这就是我面临问题的地方sqlContext.read.json()正在倾倒整个JSON DStream into _corrupt_record

+--------------------+
|     _corrupt_record|
+--------------------+
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|

另外,这个问题似乎可以通过使用来解决structured streaming使用spark 2+版本。但我必须坚持spark 1.6。以下是我的代码片段。

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:

        sqlContext = getSqlContextInstance(rdd.context)

        jsonRDD = sqlContext.read.json(rdd)
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass


rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))

parsed_stream.foreachRDD(process)

Python json.dumps()在 Spark 中创建 RDD[Dict] 类型的字典 RDD。要使其成为 DF,以下行将起作用

SQLContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))

为了使它在我的情况下工作,我必须执行以下操作

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:

        sqlContext = getSqlContextInstance(rdd.context)

        jsonRDD=sqlContext.jsonRDD(rdd.map(lambda x: json.dumps(x)))
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass


rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))

parsed_stream.foreachRDD(process)

有关此方法的更多详细信息。请参阅link https://issues.apache.org/jira/browse/SPARK-2870

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pyspark 将 twitter json 流式传输到 DF 的相关文章

随机推荐

  • Nginx docker容器代理传递到另一个端口

    我想在 docker 容器中运行 Nginx 它监听端口 80 并且当 url 以 word 开头时 我希望它 proxy pass 到端口 8080api 我有一些网络应用程序侦听端口 8080 这在没有 docker 的情况下对我来说一
  • C#:如何防止主窗体过早显示

    在我的 main 方法中 我像往常一样启动主窗体 Application EnableVisualStyles Application SetCompatibleTextRenderingDefault false Application
  • Bigquery - 选择时间戳作为人类可读的日期时间

    如何在 Google Bigquery 中选择时间戳 存储为秒 作为人类可读的日期时间 schema id STRING signup date TIMESTAMP 我使用编写了一个查询DATE功能 但出现错误 SELECT DATE cr
  • 使用多个值过滤 JFX TableView

    我目前正在尝试过滤我的数据TableView using FilteredList with predicate 我有2个ComboBoxes来过滤值 我的表包含Result Each Result has a Student that S
  • 如何在Java媒体框架中学习.wav持续时间?

    我正在尝试使用 java 媒体框架将 mov 文件与 wav 文件合并 因此我需要知道它们的持续时间 我怎样才能做到这一点 任何想法 将不胜感激 您可以使用以下方式了解声音文件的持续时间 即 VitalyVal 的第二种方式 import
  • 将目录压缩为单个文件的方法有哪些

    不知道怎么问 所以我会解释一下情况 我需要存储一些压缩文件 最初的想法是创建一个文件夹并存储所需数量的压缩文件 并创建一个文件来保存有关每个压缩文件的数据 但是 我不被允许创建许多文件 只能有一个 我决定创建一个压缩文件 其中包含有关进一步
  • Excel:#CALC!使用 MAP 函数计算间隔重叠时出现错误(嵌套数组)

    我正在努力解决以下公式 它适用于某些情况 但不适用于所有情况 名字input有失败的数据集 得到一个 CALC 描述 嵌套数组 错误 LET input N1 0 0 N1 0 10 N1 10 20 names INDEX input 1
  • 在.rdlc报告的底部设置一个文本框

    我在 rdlc 报告中使用 tablix 有一个文本框 其中包含文本 签名 我想将此文本框放置在报告最后一页的底部 就在页脚之前 我已经用谷歌搜索了这个解决方案 但没有找到满意的结果 我的环境是VS2010 framework 4 0 有什
  • 内嵌显示定义术语和描述

    我正在为页面上的某些元素使用定义列表 并需要它们内联显示 例如 它们normally看起来像 我需要它们看起来像 注意多个 DD 我可以让它们在 moz 中使用 float 来正常工作 但无论我尝试什么 它们都无法在 IE 中工作 我通常会
  • C 预处理器库

    我的任务是开发源分析工具C程序 并且我需要在分析本身之前预处理代码 我想知道什么是最好的图书馆 我需要一些重量轻 便于携带的东西 与其推出自己的 为什么不使用cpp这是的一部分gcc suite http gcc gnu org onlin
  • 索引后文件被锁定

    我的 网络 应用程序中有以下工作流程 从存档下载 pdf 文件 索引文件 删除文件 我的问题是 对文件进行索引后 它仍然处于锁定状态 并且删除部分会引发异常 这是我用于索引文件的代码片段 try ContentStreamUpdateReq
  • 如何使用 pybrain 黑盒优化训练神经网络来处理监督数据集?

    我玩了一下 pybrain 了解如何生成具有自定义架构的神经网络 并使用反向传播算法将它们训练为监督数据集 然而 我对优化算法以及任务 学习代理和环境的概念感到困惑 例如 我将如何实现一个神经网络 例如 1 以使用 pybrain 遗传算法
  • Json.NET - 反序列化接口属性引发错误“类型是接口或抽象类,无法实例化”

    我有一个类 其属性是接口 public class Foo public int Number get set public ISomething Thing get set 尝试反序列化Foo使用 Json NET 的类给我一条错误消息
  • 当按钮处于加载状态时,如何向按钮添加微调器图标?

    Twitter 引导按钮 http getbootstrap com javascript buttons有一个很好的Loading 状态可用 问题是它只显示一条消息 例如Loading 通过了data loading text像这样的属性
  • webpack中动态加载外部模块失败

    我正在尝试建立以下架构 一个核心 React 应用程序 它具有一些基本功能 并且能够在运行时加载其他 React 组件 这些额外的 React 组件可以按需加载 并且它们在构建核心应用程序时不可用 因此它们不能包含在核心应用程序的捆绑包中
  • 如何通过 jQuery 中的类获取特定 html 元素的innerHTML?

    我有这样的 HTML 代码 div class a html value 1 div div class a html value 2 div 我怎样才能访问html value 1 and html value 2使用jquery 分别地
  • 如果使用 SingleOrDefault() 并在数字列表中搜索不在列表中的数字,如何返回 null?

    使用查询正数列表时SingleOrDefault 当在列表中找不到数字时 如何返回 null 或像 1 这样的自定义值 而不是类型的默认值 在本例中为 0 你可以使用 var first theIntegers Cast
  • 接口中的构造方法

    接口中的构造方法不好吗 为什么人们认为有人想要实例化接口 我们想要做的是强制实现者实现构造函数 就像其他接口方法一样 接口就像一个合同 假设我有一个接口 Queue 并且我想确保实现者创建一个带有一个参数的构造函数 该构造函数创建一个单例队
  • SKNode 上的 runAction 未完成

    我使用 NSOperation 子类来获取串行执行SKAction正如这个问题中所描述的 如何在 Swift 中子类化 NSOperation 以将 SKAction 对象排队以进行串行执行 https stackoverflow com
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J