我想尝试在 python 中加载 csv 数据并通过 SPark Streaming 流式传输每一行 Spark。
我对网络东西还很陌生。我不完全是如果我应该创建一个服务器 python 脚本,一旦建立连接(使用 Spark 流),它将开始发送每一行。在 Spark Streaming Documentation 中,如果我正确的话,他们会执行 nc -l 9999 ,这是一个监听端口 9999 的 netcat 服务器。所以我尝试创建一个类似的 python 脚本来解析 csv 并在端口 60000 上发送
import socket # Import socket module
import csv
port = 60000 # Reserve a port for your service.
s = socket.socket() # Create a socket object
host = socket.gethostname() # Get local machine name
s.bind((host, port)) # Bind to the port
s.listen(5) # Now wait for client connection.
print('Server listening....')
while True:
conn, addr = s.accept() # Establish connection with client.
print('Got connection from', addr)
csvfile = open('Titantic.csv', 'rb')
reader = csv.reader(csvfile, delimiter = ',')
for row in reader:
line = ','.join(row)
conn.send(line)
print(line)
csvfile.close()
print('Done sending')
conn.send('Thank you for connecting')
conn.close()
Spark 流脚本 -
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)
# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))
data_RDD.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
运行 Spark 脚本时(顺便说一句,这是在 Jupyter Notebooks 中)我收到此错误 -
IllegalArgumentException:“要求失败:未注册输出操作,因此无需执行任何内容”
我不认为我正确地执行了我的套接字脚本,但我不太确定该怎么做我基本上试图复制 nc -lk 9999 所做的事情,这样我就可以通过端口发送文本数据,然后 Spark Streaming 正在侦听它并接收数据并对其进行处理。
任何帮助将不胜感激