我正在从事集成工作spark-streaming
with twitter
using python
API。我看到的大多数示例或代码片段和博客是他们从Twitter
JSON
文件进行最终处理。但根据我的用例,我需要所有字段twitter
JSON
并将其转换为数据框。这就是我面临问题的地方sqlContext.read.json()
正在倾倒整个JSON
DStream
into _corrupt_record
+--------------------+
| _corrupt_record|
+--------------------+
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
另外,这个问题似乎可以通过使用来解决structured streaming
使用spark 2+版本。但我必须坚持spark 1.6
。以下是我的代码片段。
def process(time, rdd):
print("========= %s =========" % str(time))
try:
sqlContext = getSqlContextInstance(rdd.context)
jsonRDD = sqlContext.read.json(rdd)
jsonRDD.registerTempTable("tweets")
jsonRDD.printSchema()
except:
pass
rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))
parsed_stream.foreachRDD(process)
Python json.dumps()
在 Spark 中创建 RDD[Dict] 类型的字典 RDD。要使其成为 DF,以下行将起作用
SQLContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))
为了使它在我的情况下工作,我必须执行以下操作
def process(time, rdd):
print("========= %s =========" % str(time))
try:
sqlContext = getSqlContextInstance(rdd.context)
jsonRDD=sqlContext.jsonRDD(rdd.map(lambda x: json.dumps(x)))
jsonRDD.registerTempTable("tweets")
jsonRDD.printSchema()
except:
pass
rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))
parsed_stream.foreachRDD(process)
有关此方法的更多详细信息。请参阅link https://issues.apache.org/jira/browse/SPARK-2870
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)