python将csv数据发送到spark Streaming

2024-02-03

我想尝试在 python 中加载 csv 数据并通过 SPark Streaming 流式传输每一行 Spark。

我对网络东西还很陌生。我不完全是如果我应该创建一个服务器 python 脚本,一旦建立连接(使用 Spark 流),它将开始发送每一行。在 Spark Streaming Documentation 中,如果我正确的话,他们会执行 nc -l 9999 ,这是一个监听端口 9999 的 netcat 服务器。所以我尝试创建一个类似的 python 脚本来解析 csv 并在端口 60000 上发送

import socket                   # Import socket module
import csv

 port = 60000                    # Reserve a port for your service.
 s = socket.socket()             # Create a socket object
 host = socket.gethostname()     # Get local machine name
 s.bind((host, port))            # Bind to the port
 s.listen(5)                     # Now wait for client connection.

 print('Server listening....')

 while True:
     conn, addr = s.accept()     # Establish connection with client.
     print('Got connection from', addr)



     csvfile = open('Titantic.csv', 'rb')

     reader = csv.reader(csvfile, delimiter = ',')
     for row in reader:
         line = ','.join(row)

         conn.send(line)
         print(line)

     csvfile.close()

     print('Done sending')
     conn.send('Thank you for connecting')
     conn.close()

Spark 流脚本 -

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))

data_RDD.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

运行 Spark 脚本时(顺便说一句,这是在 Jupyter Notebooks 中)我收到此错误 - IllegalArgumentException:“要求失败:未注册输出操作,因此无需执行任何内容”

我不认为我正确地执行了我的套接字脚本,但我不太确定该怎么做我基本上试图复制 nc -lk 9999 所做的事情,这样我就可以通过端口发送文本数据,然后 Spark Streaming 正在侦听它并接收数据并对其进行处理。

任何帮助将不胜感激


我正在尝试做类似的事情,但我想每 10 秒传输一行。我用这个脚本解决了:

import socket
from time import sleep

host = 'localhost'
port = 12345

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
    print('\nListening for a client at',host , port)
    conn, addr = s.accept()
    print('\nConnected by', addr)
    try:
        print('\nReading file...\n')
        with open('iris_test.csv') as f:
            for line in f:
                out = line.encode('utf-8')
                print('Sending line',line)
                conn.send(out)
                sleep(10)
            print('End Of Stream.')
    except socket.error:
        print ('Error Occured.\n\nClient disconnected.\n')
conn.close()

希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python将csv数据发送到spark Streaming 的相关文章

