Spark Streaming(组件、updateStateByKey、Windows)总结

2023-11-01

1. SparkStreaming 是什么

  • 它是一个可扩展高吞吐具有容错性流式计算框架

    吞吐量:单位时间内成功传输数据的数量

  • 之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

在这里插入图片描述

  • 但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。
    在这里插入图片描述

2. 实时计算框架对比

  • Storm

    • 流式计算框架,来一条处理一条

    • 以record为单位处理数据,支持micro-batch方式(Trident)

    • 对python不友好

  • flink

    • 流式计算框架,来一条处理一条
    • 比spark streaing速度快
  • Spark

    • 批处理计算框架,间隔一段时间,获取一次数据
    • 以RDD为单位处理数据,支持micro-batch流式处理数据(Spark Streaming
    • 实时性稍差,但是能处理的数据量更大
    • pyspark
  • 对比:

    • 吞吐量:Spark Streaming优于Storm
    • 延迟:Spark Streaming差于Storm

3. Spark Streaming组件

  • Streaming Context
    • 流上下文 通过Streaming Context 可以连接数据源获取数据
    • 通过spark context 可以获取到streaming context
    • 在创建Streaming Context 需要指定一个时间间隔(micro batch)
    • Streaming Context调用了stop方法之后,就不能再次调 start(),需要重新创建一个Streaming Context
    • 一个SparkContext创建一个Streaming Context
    • streaming Context上调用Stop方法,默认会把spark context也关掉
    • 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 对DStream中数据处理的逻辑要写在Streaming Context开启之前 一旦Streaming Context调用了start方法 就不能再添加新的数据处理逻辑
  • DStream(离散流)
    • Streaming Context 连接到不同的数据源获取到的数据 抽象成DStream模型
    • 代表一个连续的数据流
    • 一系列连续的RDD组成
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume

4. Spark Streaming 编码实战(无状态)

4.1 Spark Streaming编码步骤:

  1. 创建一个StreamingContext
  2. 从StreamingContext中创建一个数据对象
  3. 对数据对象进行Transformations操作
  4. 输出结果
  5. 开始和停止

4.2 利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"

os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext("local[2]",appName="NetworkWordCount")
    #参数2:指定执行计算的时间间隔
    ssc = StreamingContext(sc, 1)
    #监听ip,端口上的上的数据
    lines = ssc.socketTextStream('localhost',9999)
    #将数据按空格进行拆分为多个单词
    words = lines.flatMap(lambda line:line.split(' '))
    #将单词转换为(单词,1)的形式
    pairs = words.map(lambda word:(word,1))
    #统计单词个数
    wordCounts = pairs.reduceByKey(lambda x,y:x+y)
    #打印结果信息,会使得前面的transformation操作执行 类似于action
    wordCounts.pprint()
    #启动StreamingContext
    ssc.start()
    #等待计算结束 这里在jupyter notebook交互式环境中才需要加
    ssc.awaitTermination()

可视化查看效果: 主机地址:4040 点击streaming,查看效果

5. Spark Streaming的状态操作

  • Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

    • 无状态:指的是每个时间片段的数据之间是没有关联的。
  • 需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作,一般超过一天都是用RDD或Spark SQL来进行离线批处理

  • 在Spark Streaming中存在两种状态操作

    • UpdateStateByKey
    • Windows操作
  • 使用有状态的transformation,需要开启Checkpoint

    • spark streaming 的容错机制
    • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

5.1 updateStateByKey

步骤

  • 首先,要定义一个state,可以是任意的数据类型
  • 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  • 对于每个新出现的key,也会执行state更新函数

5.2 案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")

#定义state更新函数
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
    
counts.pprint()

ssc.start()
ssc.awaitTermination()

5.3 Windows

  • 窗口长度L:运算的数据量
  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

在这里插入图片描述

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的
  • 窗口的长度控制考虑前几批次数据量
  • 默认为批处理的滑动间隔来确定计算结果的频率
  • 在这里插入图片描述

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

在这里插入图片描述

5.4 案例 windows

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
    country_name = line.strip()
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'

    return (output, 1)

if __name__ == "__main__":
	#定义处理的时间间隔
    batch_interval = 10 # base time unit (in seconds)
    #定义窗口长度
    window_length = 6 * batch_interval
    #定义滑动时间间隔
    frequency = 1 * batch_interval

    #获取StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
		sc = spark.sparkContext
		ssc = StreamingContext(sc, batch_interval)
    
    #需要设置检查点
    ssc.checkpoint("checkpoint")

    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    #调用reduceByKeyAndWindow,来进行窗口函数的调用
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
	#输出处理结果信息
    window_counts.pprint()

    ssc.start()
    ssc.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming(组件、updateStateByKey、Windows)总结 的相关文章

随机推荐

  • Python3.0 基础系列教程(目录)

    准备写一篇python的系列教程 目录暂定如下 如果有更好的建议 麻烦下方留言 如无意外 大约一周2 3篇 敬请期待 环境安装篇 1 下载并安装Python3 0 2 第一个python程序 3 安装集成开发环境ide 基础知识篇 基本数据
  • go:chan分为阻塞和非阻塞

    一句话总结 ch make chan int 由于没有缓冲发送和接收需要同步 ch make chan int 2 有缓冲不要求发送和接收操作同步 1 无缓冲时 发送阻塞直到数据被接收 接收阻塞直到读到数据 package main imp
  • 【华为OD机试真题】单核CPU任务调度

    单核CPU任务调度 考察的知识的点就一个优先队列 队列排序 题目描述 现在有一个CPU和一些任务需要处理 已提前获知每个任务的任务D 优先级 所需执行时间和到达时间 CPU同时只能运行一个任务 请编写一个任务调度程序 采用 可抢占优先权调度
  • Scala 的安装教程

    Scala 语言可以运行在Window Linux Unix Mac OS X等系统上 Scala是基于java之上 大量使用java的类库和变量 使用 Scala 之前必须先安装 Java gt 1 5版本 Mac OS X 和 Linu
  • git 提交新的工程

    git cmd exe 环境 windows git 提交新的工程 查看版本号 E software Git gt git version git version 2 15 1 windows 2 添加用户配置 E software Git
  • Error: EBUSY: resource busy or locked, lstat ‘D:\DumpStack.log.tmp‘

    问题 vue项目启动成功后报错 Error EBUSY resource busy or locked lstat D DumpStack log tmp 解决 1 npm cache clean force 2 npm install
  • 关于Apache/Tomcat/JBOSS/Nginx/lighttpd/Jetty等一些常见服务器的区别比较和理解

    今天是个很丰富的日子 早上一上班 第一个听到的惊爆消息就是楷子得了肠胃炎 一大早去医院挂水了 随后风胜和笑虎也没来 后来得知他们俩去去华星现代产业园参加培训 内容是关于Apache与Nginx的 于是乎 我非常感兴趣地查了一下培训用的PPT
  • C#按钮事件中有循环,用另一个按钮控制停止,暂停,继续程序执行

    首先在窗体上有 lable1 运行显示 button1 开始 button2 暂停和继续 button3 停止 窗体上还放Timer控件timer1 代码实现如下 using System using System Collections
  • vue+elementUI table表格嵌套表单,功能包含联动下拉框、动态增加行

    一 需求说明 vue elementUI table表格里嵌套表单 支持动态增加一行和删除一行 含checkbox复选框 联动下拉框 不同的活动名称 所对应的活动选项下拉框的值不同 针对不同的选项 值的表现形式也要发生对应的变化 如 日期形
  • An attempt was made to call a method that does not exist. The attempt was made from the following lo

    APPLICATION FAILED TO START Description An attempt was made to call a method that does not exist The attempt was made fr
  • springboot学习(三)——使用HttpMessageConverter进行http序列化和反序列化

    以下内容 如有问题 烦请指出 谢谢 对象的序列化 反序列化大家应该都比较熟悉 序列化就是将object转化为可以传输的二进制 反序列化就是将二进制转化为程序内部的对象 序列化 反序列化主要体现在程序I O这个过程中 包括网络I O和磁盘I
  • 链塔智库

    链塔智库整理最近一周内区块链相关政策 业内动态 人物观点 为大家梳理呈现各个领域的最新发展 目录 一 各地政策要闻 四川 探索建立基于区块链技术的数字资产交易平台 首个区块链领域国家标准在成都举行首场征求意见会 重庆出台优化工业园区规划建设
  • 云计算概念详解

    1 云计算的定义 1 云计算是一种能够通过网络以便利的按需的方式获取云计算资源 网络 服务器 存储 应用和服务 的模式 2 这些资源来自一个共享的 可配置的资源池 并能够快速获取和释放 提供资源的网络称为云 3 云模式能够提高可用性 4 云
  • IDEA下载与安装,保姆级教程

    这里写自定义目录标题 1 搜索idea 2 选择官方网站 3 官网进入下载页面 4 版本选择问题 5 Ultimate和Community对比 6 下载 7 安装 1 搜索idea 2 选择官方网站 以前idea的官网后面有官网俩字 现在没
  • Open NMT-py 玩具模型使用说明

    前排提示 本文仅适合纯萌新玩家 算是官方指南的补档 大佬请直接关闭网页 避免浪费时间 截至到2023 3 15 最新的OpenNMT py环境要求 Python gt 3 7 PyTorch gt 1 9 0 如果是老版本的OpenNMT
  • https网络编程——使用openssl库自建根证书

    参考 如何自建根证书 使用openssl库自建根证书带图详解 地址 https qingmu blog csdn net article details 108217572 spm 1001 2014 3001 5502 目录 根证书的普通
  • spring boot最新教程(三):Spring Boot整合JdbcTemplate以及事务管理

    一 JdbcTemplate的使用 Spring对数据库的操作在jdbc上面做了深层次的封装 使用spring的注入功能 可以把DataSource注册到JdbcTemplate之中 JdbcTemplate 是在JDBC API基础上提供
  • python:日期时间处理

    目录 一 time模块 二 秒转换为时分秒 三 计算前后几天的日期 一 time模块 1 time strftime format t 格式 说明 a 本地 locale 简化星期名称 A 本地完整星期名称 b 本地简化月份名称 B 本地完
  • 【STM32Cube】学习笔记(六):DHT11温湿度传感器

    文章目录 摘要 一 简介 1 DHT11数字温湿度传感器 2 DHT11性能参数 2 DHT11数据结构 2 DHT11传输时序 二 硬件电路设计 1 模块内部电路 2 与单片机相连接电路 三 软件设计 1 CubeMX配置 2 CubeI
  • Spark Streaming(组件、updateStateByKey、Windows)总结

    Spark Streaming 1 SparkStreaming 是什么 2 实时计算框架对比 3 Spark Streaming组件 4 Spark Streaming 编码实战 无状态 4 1 Spark Streaming编码步骤 4