Spark Streaming入门

2023-10-27

什么是Spark Streaming?

首先,什么是流(streaming)?数据流是连续到达的无穷序列。流处理将不断流动的输入数据分成独立的单元进行处理。流处理是对流数据的低延迟处理和分析。Spark Streaming是Spark API核心的扩展,可实现实时数据的快速扩展,高吞吐量,高容错处理。Spark Streaming适用于大量数据的快速处理。实时处理用例包括:

  • 网站监控,网络监控
  • 欺诈识别
  • 网页点击
  • 广告
  • 物联网传感器

Spark Streaming支持如HDFS目录,TCP套接字,Kafka,Flume,Twitter等数据源。数据流可以用Spark 的核心API,DataFrames SQL,或机器学习的API进行处理,并且可以被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系统中去。

Spark Straming如何工作

Spark Streaming将数据流每X秒分作一个集合,称为Dstreams,它在内部是一系列RDD。您的Spark应用程序使用Spark API处理RDD,并且批量返回RDD操作的结果。

示例应用程序的体系结构

Spark Streaming示例代码执行以下操作:

  • 读取流式数据。
  • 处理流数据。
  • 将处理后的数据写入HBase表。

其他Spark示例代码执行以下操作:

  • 读取流媒体代码编写的HBase Table数据
  • 计算每日汇总的统计信息
  • 将汇总统计信息写入HBase表

示例数据集

油泵传感器数据文件放入目录中(文件是以逗号为分隔符的CSV)。Spark Streaming将监视目录并处理在该目录中创建的所有文件。(如前所述,Spark Streaming支持不同的流式数据源;为简单起见,此示例将使用CSV。)

以下是带有一些示例数据的csv文件示例:

我们使用Scala案例类来定义与传感器数据csv文件相对应的传感器模式,并使用parseSensor函数将逗号分隔值解析到传感器案例类中。

HBase表格模式

流数据的HBase表格模式如下:

  • 泵名称日期和时间戳的复合行键
  • 可以设置报警列簇,来监控数据。请注意,数据和警报列簇可能会设为在一段时间后失效。

日常统计汇总的模式如下所示:

  • 泵名称和日期的复合行键
  • 列簇统计
  • 最小值,最大值和平均值。

下面的函数将Sensor对象转换为HBase Put对象,该对象用于将数据行插入到HBase中。

写HBase表的配置

您可以使用Spark 的TableOutputFormat类写入HBase表,这与您从MapReduce写入HBase表的方式类似。下面我们使用TableOutputFormat类设置HBase的配置。

Spark Streaming示例代码

这些是Spark Streaming代码的基本步骤:

  1. 初始化Spark StreamingContext对象。
  2. 将转换和输出操作应用于DStream。
  3. 开始接收数据并使用streamingContext.start()处理它。
  4. 等待streamingContext.awaitTermination()的返回从而停止处理。

我们将通过示例应用程序代码完成这些步骤。

初始化StreamingContext

首先,我们创建一个StreamingContext,这是流式传输的主要入口点(2秒间隔时间)。

val sparkConf =  new  SparkConf ( ) . setAppName ( "HBaseStream" )
// 创建 StreamingContext, 流式函数的主要入口
val ssc =  new  StreamingContext ( sparkConf ,  Seconds ( 2 ) )复制代码

接下来,我们使用StreamingContext textFileStream(directory)方法创建一个输入流,该输入流监视Hadoop兼容的文件系统以获取新文件,并处理在该目录中创建的所有文件。

// 创建代表数据 DStream对象
val linesDStream = ssc . textFileStream ( "/user/user01/stream" )复制代码

linesDStream代表数据流,每个记录都是一行文本。内部DStream是一系列RDD,每个批处理间隔一个RDD。

将转换和输出操作应用于DStream

接下来,我们将数据行解析为Sensor对象,并使用DStream行上的map操作。

// 把lineDSream的每一行解析为Sensor对象
val sensorDStream = linesDStream . map ( Sensor . parseSensor )复制代码

map操作在linesDStream中的RDD上使用Sensor.parseSensor函数,从而生成Sensor对象(RDD)。

接下来,我们使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。我们过滤低psi传感器对象以创建警报,然后我们通过将传感器和警报数据转换为Put对象并使用PairRDDFunctions saveAsHadoopDataset方法将传感器和警报数据写入HBase ,该方法使用Hadoop将RDD输出到任何支持Hadoop的存储系统,该存储系统的配置对象(请参阅上面的HBase的Hadoop配置)。

