PySpark:使用 newAPIHadoopFile 从多行记录文本文件中读取、映射和减少

2024-02-27

我正在尝试解决一个类似于这个帖子 https://stackoverflow.com/questions/31227363/creating-spark-data-structure-from-multiline-record。我的原始数据是一个文本文件,其中包含多个传感器的值(观察值)。每个观测值都带有时间戳,但传感器名称仅给出一次,并且不是在每一行中给出。但一个文件中有多个传感器。

Time    MHist::852-YF-007   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time    MHist::852-YF-008   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0

因此,我想将 Hadoop 配置为在给出传感器信息的行处拆分文件。然后从这些行中读取传感器名称(例如 852-YF-007 和 852-YF-008),并使用 MapReduce 相应地读取每个传感器的值。

我在 Python(Jupyter Notebook)中做到了这一点:

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)

sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())

sf.take(50)

输出是这样的:

[[u'::852-YF-007\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0'],
 [u'::852-YF-008\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0']]

我的问题是,如何进一步处理它以提取传感器名称并获取该传感器的值线。有点喜欢这个

852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines

然后,这些行本身将被分为时间戳和值。但我更感兴趣的是从行中分离传感器名称。


我个人会:

  • 扩展分隔符::

    sheet = sc.newAPIHadoopFile(
        path,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
        conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
    )
    
  • 删除键:

    values = sheet.values()
    
  • 过滤掉空条目

    non_empty = values.filter(lambda x:  x)
    
  • split:

    grouped_lines = non_empty.map(str.splitlines)
    
  • 单独的键和值:

    from operator import itemgetter
    
    pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
    
  • 最后分割值:

    pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
    

当然,所有这些都可以通过一个函数来完成:

import dateutil.parser

def process(pair):
    _, content = pair
    clean = [x.strip() for x in content.strip().splitlines()]
    if not clean:
        return []
    k, vs = clean[0], clean[1:]
    for v in vs:
        try:
            ds, x = v.split("\t")
            yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
        except ValueError:
            pass