随机推荐

  • OnClick单选按钮显示隐藏div角度js

    我的代码是
  • 为什么对象不应该是可克隆的? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 如何在黑莓10级联qml中的listview上实现按钮单击事件?

    我有一个带有按钮的列表视图 但无法触发 qml blackberry 10 中按钮的单击事件 任何人都可以帮我解决这个问题吗 ListView verticalAlignment VerticalAlignment Center horiz
  • 用于从 VS 项目中提取类及其所有依赖项的工具或插件

    我有一个非常大的项目 有很多文件 我只需要将几个类提取到一个单独的 DLL 中 但问题是这些类对其他文件有很多依赖项 尝试手动制作它 我已经花了几个小时 但仍然存在与丢失文件相关的无限错误 有什么解决方案可以自动化这个过程吗 我不知道有什么
  • 用冒号对数字进行排序

    我有一个圣经经文 时间或其他一些带有数字和冒号的字符串的列表 我希望将这些分类为 1 5 2 1 2 8 2 14 11 36 我将如何对这些数字进行排序 我假设我必须解析字符串 用冒号分隔 然后排序 我尝试过的给了我这样的东西 1 5 1
  • 使用 'with_items' 时,Ansible 显示错误:“一个或多个未定义的变量:'item' 未定义”

    我正在尝试计算 elb 内的实例数 这是我的 Ansible 剧本 name Get elb facts local action module ec2 elb facts name elb region ansible ec2 place
  • Xcode 服务器 CI Bot 测试会话已退出 (-1)

    尝试在 XcodeServer 上针对模拟器设备运行测试时出现错误 有时 一台设备的测试通过 另一台设备的测试失败 但失败的设备并不总是同一台设备 这可以在单个会话中发生 日志文件错误如下 2015 03 23 10 44 11 029 I
  • jQuery.ajax() - 如何最好地处理超时?

    我想知道 处理超时的最佳方法是什么jQuery ajax 这是我目前的解决方案 如果发生超时 页面将被重新加载 并且脚本将有另一个机会在给定的时间范围内加载数据 Problem 如果 get json php 下面的示例 确实不可用 它将成
  • 从自定义 mojo 访问 Maven 插件运行时配置的最佳方法?

    我正在编写一个自定义的 maven2 MOJO 我需要从此 MOJO 访问另一个插件的运行时配置 做这个的最好方式是什么 您可以使用以下步骤获取当前在构建中使用的插件列表 首先 您需要让 Maven 将当前项目注入到您的 mojo 中 您可
  • 如何摆脱算法的复杂性?

    锻炼 编写一个 multiple a b 函数 将数字 a 乘以数字 b 而不使用 运算符或 Math imul 方法 multiple 1 1 1 multiple 1 2 2 multiple 0 0 0 Code export def
  • 如何使用两条相交线的概念在 Netlogo 中实现避障(海龟标题与由补丁组成的墙)

    我们如何将 Netlogo 海龟的方向转换为直线方程 y mx c 以便可以将其与另一个直线方程 例如代表墙的补丁 进行比较 我需要将乌龟的航向转换为直线方程 然后将标题线方程与墙的线方程进行比较 墙的线方程有固定的 x 或固定的 y 取决
  • python中读取资源文件

    我是一名 Java 开发人员 后来转为 Python 开发人员 如何在python中读取类路径资源文件 这是我的目录结构 resources test schema xml create confd serialized objects s
  • 以编程方式发送短信,无需打开消息应用程序

    到目前为止 我正在使用以下代码通过我的应用程序向另一部手机发送短信 Intent intent new Intent Intent ACTION VIEW Uri parse sms srcNumber intent putExtra sm
  • 在 Swift 中录制音频

    有谁知道我在哪里可以找到有关如何在 Swift 应用程序中录制音频的信息 我一直在查看一些音频播放示例 但似乎无法找到有关实现音频录制的任何内容 谢谢 在 Swift 3 中 添加框架AVFoundation 在info plist中添加键
  • 使用复选框 onClick 覆盖父级 onClick 事件?

    首先 抱歉我的英语不好 我正在创建一个优惠券网站 但在选择和取消选择优惠券时遇到问题 每张优惠券都位于一个 DIV 框 中 其中有一个复选框 我在 DIV 框中创建了一个 onClick 函数 这样用户可以通过单击 DIV 框中的任何内容来
  • 设置不带货币符号的货币格式

    我在用NumberFormat getCurrencyInstance myLocale 获取我给定的区域设置的自定义货币格式 但是 这始终包含我不想要的货币符号 我只想为给定的区域设置提供正确的货币数字格式 而无需货币符号 Doing a
  • Objective C 中的惰性数据类型

    在 SML 中 可以采用以下方式对惰性编程进行建模 Have a datatype to wrap a computation datatype a susp Susp of unit gt a A function to hold the
  • chol.default(K) 中出现错误:5 阶前导小数对于 betareg 不是正定的

    我正在尝试适应一个beta regression模型使用betareg function of the betareg package对这些数据 df lt data frame category c c1 c1 c1 c1 c1 c1 c
  • 使用 C# 以编程方式读取 Openoffice Calc (.ods)?

    我想知道是否可以使用 C 以编程方式读取 OpenOffice Calc 电子表格 我可以对 Excel xls 和 xlsx 执行此操作 但无法找到读取计算电子表格的解决方案 如果有人有解决方案 请帮助我 ODF NET http www
  • python将csv数据发送到spark Streaming

    我想尝试在 python 中加载 csv 数据并通过 SPark Streaming 流式传输每一行 Spark 我对网络东西还很陌生 我不完全是如果我应该创建一个服务器 python 脚本 一旦建立连接 使用 Spark 流 它将开始发送