// 对每一个RDD. 
sensorRDD . foreachRDD { rdd =>
// 低psi的传感器过滤器 
val alertRDD = rdd . filter ( sensor => sensor . psi <  5.0 )
// 把传感器数据转为对象并写入HD
rdd . map ( Sensor . convertToPut ) . saveAsHadoopDataset (jobConfig )
// 把警报转为对象并写入HD
rdd . map ( Sensor . convertToPutAlert ) . saveAsHadoopDataset (jobConfig )
}复制代码

sensorRDD对象被转换并写入HBase。

开始接收数据

要开始接收数据,我们必须在StreamingContext上显式调用start(),然后调用awaitTermination来等待计算完成。

// 开始计算
ssc . start ( )
// 等待计算完成
ssc . awaitTermination ( )复制代码

Spark R写入HBase

现在我们要读取HBase传感器表数据,计算每日摘要统计信息并将这些统计信息写入。

以下代码读取HBase表,传感器表,psi列数据,使用StatCounter计算此数据的统计数据,然后将统计数据写入传感器统计数据列。

// HBase的读取设置 
val conf = HBaseConfiguration . create ( )
conf . set ( TableInputFormat . INPUT_TABLE , HBaseSensorStream . tableName )
// 扫描数据
conf . set ( TableInputFormat . SCAN_COLUMNS ,  "data:psi" ) 
// 加载RDD (row key, row Result)元组
val hBaseRDD = sc . newAPIHadoopRDD ( conf , classOf [TableInputFormat ] ,
classOf [ org . apache . hadoop . hbase . io . ImmutableBytesWritable ] ,
classOf [ org . apache . hadoop . hbase . client . Result ] )
// 把(row key, row Result) 元组为RDD
val resultRDD = hBaseRDD.map(tuple => tuple._2)
// 转为 RDD (RowKey, ColumnValue), 移除Time
val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))
// 分组,得到统计数据
val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))
// 转码rowkey,统计信息放入并写入hbase
keyStatsRDD.map { case (k, v) => convertToPut(k, v)}.saveAsHadoopDataset(jobConfig)复制代码

下图显示newAPIHadoopRDD的输出。PairRDDFunctions saveAsHadoopDataset将Put对象保存到HBase。

软件

运行程序

您可以将代码作为独立应用程序运行,如“MapR Sandbox上的Spark入门教程”中所述。