sheet.flatMap(process)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark:使用 newAPIHadoopFile 从多行记录文本文件中读取、映射和减少 的相关文章

  • 缺少依赖项 hive-builtins 会导致 Oozie 构建失败,错误代码为 410

    我尝试从源代码构建 oozie 但安装失败 我想安装 oozie 并热切地等待使用它 我在这个阶段失败了 当我从 oozie 3 3 3 目录给出 cmd 时 bin mkdistro sh DskipTests 我收到这个错误 INFO
  • 让 VoiceChannel.members 和 Guild.members 返回完整列表的问题

    每当我尝试使用 VoiceChannel members 或 Guild members 时 它都不会提供适用成员的完整列表 我从文本命令的上下文中获取 VoiceChannel 和 Guild 如下所示 bot command name
  • pickle.PicklingError:无法腌制未打开读取的文件

    我在 Dataproc 上运行 PySpark 作业时收到此错误 可能是什么原因 这是错误的堆栈跟踪 File usr lib python2 7 pickle py line 331 in save self save reduce ob
  • 多输出堆叠回归器

    一次性问题 我正在尝试构建一个多输入堆叠回归器 添加到 sklearn 0 22 据我了解 我必须结合StackingRegressor and MultiOutputRegressor 经过多次尝试 这似乎是正确的顺序 import nu
  • NLTK 2.0分类器批量分类器方法

    当我运行此代码时 它会抛出一个错误 我认为这是由于 NLTK 3 0 中不存在batch classify 方法 我很好奇如何解决旧版本中的某些内容在新版本中消失的此类问题 def accuracy classifier gold resu
  • 如何从Python中的函数返回多个值? [复制]

    这个问题在这里已经有答案了 如何从Python中的函数返回多个变量 您可以用逗号分隔要返回的值 def get name you code return first name last name 逗号表示它是一个元组 因此您可以用括号将值括
  • python multiprocessing 设置生成进程等待

    是否可以生成一些进程并将生成进程设置为等待生成的进程完成 下面是我用过的一个例子 import multiprocessing import time import sys def daemon p multiprocessing curr
  • Python 3d 绘图设置固定色阶

    我正在尝试绘制两个 3d 数组 第一个数组的 z 值在范围内 0 15 0 15 第二个来自 0 001 0 001 当我绘图时 色标自动遵循数据范围 如何设置自定义比例 我不想看到 0 001 的浅色 而应该看到 0 15 的浅色 如何修
  • Python 内置的 super() 是否违反了 DRY?

    显然这是有原因的 但我没有足够的经验来认识到这一点 这是Python中给出的例子docs http docs python org 2 library functions html super class C B def method se
  • 从 Powershell 脚本安装 Python

    当以管理员身份从 PowerShell 命令行运行以下命令时 可以在 Windows 11 上成功安装 Python c temp python 3 11 4 amd64 exe quiet InstallAllUsers 0 Instal
  • Java 和 Python 可以在同一个应用程序中共存吗?

    我需要一个 Java 实例直接从 Python 实例数据存储中获取数据 我不知道这是否可能 数据存储是否透明 唯一 或者每个实例 如果它们确实可以共存 都有其单独的数据存储 总结一下 Java 应用程序如何从 Python 应用程序的数据存
  • 尽管我已在 python ctypes 中设置了信号处理程序,但并未调用它

    我尝试过使用 sigaction 和 ctypes 设置信号处理程序 我知道它可以与python中的信号模块一起使用 但我想尝试学习 当我向该进程发送 SIGTERM 时 但它没有调用我设置的处理程序 只打印 终止 为什么它不调用处理程序
  • pandas - 包含时间序列数据的堆积条形图

    我正在尝试使用时间序列数据在 pandas 中创建堆积条形图 DATE TYPE VOL 0 2010 01 01 Heavy 932 612903 1 2010 01 01 Light 370 612903 2 2010 01 01 Me
  • Django REST Framework - CurrentUserDefault 使用

    我正在尝试使用CurrentUserDefault一个序列化器的类 user serializers HiddenField default serializers CurrentUserDefault 文档说 为了使用它 请求 必须作为
  • 如何使用 Python 3 检查目录是否包含文件

    我到处寻找这个答案但找不到 我正在尝试编写一个脚本来搜索特定的子文件夹 然后检查它是否包含任何文件 如果包含 则写出该文件夹的路径 我已经弄清楚了子文件夹搜索部分 但检查文件却难倒了我 我发现了有关如何检查文件夹是否为空的多个建议 并且我尝
  • 找到一个数字所属的一组范围

    我有一个 200k 行的数字范围列表 例如开始位置 停止位置 该列表包括除了非重叠的重叠之外的所有类型的重叠 列表看起来像这样 3 5 10 30 15 25 5 15 25 35 我需要找到给定数字所属的范围 并对 100k 个数字重复该
  • 重新分配唯一值 - pandas DataFrame

    我在尝试着assign unique值在pandas df给特定的个人 For the df below Area and Place 会一起弥补unique不同的价值观jobs 这些值将分配给个人 总体目标是使用尽可能少的个人 诀窍在于这
  • 如何使用 Boto3 启动具有 IAM 角色的 EC2 实例?

    我无法弄清楚如何使用指定的 IAM 角色在 Boto3 中启动 EC2 实例 以下是迄今为止我如何成功创建实例的一些示例代码 import boto3 ec2 boto3 resource ec2 region name us west 2
  • 等待子进程使用 os.system

    我用了很多os system在 for 循环内调用创建后台进程 如何等待所有后台进程结束 os wait告诉我没有子进程 ps 我使用的是Solaris 这是我的代码 usr bin python import subprocess imp
  • 如何使用 PrimaryKeyRelatedField 更新多对多关系上的类别

    Django Rest 框架有一个主键相关字段 http www django rest framework org api guide relations primarykeyrelatedfield其中列出了我的 IDmany to m

随机推荐