我有两个笔记本。第一个笔记本正在使用 tweepy 从 twitter 读取推文并将其写入套接字。其他笔记本正在使用 Spark 结构化流(Python)从该套接字读取推文并将其结果写入控制台。不幸的是我没有在 jupyter 控制台上得到输出。代码在 pycharm 上运行良好。
spark = SparkSession \
.builder \
.appName("StructuredStreaming") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# This is Spark Structured Streaming Code which is reading streams from twitter and showing them on console.
tweets = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 7000) \
.load()
query = tweets \
.writeStream \
.option("truncate", "false") \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
我不确定 Jupyter Notebook 是否可以实现这一点。但是,您可以使用内存输出来实现类似的结果。这很简单在complete
模式,但可能需要一些更改append
.
For the complete
mode
In a complete
输出模式,您的查询应该大致如下所示:
query = tweets \
.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("your_query_name") \
.start()
请注意,没有query.awaitTermination()
在最后。
现在,查询your_query_name
另一个单元格中的临时表,并根据需要观察不断更新的结果:
from IPython.display import display, clear_output
while True:
clear_output(wait=True)
display(query.status)
display(spark.sql('SELECT * FROM your_query_name').show())
sleep(1)
For the append
mode
如果您想使用append
输出模式,你必须使用水印。您也将无法使用聚合,因此您的代码可能需要进行一些进一步的更改。
query = tweets \
.withWatermark("timestampColumn", "3 minutes")
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("your_query_name") \
.start()
显示代码保持不变。
您还可以展示query.lastProgress
以类似的方式获取更详细的信息。
灵感和参考
- 如何从 Zeppelin 中的控制台流接收器获取输出? https://stackoverflow.com/questions/47357418/how-to-get-the-output-from-console-streaming-sink-in-zeppelin
- 覆盖 jupyter 笔记本中以前的输出 https://stackoverflow.com/questions/38540395/overwrite-previous-output-in-jupyter-notebook
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)