以下是总的步骤:

  1. 按照MapR沙箱入门Spark中的介绍,用户ID user01,密码mapr。
  2. 使用maven构建应用程序。
  3. 使用scp将jar文件和数据文件复制到沙盒主目录/ user / user01。
  4. 运行应用程序:/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -pathhbase classpath --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
  5. 将流式数据文件复制到流目录中:cp sensordata.csv /user/user01/stream/
  6. 读取数据并计算一列的数据/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
  7. 计算整行的统计信息/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming入门 的相关文章

  • 在 Spark 2.3.0 的结构化流中禁用 _spark_metadata

    我的结构化流应用程序正在写入镶木地板 我想摆脱它创建的 spark metadata 文件夹 我使用了下面的属性 看起来不错 conf spark hadoop parquet enable summary metadata false 当
  • 如何在启动Spark Streaming进程时加载历史数据,并计算运行聚合

    我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据 我想使用 Spark Streaming 使用 Spark 1 4 1 通过 Kafka 动态聚合来自我的电子商务网站的传入销售事件 以获得用户总金额的当前视图销售
  • 使用 Spark 从 Azure Blob 读取数据

    我在通过 Spark Streaming 从 azure blob 读取数据时遇到问题 JavaDStream
  • Apache Zeppelin 0.6.1:运行 Spark 2.0 Twitter Stream 应用程序

    我有一个安装了 Spark 2 0 和 Zeppelin 0 6 1 的集群 自从上课以来TwitterUtils scala从 Spark 项目移至 Apache Bahir 我无法再在我的 Zeppelin 笔记本中使用 Twitter
  • Scala Spark - 处理层次结构数据表

    我有带有树结构的层次结构数据模型的数据表 例如 这是一个示例数据行 Id name parentId path depth 55 Canada null null 0 77 Ontario 55 55 1 100 Toronto 77 55
  • Spark Streaming:接收器故障后如何不重新启动接收器

    我们正在使用自定义 Spark 接收器 它从提供的 http 链接读取流数据 如果提供的http链接不正确 则接收失败 问题是spark会不断重启接收器 并且应用程序永远不会终止 问题是如果接收器失败 如何告诉 Spark 终止应用程序 这
  • Spark Streaming kafka 偏移量管理

    我一直在做 Spark Streaming 工作 通过 kafka 消费和生成数据 我使用的是directDstream 所以我必须自己管理偏移量 我们采用redis来写入和读取偏移量 现在有一个问题 当我启动我的客户端时 我的客户端需要从
  • Spark 文件流获取文件名

    我需要知道从输入目录流式传输的输入文件的文件名 下面是scala编程中的spark FileStreaming代码 object FileStreamExample def main args Array String Unit val s
  • 使用 jmxagent 将 Spark Worker/Executor 指标导出到 Prometheus

    我已按照说明进行操作here https argus sec com monitoring spark prometheus 启用指标导出到 Prometheus for Spark 为了不仅可以从作业中导出指标 还可以从主控器和工作器中导
  • Spark 序列化错误:当我将 Spark Stream 数据插入 HBase 时

    我对 Spark 如何在数据格式方面与 HBase 交互感到困惑 例如 当我在下面的代码片段中省略 ERROR 行时 它运行良好 但是添加该行后 我发现了与序列化问题相关的 任务不可序列化 的错误 如何更改代码 发生错误的原因是什么 我的代
  • 如何一起使用SparkSession和StreamingContext?

    我正在尝试从本地计算机 OSX 上的文件夹流式传输 CSV 文件 我将 SparkSession 和 StreamingContext 一起使用 如下所示 val sc SparkContext createSparkContext spa
  • Spark Scala UDP 在侦听端口上接收

    中提到的例子http spark apache org docs latest streaming programming guide html http spark apache org docs latest streaming pro
  • Spark流式批量查找数据

    我需要从 HDFS 上的文件查找 Spark 流作业中的一些数据 该数据由批处理作业每天获取一次 有没有 设计模式 为了这样的任务 如何在执行后立即重新加载内存中的数据 哈希图 每日更新 当查找数据时 如何连续服务流作业被抓取 一种可能的方
  • 如何使用scala从apache Spark中的kafka主题读取json数据

    我是新的 Spark 您能否让我知道如何使用 scala 从 apache Spark 中的 kafka 主题读取 json 数据 Thanks 最简单的方法是使用 Spark 附带的 DataFrame 抽象 val sqlContext
  • 结合 Spark Streaming + MLlib

    我尝试使用随机森林模型来预测示例流 但似乎我无法使用该模型对示例进行分类 这是pyspark中使用的代码 sc SparkContext appName App model RandomForest trainClassifier trai
  • 如何从迭代器创建 Spark RDD?

    为了说清楚 我不是从像这样的数组 列表中寻找RDD List
  • 与查找数据集连接后进行多列值查找

    我正在使用spark sql 2 4 1v如何根据列的值进行各种连接 我需要获得多个查找值map val给定值列的列 如下所示 样本数据 val data List 20 score school 2018 03 31 14 12 21 s
  • 地图功能中的条件

    Scala 有没有类似的东西 condition first expression second expression 我可以在scala中的map函数中使用它吗 我希望能够写出这样的东西 val statuses tweets map s
  • DStream 在一个批处理间隔内生成多少个 RDD?

    是否生成一批间隔的数据一个且唯一一个DStream中的RDD不管数据量有多大 是的 每个批次间隔恰好有一个 RDD 在每个批次间隔生成 与记录数量无关 包含在 RDD 中 内部可能有零条记录 如果没有 并且 RDD 创建以元素数量为条件 则
  • 纱线堆的使用量随着时间的推移而增长

    我们在 AWS EMR 上运行 Spark Streaming 作业 该作业将稳定运行 10 到 14 小时 然后崩溃 并且 stderr stdout 或 Cloudwatch 日志中没有明显错误 在此崩溃之后 任何重新启动作业的尝试都将

随机推荐

  • 使用BP神经网络和Elman Net预测航班价格(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码实现 1 概述 BP神经网络模型是目前应用最为广泛神经网络
  • Unity 自定义鼠标样式

    using System Collections using System Collections Generic using Framework using UnityEngine
  • BCryptPasswordEncoder的matches方法返回false

    用BCryptPasswordEncoder 做加密 在判断时要用该对象的matches方法 第一个参数为明文 第二个参数才是密文 public static void main String args BCryptPasswordEnco
  • 欧启标O老师STM32课程笔记(二)——蜂鸣器发声

    1 硬件原理 下图为蜂鸣器的电路图 分析这个电路 想让蜂鸣器发声 则需要有电流流过蜂鸣器 当BEEP 即PB8 为高电平时 三极管导通 电流流过蜂鸣器 蜂鸣器发声 反之 当BEEP 即PB8 为低电平时 三极管截止 蜂鸣器不发声 所以控制蜂
  • Rxjava参考文档

    https mcxiaoke gitbooks io rxdocs content operators Timer html
  • lua动态链接库(luaopen_*函数的使用)

    lua中使用c动态库 像luacjson 支持unicode luasocket 都是以动态链接库的形式在lua中使用的 至于怎么写这些动态链接库很少有教程说到 下面我就说说如何把c文件编译成动态库 首先 假设需要在lua中调用一个在c中实
  • 第二章 组合逻辑设计

    第二章 组合逻辑设计 1 卡诺图化简 1 1 必须是偶数项化简 因为卡诺图只有相邻格可以消除一个项 1 2 积之和形式的化简 消除相邻的1项 得到f 1 3 和之积形式的化简 消除相邻的0项 得到f 的积之和形式 然后再通过狄摩根定律转换成
  • python 按照行取平均值补齐缺失数据

    import pandas as pd 根据行来求平均值 def fill NAN filePath r E study python 0819 filled meter 500 csv df0 pd read csv filePath e
  • 线程池参数配置与Linux CPU

    1 线程池核心线程数配置 1 核心线程计算 计算密集型 cpu的个数 1 IO密集型 2 cpu个数 1 2 linux查看cpu核数 查看物理CPU个数 cat proc cpuinfo grep physical id sort uni
  • Educoder--Java高级特性 - 多线程基础(2)常用函数

    注意啦 这期的复制又恢复正常了 第一题 有三种原因可以导致线程不能运行 它们是 A 等待 B 阻塞 C 休眠 D 挂起及由于I O操作而阻塞 Java语言中提供了一个 线程 自动回收动态分配的内存 A 异步 B 消费者 C 守护 D 垃圾收
  • 字符串截取大全

    C 几个经常用到的字符串的截取 string str 123abc456 int i 3 1 取字符串的前i个字符 str str Substring 0 i or str str Remove i str Length i 2 去掉字符串
  • [C语言]猜数字

    本文章为c语言猜数字小游戏的教学 设计思路 目录 1 菜单的创建 2 猜数字的实现 3 完整代码 1 菜单的创建 先设计一个类似菜单的界面 供选择来实现游戏或退出 void menu printf n printf 猜数字 n printf
  • iTween基础之Look(使对象面朝指定位置)

    一 基础介绍 二 基础属性 原文地址 http blog csdn net dingkun520wy article details 50578142 一 基础介绍
  • 【Java基础】关于语言:编译型与解释型

    编译型 一 定义 使用专门的编译器 针对特定的平台 将高级语言源代码一次性的编译成可被该平台硬件执行的机器码 并包装成该平台所能识别的可执行性程序的格式 二 特点 程序执行前需要专门的一个编译过程 将源代码编译成机器语言 如 exe后缀的文
  • Python调用sort函数自定义排序函数

    当有一组比较复杂的对象需要进行排序时 我们的第一想法就是尽量利用已有的函数快速完成自己的排序需求 接触过c 的码友应该知道 在c 里若想利用已有的sort函数来完成复杂的排序 对运算符进行重载即可 好久没接触过c 里的概念 应该是这个叫法
  • java项目连接mysql卡死_我的Java连接数据库之后就卡住了 不能输出结果 到底是为什么呢?程序的功能是要完成一个地址的最大正向查...

    以下是基本的函数实现最重要的是第四个要实现街道的最大正向查找 ResultSetrs s executeQuery sql if rs next Stringpostaddress rs getString 1 Joutput setTex
  • 用Python开始机器学习(5:文本特征抽取与向量化)

    假设我们刚看完诺兰的大片 星际穿越 设想如何让机器来自动分析各位观众对电影的评价到底是 赞 positive 还是 踩 negative 呢 这类问题就属于情感分析问题 这类问题处理的第一步 就是将文本转换为特征 因此 这章我们只学习第一步
  • Hyperledger Fabric 应用实战(3)--配置文件core.yaml

    1 简介 core yaml主要是为Peer服务提供一些配置定义 当Peer节点启动时 会先从命令行获取参数 环境变量 和core yaml读取配置信息 通过docker去搭建一个Peer服务 命令行默认是不带参数的 主要通过docker
  • 考研 打赢这场信息站 上岸

    23考研已经结束了 新的考研马上开始 考研 是一个 持久战 也是一个 信息战 因为大多数同学都是自我监督 自我学习 所以经常出现想要摆烂 放弃的念头 意念坚定的同学只占一小部分 这个时候我们就要先在考研之前进行问题分析 这是一个非常非常重要
  • Spark Streaming入门

    什么是Spark Streaming 首先 什么是流 streaming 数据流是连续到达的无穷序列 流处理将不断流动的输入数据分成独立的单元进行处理 流处理是对流数据的低延迟处理和分析 Spark Streaming是Spark API核