SparkStreaming知识总结

2023-11-15

一、流式计算的概述

1.1 什么是流式计算

1. 数据流与静态数据的区别
	-- 数据流指的就是不断产生的数据,是源源不断,不会停止。
	-- 静态数据指的就是存储在磁盘中的固定的数据
2. 流式计算的概念
      就是对数据流进行计算,由于数据是炼苗不断的产生的,所以这个计算也是一直再计算,不会停止。
3. 流式计算的数据流有什么特点:
   - 数据是无界的(unbounded)
   - 数据是动态的
   - 计算速度是非常快的(是不断计算的,每次计算都是微小的批量数据,因此速度快,而且还是基于内存的)
   - 计算不止一次 
   - 计算不能终止
4. 离线计算的特点:
	- 数据是有界的(unbounded)
	- 数据是静态的
	- 计算速度通常较慢   
	- 计算只执行一次
	- 计算终会终止

1.2 常见的离线和流式计算框架

1. 离线计算框架
	-- mapreduce
	-- hive
	-- sparkcore
	-- sparksql
	-- flink-dataset
2. 流式计算框架
   -- storm
   -- sparkStreaming
   -- flink-datastream(blink)

1.3 SparkStreaming简介

1.3.1 简介

1. SparkStreaming也是Spark生态栈中的一个重要模块,是一个流式计算框架
2. SparkStreaming属于准实时计算框架
3. SparkStreaming是SparkCore的api的一种扩展,使用DStream(离散流)作为数据模型。 本质就是一个时间序列上的RDD。

DStream,本质上是RDD的序列。SparkStreaming的处理流程可以归纳为下图:
在这里插入图片描述

流式计算框架从延迟的角度来分类:

1. 纯实时流式计算: 毫秒级别的延迟,或者没有延迟的计算。
2. 准实时流式计算: 亚秒级别,秒级别,分钟级别的计算

流式计算框架从处理的记录条数来分类

1. 纯实时流式计算: 来一条记录,就计算一条记录。
2. 准实时流式计算: 微小的批处理,还是多条记录一起计算。

1.3.2 原理

DStream数据流模型

1. SparkStreaming 会实时的接受输入的数据
2. SparkStreaming 会按照固定长度的时间段将源源不断进来的数据划分成batch
3. SparkStreming 会每一个batch进行一次计算,计算是不停止的
4. 每次的计算结果也是一个batch,因此结果集就是多个batch的构成
5. SparkStreaming,将数据流抽象成DStream.  称之为离散流的数据模型。本质就是一个时间序列上的RDD。
6. 在整个数据流作业中,会有多个DStream。


参考下图:    rdd1 就是一个时间序列上的 DStream
				rdd2 就是一个时间序列上的 DStream
				rdd3 就是一个时间序列上的 DStream
				rdd4 就是一个时间序列上的 DStream
             

      8:00:00      hello world  hello java hello c++
      
      rdd1 = sc.textFile("....")
      rdd2 = rdd1.flatMap(_.split(" "))
      rdd3 = rdd2.map((_,1))
      rdd4 = rdd3.reduceByKey(_+_)
      
      针对于rdd1来说:
             8:00:00      hello world  hello java hello c++
             8:00:10      no zuo no die
             8:00:20      you are best
             8:00:30:     hello you are best
      针对于rdd2来说:
             8:00:00      [hello, world,hello,java,hello,c++]
             8:00:10      [no,zuo,no,die]
             8:00:20      [you,are,best]
             8:00:30:     [hello,you,are,best]
      针对于rdd3来说:
             8:00:00      (hello,1), (world,1),(hello,1),(java,1),(hello,1),(c++,1)
             8:00:10      (no,1),(zuo,1),(no,1),(die,1)
             8:00:20      (you,1),(are,1),(best,1)
             8:00:30:     (hello,1),(you,1),(are,1),(best,1)

参考下图: 一个DStream是由不同时间段上的同一个RDD构成的
在这里插入图片描述

参考下图:如果算子的返回值是DStream,则不管是哪一个时间段上的数据,只要调用了同一个算子,则返回的都同一个DStream
在这里插入图片描述

1.3.3 Storm VS SparkStreaming VS Flink

在这里插入图片描述

1.4 怎样选择流式处理框架

何时选择storm
	--需要纯实时,不能忍受1秒以上延迟的场景
	--实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,一条也不能多,一条也不能少
   --针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况)
何时选择Spark Streaming	
	--不满足上述3点要求的话,我们可以考虑使用Spark Streaming来进行实时计算
	--如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询、图计算和MLIB机器学习等业务功能,而且实时计算中,
	   可能还会牵扯到高延迟批处理、交互式查询等功能,,那么就应该首选Spark生态,用Spark Core开发离线批处理,
	   用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。
何时选择Flink
	支持高吞吐、低延迟、高性能的流处理
	支持带有事件时间的窗口(Window)操作
	支持有状态计算的Exactly-once语义
	支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
	支持具有Backpressure功能的持续流模型
	支持基于轻量级分布式快照(Snapshot)实现的容错
	一个运行时同时支持Batch on Streaming处理和Streaming处理
	Flink在JVM内部实现了自己的内存管理
	支持迭代计算
	支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

二、SparkStreaming的入门编程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- sparkstreaming的核心包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- sparkstreaming与kafka的整合包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
         <!-- redis的整合包 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.0</version>
        </dependency>
          <!-- sparksql的核心包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
    </dependencies>
</project>

2.1 wordcount案例演示

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *  sparkCore的入门api:    SparkContext
 *  sparkSql的入门:        SparkSession
 *  sparkStreaming的入门API: StreamingContext
 *
 *
 *  注意:
 *     1. 要先使用nc指令 开启qianfeng01和10086端口,否则sparkStreaming会提前报错
 *          在qianfeng01上运行指令: nc -lp 10086
 *                          -l  表示监听
 *                          -p  表示端口
 */
object Streaming_01_WordCount {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

        /**
         * 构造器:StreamingContext(conf:SparkConf, batchDuration:Duration)
         * 第一个参数:配置对象
         * 第二个参数:用于指定SparkStreaming的流式计算的batch的时间间隔,即时间片段
         *           Durations.milliseconds(milliseconds: Long)    毫秒级别
         *           Durations.seconds(seconds: Long)      秒级别
         *           Durations.minutes(minutes: Long)      分钟级别
         *           Milliseconds(milliseconds: Long)    毫秒级别
         *           Seconds(seconds: Long)   秒级别
         *           Minutes(minutes: Long)  分钟级别
         */
        val context = new StreamingContext(conf, Seconds(10))

        /**
         * 利用TCP协议的套接字,实时的监听一个端口,如果有数据,就采集,并计算。
         * socketTextStream(hostname: String,port: Int,......)
         * T:  泛型
         * hostname: 要监听的主机名
         * port:要监听的端口号
         *
         */
        val dStream: ReceiverInputDStream[String] = context.socketTextStream("qianfeng01", 10086)
        // 打印数据流中的数据,默认打印10条记录
        //dStream.print()

        // 按照空格切分成各个单词,  返回的是一个新的DStream
        val wordDStream: DStream[String] = dStream.flatMap(_.split(" "))

        //构建成元组,返回一个新的DStream
        val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

        //进行统计每个单词的数量,返回的是一个新的DStream
        val wordCountDStream: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)

        //打印,默认打印10条
        wordCountDStream.print()



        //启动程序
        context.start()

        /**
         * 因为main方法一旦结束,整个程序就结束,因此需要让main方法处于等待状态
         */
        context.awaitTermination()

    }
}

2.2 从内存中的Queue中获取数据

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable

/**
 * 从内存中的Queue中获取数据
 */
object Streaming_02_FromQueue {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * queueStream[T: ClassTag]( queue: Queue[RDD[T]],oneAtATime: Boolean = true)
         * 从一个RDD队列中获取一个或多个RDD数据,进行处理。
         * queue:RDD队列
         * oneAtATime: 是否一次处理一个RDD,默认值是true。   false表示队列中有多少,就一次性处理多少。 注意:从队列中获取数据时,队列中就没有该数据了。
         */
        val queue = new mutable.Queue[RDD[Int]]()
        val dStream: InputDStream[Int] = ssc.queueStream(queue,true)

        //直接打印,默认打印10行
        dStream.print()

        //开启数据流作业
        ssc.start()

        /**
         * 利用main线程,向队列中源源不断的添加RDD。
         */
        val rdd: RDD[Int] = ssc.sparkContext.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
        for(i<- 1 to 300){
            queue.enqueue(rdd)//将rdd填入队列中
            Thread.sleep(1000)
            // println(queue.size)  //如果将oneAtATime改为false,则可证明队列中的数据每10秒都会被清空。
        }

        // 该方法的作用就是阻塞main方法,不让其结束。因为main方法已结束,就会停止数据流作业
        ssc.awaitTermination()
    }
}

2.3 自定义接收器

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Durations, StreamingContext}

object Streaming_03_CustomReceiver {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        //从采集器中获取DStream

        val dStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
        dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(20)

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * 自定义一个采集器:
     * 1. 继承采集器抽象类Receiver,   指定泛型,指定存储级别
     * 2. 重写两个抽象方法
     */
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
        var flag = true
        /**
         * 开启采集数据的方法,此方法是框架主动调用
         */
        override def onStart(): Unit = {
            var t = new Thread(){
                override def run(){
                   while(flag){
                       val list = List("hello world hello java hello java","hello world welcome to china hello","hello world hello java c++")
                       val element: String = list(math.floor(math.random * 3).toInt)
                       //利用采集器的存储方法,存储数据
                       store(element)

                       Thread.sleep(200)  //让产生的速度降低,否则容易造成机器的cpu压力过大
                   }
                }
            }
            //开启线程
            t.start()
        }

        /**
         * 到采集的结束时间,就会调用该方法,终止采集数据
         */
        override def onStop(): Unit = {
            flag = false
        }
    }
}

2.4 读取本地文件

不能读取已经存在的文件,只能读取IO产生的新文件,只能读取一次。

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * 不能读取已经存在的文件,只能读取IO产生的新文件,只能读取一次。
 *
 * 必须将文件从同一文件系统中的另一个位置“移动”到被监视的目录中
 */
object Streaming_04_ReadLocalFile {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * textFileStream(directory: String)
         */
        val dStream: DStream[String] = ssc.textFileStream("D:/data")
        dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

使用java程序,利用IO产生一个新文件

package com.qf.sparkStreaming.day01;

import java.io.*;

public class IOTest {
    public static void main(String[] args) throws Exception {
        FileOutputStream fis = new FileOutputStream("D:/data/newFile3.txt");
        PrintWriter pw = new PrintWriter(new OutputStreamWriter(fis,"utf-8"));
        String context = "hello world hello The world puts off its mask of vastness to its lover It becomes small as one song, as one kiss of the eternal " +
                "It is the tears of the earth that keep here smiles in bloom The mighty desert is burning for the love of a blade of grass who shakes her head and laughs and flies away";
        pw.println(context);
        pw.println(context);
        /**
         * 加了一个睡眠,sparkstreaming就读不到数据了,为什么?
         * 
         *  默认情况,close才会触发缓存中的数据flush到磁盘。
         *     所以睡眠时,文件名产生了,但是文件里没有数据。
         *     所以,sparkstreaming读一次的时候没有读到数据。
         */
        Thread.sleep(11000);
        pw.println(context);
        pw.close();

    }
}

2.5 读取HDFS文件

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 *
 * 读取hdfs上的文件:  必须将文件从同一文件系统中的另一个位置“移动”到被监视的目录中
 *
 *     [root@qianfeng01 ~]# hdfs dfs -put emp.txt /input/emp1.txt
 *     [root@qianfeng01 ~]# hdfs dfs -copyFromLocal emp.txt /input/emp2.txt
 *     [root@qianfeng01 ~]# hdfs dfs -moveFromLocal emp.txt /input/emp3.txt
 */
object Streaming_05_readHDFSFile {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * textFileStream(directory: String)
         */
        val dStream: DStream[String] = ssc.textFileStream("hdfs://qianfeng01:8020/input")
        dStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

三、SparkStreaming与Kafka的整合

3.1 简介

在实际生产环境中,kafka用的比较多,用于消息缓存,而SparkStreaming是一个准实时计算框架,所以两者的结合在企业中的用的相对较多。

两者的整合有两个版本,一个是0-8(低版本),一个是0-10(新版本)

注意区别就是下面的三个SSL、Offset Commit API Dynamic Topic Subscription

在这里插入图片描述

3.2 两个版本的原理图解析

1) 0-8的原理解析图

在这里插入图片描述

2)0-10的原理解析图

在这里插入图片描述

3.3 SparkStreaming消费Kafka

package com.qf.sparkstreaming.day01

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 *  使用0-10版本的整合包里的api进行读取数据
 */
object Streaming_06_FromKafka {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))


        var params = Map[String,String](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"qianfeng01:9092,qianfeng02:9092,qianfen03:9092",
            ConsumerConfig.GROUP_ID_CONFIG->"g1",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->"org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->"org.apache.kafka.common.serialization.StringDeserializer"
        )

        /**
         * 使用0-10的整合API   KafkaUtils.createDirectStream(
         * sc:StreamingContext,    :   上下文对象
         * locationStrategy:LocationStrategy,  : 位置策略,经常使用的是LocationStrategies.PrePreferConsistent   该策略指的是spark的RDD的一个分区对应kafka的一个分区
         * consumerStrategy: ConsumerStrategy[K, V],  : 消费者策略,用于订阅主题的等
         * .....
         * )
         */
        val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](List("pet"), params))

        dStream.map(_.value()).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

3.4 维护offset到zookeeper上

package com.qf.sparkstreaming.day02

import com.qf.sparkstreaming.day01.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 *   1. sparkStreaming作为消费者,取消费Kafka中的某些主题的分区中的数据。
 *   2. 对于消费后的偏移量,我们维护到zookeeper上。
 *
 *   注意:消费者第一次消费数据时,zk上应该没有offset的维护
 *        第一次消费后,要讲读取到的偏移量维护到zk上,方便下次消费。
 */
object Streaming_01_offsetToZookeeper {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        //获取消费者连接kafka的各种参数
        val params = MyKafkaUtils.getParamToMap()
        //sparkstreaming要消费的主题
        val topics = Array("pet")
        /**
         *  从zk上获取offset
         *
         *  如何在zk上维护offset?
         *      维护的znode路径如 :/kafka/offsets/groupid/topic/partition
         *      offset保存到partition的里面。
         *
         *  因此,在读取zk上维护的offset时,要指定相应的参数,比如 groupid,
         *
         */
        val offsets:Map[TopicPartition,Long] = MyZkUtils.getOffset(params.getOrElse("group.id","g1"),topics)
        print(offsets.size)
        var dStream:InputDStream[ConsumerRecord[String, String]] = null
        if(offsets.size>0) {
            //不是第一次读取Kafka上的数据,而是其他时候,比如宕机并恢复后,应该从zk上保存的offset开始读取
            dStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params,offsets)
            )
        }else{
            //第一次读取Kafka上的数据
            dStream= KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params)
            )
        }

        /**
         *  消费数据的打印, 消费完数据后,要讲offset维护到zk上
         *  RDD的泛型:RDD[ConsumerRecord[String,String]]
         *      说明RDD里存储了一堆消息记录
         *
         *
         *
         *  OffsetRange :  该对象记录的是 消费的某一个分区的偏移量的范围,有以下属性
         *        val topic: String
         *        val partition: Int,
         *        val fromOffset: Long,   刚消费时的该分区的偏移量
         *        val untilOffset: Long   消费完后的偏移量+1的数字,表示下一次消费从untilOffset值开始消费
         */

        dStream.foreachRDD(rdd=>{
            rdd.foreach(println)
            //将当前数据流中的最后一条记录的offset维护到zk上
            val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            //将每个分区的untilOffset保存到zk上
            MyZkUtils.updateOffset(params.getOrElse("group.id","g1"),ranges)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

MyZkUtils的编写

package com.qf.sparkstreaming.day02

import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import java.util

object MyZkUtils {
    /**
     * 将消费后的偏移量保存到zk上
     * @param groupid
     * @param ranges
     *
     * OffsetRange对象上的属性:topic,partition,fromOffset,untilOffset
     */
    def updateOffset(groupid: String, ranges: Array[OffsetRange]): Unit = {
        for(range <- ranges){
            val untilOffset: Long = range.untilOffset
            val partition =range.partition
            val topic = range.topic
            //保存到相应的路径里: /kafka/offsets/groudid/topic/partition
            checkPath(s"$basePath/$groupid/$topic/$partition")
            zkClient.setData().forPath(s"$basePath/$groupid/$topic/$partition",untilOffset.toString.getBytes())
        }
    }


    val zkClient = {
        //获取连接zk的客户端api
        val zkClient = CuratorFrameworkFactory
          .builder()
          .connectString("qianfeng01:2181,qianfeng02:2181,qianfeng03:2181")
          .retryPolicy(new ExponentialBackoffRetry(5000,6))
          .build()
        zkClient.start()
        zkClient
    }
    val basePath = "/kafka/offsets"

    /**
     * 检查路径是否存在,不存在就创建
     * @param path
     * @return
     */
    def checkPath(path: String) = {
        if(zkClient.checkExists().forPath(path)==null){
            //创建znode,递归创建
            zkClient.create().creatingParentsIfNeeded().forPath(path)
        }
    }

    /**
     * 获取zk上某一个消费者组下的某些主题的分区里的offset
     *
     * @param groupid     消费者组
     * @param topics      消费的主题集合
     * @return
     *
     *
     *  维护的znode路径如 :/kafka/offsets/groupid/topic/partition
     */
    def getOffset(groupid: String, topics: Array[String]): Map[TopicPartition, Long] = {
        //创建一个map对象,用于存储每个分区和相应的偏移量
        var offsets = Map[TopicPartition, Long]()

        //遍历每一个主题
        for(topic <- topics){
            val path = s"$basePath/$groupid/$topic"
            checkPath(path) //执行完这一步,路径一定存在
            //获取主题znode下的所有分区znode
            val partitionsZnodes: util.List[String] = zkClient.getChildren.forPath(path)
            //遍历每一个分区,  注意:如果主题下没有分区,表示还没有维护offset到zk上,循环进不去
            import  scala.collection.JavaConversions._
            for(partitionZnode <- partitionsZnodes){
                //获取partition里存储的offset
                val bytes: Array[Byte] = zkClient.getData.forPath(s"$path/$partitionZnode")
                val offset = new String(bytes).toLong
                offsets += (new TopicPartition(topic,partitionZnode.toInt)->offset)
            }
        }

        offsets
    }


    def main(args: Array[String]): Unit = {
        zkClient.create().forPath("/names")
    }
}

3.5 维护offset到redis上

package com.qf.sparkstreaming.day02

import com.qf.sparkstreaming.day01.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * 将sparkStreaming作为消费者消费消息的offset保存到redis中
 *     思考:使用redis的哪一种类型来保存offset
 *           使用hash类型来保存这些数据:
 *              key            field         字段值
 *              groupid     topic#partition   offset
 */
object Streaming_02_offsetToRedis {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        // 消费者的序列化:需要Kryo序列化机制,但是配置文件里可能不生效,那么就写入程序中,如下即可
        conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val topics = Array("pet")
        val params = MyKafkaUtils.getParamToMap()
        //从redis中获取偏移量
        val offsets:Map[TopicPartition,Long] = MyRedisUtils.getOffset(params.getOrElse("group.id","g1"),topics)

        var dStream:InputDStream[ConsumerRecord[String, String]] = null
        if(offsets.size>0) {
            //不是第一次读取Kafka上的数据,而是其他时候,比如宕机并恢复后,应该从zk上保存的offset开始读取
            dStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params,offsets)
            )
        }else{
            //第一次读取Kafka上的数据
            dStream= KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params)
            )
        }

        dStream.foreachRDD(rdd => {
            rdd.foreach(println)
            //从rdd上获取每个分区的消费的偏移量的范围
            val arr: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            //更新到redis上  :  topic,parition,fromOffset,untilOffset
            MyRedisUtils.updateOffset(params.getOrElse("group.id","g1"),arr)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

MyRedisUtils的编写

package com.qf.sparkstreaming.day02

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{HostAndPort, JedisCluster}

import java.util

object MyRedisUtils {
    //将偏移量保存到redis对应的hash中
    def updateOffset(groupid: String, arr: Array[OffsetRange]): Unit = {
        for(offsetRange <- arr){
            //获取相应的属性
            val untiloffset = offsetRange.untilOffset
            val topic: String = offsetRange.topic
            val partition: Int = offsetRange.partition

            redisCluster.hset(groupid,topic+"#"+partition,untiloffset.toString)
        }
    }

    //获取redis集群客户端api
    val redisCluster = {
        val sets = new util.HashSet[HostAndPort]()
        //添加集群的master节点
        sets.add(new HostAndPort("qianfeng01",7001))
        sets.add(new HostAndPort("qianfeng01",7002))
        sets.add(new HostAndPort("qianfeng01",7003))
        val redisCluster = new JedisCluster(sets)
        redisCluster
    }

    /**
     * 读取偏移
     * @param groupid
     * @param topics
     * @return
     *              key            field         字段值
     *              groupid     topic#partition   offset
     */
    def getOffset(groupid: String, topics: Array[String]): Map[TopicPartition, Long] = {
        var offsets = Map[TopicPartition,Long]()
        //使用redis客户端读取hash类型
        val stringToString: util.Map[String, String] = redisCluster.hgetAll(groupid)
        //获取所有的字段
        val fields: util.Set[String] = stringToString.keySet()
        import  scala.collection.JavaConversions._
        for(field<-fields){
            //通过字段取偏移量
            val offset: Long = stringToString.get(field).toLong
            //解析字段的格式:  topic#partition
            val arr: Array[String] = field.split("#")
            offsets += (new TopicPartition(arr(0),arr(1).toInt)->offset)
        }
        offsets
    }



    def main(args: Array[String]): Unit = {
        val str: String = redisCluster.get("k1001")
        println(str)
    }
}

3.6 总结

0-8与0-10的总结

1)简化的并行性

两个都不需要创建多个输入Kafka流,手动来合并。 在使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

2)效率

0-8方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。
0-10这种方式消除了0-8的问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

3)Exactly-once

0-8使用Kafka的高级API来在Zookeeper中存储消费的偏移量,也就是利用了低版本的Kafka消费数据的保存方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。   也就是说0-8 没有保证Exactly-once

0-10 我们不再Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地接收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保证结果和偏移量的原子事务。

四、SparkStreaming的常用算子

4.1 算子的分类

# SparkStreaming的算子分为两类,
	-- 一类是Transformations算子,转换算子又可从状态上来分类:
		 --有状态算子:可以累加前n次的计算结果的算子
		 --无状态算子:普通的算子都是无状态算子
	--一类是Output Operations算子

# 注意:
	SparkStreaming的程序中,如果没有Output Operations算子,会报错

1) 常见的转换算子

转换算子:可以将上游的DStream转换成另一种DStream。 DStream的许多转换算子都是和RDD的转换算子一样。

在这里插入图片描述

常见算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Streaming_03_tansfromitions {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        println("---------map: 对上游的DStream中每一个元素做映射处理,返回一个新的DStream---------")
        //dStream.map((_,1)).print()  //  将每一个元素 与1 映射成一个元组。

        println("---------flatMap: 将元素展开,并压平---------")
        //dStream.flatMap(_.split(" ")).print()

        println("---------filter: 对DStream中的数据进行筛选过滤,返回true的元素,构成一个新的DStream---------")
        //dStream.filter(elem => {elem.toInt> 10}).print()
        println("---------repartition: 对DStream进行重新分区,返回新的DStream---------")
        //dStream.repartition(4).print()
        println("---------count: 对DStream里的元素进行计算统计,返回一个新的只有一个元素的DStream---------")
        //dStream.count().print()
        println("---------union: 两个DStream进行联合---------")
        //dStream.union(dStream).print()
        println("---------reduce: DStream里的元素进行规约计算---------")
//        dStream.reduce((m,n)=>{
//            var sum = m.toInt
//            sum+=n.toInt
//            sum.toString
//        }).print()
        println("---------countByValue: 通过value进行分组,然后统计每种value的个数---------")
//        dStream.countByValue().print()
        println("---------reduceByKey: 通过key进行分组,然后对每一组中的value进行规约计算---------")
        //dStream.map((_,1)).reduceByKey((m,n)=>{ m+n}).print()
        println("---------join: 两个DStream进行关联,返回的是一个两个元素的元组,第二个元素还是一个元组---------")
//        val d2: DStream[(String, Int)] = dStream.map((_, 1))
//        d2.join(d2).print()
        println("---------cogroup: 两个DStream进行重组: 返回的是一个两个元素的元组,第二个元素还是一个元组,只不过第二个元组的每个元素是一个序列---------")
        val d2: DStream[(String, Int)] = dStream.map((_, 1))
        d2.cogroup(d2).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

transform算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**
 * transform算子的练习
 */
object Streaming_04_transform_transform {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * transform的作用:就是将DStream中的数据转成RDD,进行操作,然后返回RDD,  sparkStream会再次将返回的RDD
         * 进行包装成DStream。
         *
         * 为什么有这样的一个操作?
         * 原因是:DStream中的算子较RDD少很多。有transform算子后,就可以直接操作RDD了。
         */
        println("@@@@@@@@")   // 该代码是在driver端执行的
//        dStream.transform(rdd=>{
//            println("########")  // 该代码是在driver端执行, 该处通常用于进行每次的微小的批次计算的初始化操作。
//            rdd.map(line=>{
//                println("-----------") // 该代码是在executor端执行
//                line
//            })
//
//        }).print()

        val d2: DStream[Int] = dStream.transform(rdd => {
            val result: Int =
                rdd.aggregate(10)(
                    (x, y) => {
                        math.max(x, y.toInt)
                    }, //分区内取较大值,与默认值10做比较
                    (m, n) => m + n) //分区间求和

            ssc.sparkContext.makeRDD(List(result) //包装成RDD

            )
        })
        d2.print()


        ssc.start()
        ssc.awaitTermination()
    }
}

updateStateByKey算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 *  需求:
 *       准实时计算: 10秒一计算
 *            客户的需求时,求30秒之内的单词统计|热搜|排名榜
 */
object Streaming_05_transform_updateStateByKey {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))
        ssc.checkpoint("data")

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * updateStateByKey:   是一个有状态维护的转换算子。
         *
         * 参数: updateFunc: (Seq[V], Option[S]) => Option[S]
         *       函数的第一个参数:表示当前批次的同一个Key的所有value的集合    数据如右:hello,1, hello,1 hello,1  那么seq[v]就是List(1,1,1)
         *       函数的第二个参数:就是状态数据,也就是同一个key的之前的所有批次的计算结果。
         *
         *       假如这是第一个批次,那么第二个参数是没有数据的,因此sparkStreaming要求第二个参数类型是option,因为可能拿不到值。
         */
        dStream.map((_,1)).updateStateByKey((seq,ops:Option[Int])=>{
            //定义一个变量sum,用于统计当前匹配的同一个key的所有的1的和
            var sum = 0
            for(num <- seq){
                sum += num
            }
            //再将当前批次的计算结果,累加到之前的状态数据中
            val result = ops.getOrElse(0)+sum
            Option(result)
        }).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

2)常见的输出算子

在sparkStreaming中,除了转换算子外,就是输出算子(不叫行动算子)。 输出算子的作用,就是将数据流中的数据保存到外部系统,比如数据库或者是文件系统,实际上底层,就是调用了rdd的行动算子。

在这里插入图片描述

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * sparkStreaming的输出算子:
 *    print()
 *    saveXXXXX()
 *    foreachRDD()
 */
object Streaming_06_outputop_foreachRDD {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))
        ssc.checkpoint("data")

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * 1. foreachRDD:是一个输出操作算子
         * 2. 作用:将DStream中的数据当成RDD进行遍历,此时的RDD的最终计算是要使用RDD的行动算子的。不能再返回RDD
         *    rdd的最终计算其实就是打印,保存等操作
         *
         *    foreachRDD算子中,没有时间戳。
         *
         *   注意:默认保存的时候,会将上一个批次保存的数据覆盖
         */
        dStream.foreachRDD(rdd=>{
            println("-----")
            rdd.saveAsTextFile("output1")
            }
           )

        ssc.start()
        ssc.awaitTermination()
    }
}

4.2 窗口算子

4.2.1 window的介绍

1. window操作就是窗口函数,作用就是计算窗口里所包含的所有数据。

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,窗口对应的大小是3个RDD,3个RDD会被聚合起来进行处理,然后过了2个duration,窗口会继续滑动,又会对最近的3个RDD,也就是窗口所包含的数据进行计算。所以每个滑动窗口操作,都应该指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

在这里插入图片描述

在这里插入图片描述

4.2.2 窗口算子

在这里插入图片描述

4.2.3 案例演示

1)

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 使用window算子,统计最近30秒内的单词频率
 */
object Streaming_07_WindowOperationDemo {
    def main(args: Array[String]): Unit = {
        val  conf = new SparkConf().setAppName("Streaming_07_WindowOperationDemo").setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(10))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        val mapDStream: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1))

        /**
         * window(
         *  windowDuration:Duration,    //第一个参数:用于指定窗口的大小,即长度,必须是micro-batch处理时的时间整数倍
         *  slideDuration:Duration      //第二个参数:用于指定窗口滑动的周期,必须是micro-batch处理时的时间整数倍
         *  )
         *
         *  数据的情况:
         *        1. 刚启动时,窗口里一定没有数据,如果近30秒内没有数据流,则窗口里也没有数据
         *        2. 启动程序不久,窗口的数据的变化应该是由少变多
         *        3. 在处理数据过程中,窗口的数据可能会由多变少,甚至没有
         */
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(10))
        val resultDStream: DStream[(String, Int)] = windowDStream.reduceByKey(_ + _)
        resultDStream.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

2)

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 生产环境中的需求:统计最近1小时的所有产品的销量的排名top3
 *
 * 模拟:30秒内的数据
 *
 *  8:00      1001 毛衣 10
 *  8:00      1002 牙刷 1
 *  8:00      1003 手机 1
 *  8:00      1004 毛巾 8
 *  8:10      1001 毛衣 8
 *  8:10      1002 牙刷 10
 *  8:10      1003 手机 15
 *  8:10      1004 毛巾 1
 *  8:20      1001 毛衣 3
 *  8:20      1002 牙刷 2
 *  8:20      1003 手机 4
 *  8:20      1004 毛巾 10
 *  8:30      1001 毛衣 3
 *  8:30      1002 牙刷 22
 *  8:30      1003 手机 4
 *  8:30      1004 毛巾 10
 *  ........
 *
 *  计算时间片段:5秒一算
 *  窗口大小:30秒
 *  窗口滑动周期:5秒一滑动
 *
 *  分组统计销售量,降序,top3
 *
 *
 */
object Streaming_08_SalesRank {
    def main(args: Array[String]): Unit = {
        val  conf = new SparkConf()
          .setAppName("_01FromKafkaCustomOffsetToRedis")
          .setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(5))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        //得到的是每一个产品id和sum
        val mapDStream: DStream[(String, Int)] = dStream.map(line => {
            val arr: Array[String] = line.split(" ")
            (arr(0), arr(2).toInt)
        })
        //规定窗口大小和滑动周期
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(5))
        // foreachRDD 是输出算子
        windowDStream.reduceByKey(_ + _).foreachRDD(

            rdd => {
                println("-------------------------")
                rdd.sortBy(t => t._2, false).take(3).foreach(println)
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}
/*机 4
 *  8:20      1004 毛巾 10
 *  8:30      1001 毛衣 3
 *  8:30      1002 牙刷 22
 *  8:30      1003 手机 4
 *  8:30      1004 毛巾 10
 *  ........
 *
 *  计算时间片段:5秒一算
 *  窗口大小:30秒
 *  窗口滑动周期:5秒一滑动
 *
 *  分组统计销售量,降序,top3
 *
 *
 */
object Streaming_08_SalesRank {
    def main(args: Array[String]): Unit = {
        val  conf = new SparkConf()
          .setAppName("_01FromKafkaCustomOffsetToRedis")
          .setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(5))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        //得到的是每一个产品id和sum
        val mapDStream: DStream[(String, Int)] = dStream.map(line => {
            val arr: Array[String] = line.split(" ")
            (arr(0), arr(2).toInt)
        })
        //规定窗口大小和滑动周期
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(5))
        // foreachRDD 是输出算子
        windowDStream.reduceByKey(_ + _).foreachRDD(

            rdd => {
                println("-------------------------")
                rdd.sortBy(t => t._2, false).take(3).foreach(println)
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming知识总结 的相关文章

  • 如何用hadoop实现自连接/叉积?

    对成对的项目进行评估是常见的任务 示例 重复数据删除 协同过滤 相似项目等 这基本上是具有相同数据源的自连接或叉积 要进行自连接 您可以遵循 减少端连接 模式 映射器将连接 外键作为键发出 将记录作为值发出 因此 假设我们想要对以下数据的
  • Hadoop MapReduce:可以在一个 hadoop 作业类中定义两个映射器和缩减器吗?

    我有两个独立的 java 类 用于执行两个不同的 MapReduce 作业 我可以独立运行它们 对于这两个作业 它们所操作的输入文件是相同的 所以我的问题是是否可以在一个java类中定义两个映射器和两个缩减器 例如 mapper1 clas
  • PHP MongoDB映射减少数据库断言失败

    我第一次使用 PHP MongoDB 进行 Map Reduce 运行 MapReduce 命令时遇到错误 My code map function emit this topic id re date this date posted r
  • JA017:无法查找已启动的 hadoop 作业 ID

    当我在Hue的Oozie编辑器中提交mapreduce作业时 如何解决这个问题 JA017 无法查找与操作 0000009 150711083342968 oozie root W mapreduce f660 关联的已启动 hadoop
  • 如何具体确定MRJob中每个map步骤的输入?

    我正在从事一项地图缩减工作 包含多个步骤 使用 mrjob 每个步骤都会接收上一步的输出 问题是我不想这样 我想要的是提取一些信息并在第二步中针对所有输入等使用它 可以使用 mrjob 来做到这一点吗 Note 因为我不想使用emr 这个问
  • array_reduce() 不能用作 PHP 的关联数组“reducer”?

    我有一个关联数组 assoc 并且需要将其简化为字符串 在这种情况下 OUT
  • Log4j RollingFileAppender 未将映射器和减速器日志添加到文件中

    我们希望将应用程序日志打印到本地节点上的文件中 我们使用 Log4j 的 RollingFileAppender Our log4j properties文件如下 ODS LOG DIR var log appLogs ODS LOG IN
  • 如何读取 RCFile

    我正在尝试将一个小的 RCFile 约 200 行数据 读入 HashMap 中以进行 Map Side 连接 但是在将文件中的数据变为可用状态时遇到了很多麻烦 这是我到目前为止所拥有的 其中大部分来自这个例子 http sumit1001
  • Hadoop YARN 作业陷入映射 0% 并减少 0%

    我正在尝试运行一个非常简单的作业来测试我的 hadoop 设置 所以我尝试使用 Word Count Example 它陷入了 0 所以我尝试了一些其他简单的作业 并且每个作业都陷入了困境 52191 0003 14 07 14 23 55
  • java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

    我正在尝试实现reduce side join 并使用mapfile reader来查找分布式缓存 但在stderr中检查时它没有查找值 它显示以下错误 lookupfile文件已经存在于hdfs中 并且似乎已正确加载进入缓存 如标准输出中
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • 从 Eclipse 在 AWS-EMR 上运行 MapReduce 作业

    我在 Eclipse 中有 WordCount MapReduce 示例 我将其导出到 Jar 然后将其复制到 S3 然后我在 AWS EMR 上运行它 成功地 然后 我读到了这篇文章 http docs aws amazon com El
  • 如何使用 Amazon 的 EMR 在 CLI 中使用自定义 jar 指定 mapred 配置和 java 选项?

    我想知道如何指定mapreduce配置 例如mapred task timeout mapred min split size等等 当使用自定义 jar 运行流作业时 当我们使用 ruby 或 python 等外部脚本语言运行时 我们可以使
  • Sqoop - 绑定到 YARN 队列

    因此 使用 MapReduce v2 您可以使用绑定到某些 YARN 队列来管理资源和优先级 基本上通过使用 hadoop jar xyz jar D mapreduce job queuename QUEUE1 input output
  • 在 RavenDB 中创建更多类似的内容

    我的域中有这些文档 public class Article public string Id get set some other properties public IList
  • MongoDB/PyMongo:如何在 Map 函数中使用点表示法?

    我正在尝试计算每个邮政编码中找到的记录数 在我的 MongoDB 中 嵌入了邮政编码 使用点表示法 它位于 a res z a 代表地址 res 代表住宅 z 代表邮政编码 例如 这工作得很好 db NY count a res z 141
  • 使用mongodb聚合框架按数组长度分组

    我有一个看起来像这样的集合 id id0 name saved things id id1 name saved things id id2 name saved things etc 我想使用 mongodb 的聚合框架来得出一个直方图结
  • 为什么在我的例子中 For 循环比 Map、Reduce 和 List 理解更快

    我编写了一个简单的脚本来测试速度 这就是我发现的结果 实际上 for 循环在我的例子中是最快的 这真的让我感到惊讶 请查看下面 正在计算平方和 这是因为它在内存中保存列表还是有意为之 谁能解释一下这一点 from functools imp
  • Hadoop - 直接从 Mapper 写入 HBase

    我有一个 hadoop 作业 其输出应写入 HBase 我并不真正需要减速器 我想要插入的行类型是在映射器中确定的 如何使用 TableOutputFormat 来实现此目的 从所有示例中 我看到的假设是 reducer 是创建 Put 的
  • MapReduce 中的分区到底是如何工作的?

    我认为我总体上对 MapReduce 编程模型有一定的了解 但即使在阅读了原始论文和其他一些来源之后 我仍然不清楚许多细节 特别是关于中间结果的分区 我将快速总结到目前为止我对 MapReduce 的理解 我们有一个可能非常大的输入数据集

随机推荐

  • WSL——NextCloud 在 Windows 端的网络访问配置

    NextCloud 在 Windows 端的网络访问配置 1 WSL2 的 IP 配置问题 在 Windows 每次重启后 WSL2 的 IP 将会发生变化 即 WSL2 的 IP 并非静态地址 为此 通过下述代码可以手动为 WSL2 增加
  • golang:ent实体框架

    好烦 我就想自己使用自定义除了id字段以外的字段作为主键名 参见评论区大佬 field String id StorageKey stu id Unique Immutable
  • 归并排序MergeSort算法--分治

    归并排序 MergeSort 是一种有效的排序算法 该算法是采用分治法 Divide and Conquer 的一个非常典型的应用 将已有序的子序列合并 得到完全有序的序列 即先使每个子序列有序 再使子序列段间有序 若将两个有序表合并成一个
  • Django之路由层

    目录 django请求生命周期流程图 路由匹配 分组命名匹配 无名分组 有名分组 传递额外的参数给视图函数 命名URL 和 URL反向解析 命名URL URL反向解析 前端 URL反向解析 后端 无名分组反向解析 有名分组反向解析 路由分发
  • Group by + Limit 的效率优化

    背景 最近接手一个项目 清洗历史表中的数据 在原有表中添加一个新的字段 并根据user id进行分组 查询到证件号 证件类型后 换取唯一编码 将唯一编码存入历史数据中 以达到未来替换user id的效果 清洗数据的大体思路 将数据库中带清洗
  • 【华为OD机试真题 Java】单行道汽车通行时间

    前言 本专栏将持续更新华为OD机试题目 并进行详细的分析与解答 包含完整的代码实现 希望可以帮助到正在努力的你 关于OD机试流程 面经 面试指导等 如有任何疑问 欢迎联系我 wechat steven moda email nansun09
  • Calendar,Date,Timestamp的使用及其转换

    下面列出了Date Timestamp String相互转换 以及通过Calendar创建时间的例子 import java sql Timestamp import java text ParseException import java
  • VTK学习之三维图像切片交互提取(回调函数、观察者-命令模式)

    参考博客 VTK Learning 三维图像切片 二 鼠标交互 回调函数 观察者 命令模式 江南又旧雨的博客 CSDN博客 根据鼠标交互事件 同时实现切片的实时提取功能 上代码 include
  • GOplot

    GOplot 为这个包写笔记 主要是复习一下markdown写作而已 还是建议大家看原作者的英文文档 安装并加载必须的packages 如果你还没有安装 就运行下面的代码安装 install packages GOplot library
  • 两数相加-2

    题目描述 给出两个 非空 的链表用来表示两个非负的整数 其中 它们各自的位数是按照 逆序 的方式存储的 并且它们的每个节点只能存储 一位 数字 如果 我们将这两个数相加起来 则会返回一个新的链表来表示它们的和 您可以假设除了数字 0 之外
  • [SDN]Mininet中的miniedit问题汇总

    作者 清水寺丞 简介 正在学习unity 数据库 计算机通信网络和python 喜欢部署各种奇奇怪怪的小项目 喜欢就点个关注一起学习吧 目录 前言 怎么打开MiniEdit 保存mn与py文件出现问题 为什么我run了拓扑之后终端没有出现m
  • 掌握到胃-奈氏图与伯德图的绘制

    自控笔记 5 4绘制频率特性曲线 一 开环奈奎斯特曲线的绘制 先上步骤 确定起点G j0 和终点G j 中间段由s平面零极点矢量随s j 变化规律绘制 必要时可求出G j 与实轴 虚轴的交点 再看细节 对于一个系统的传递函数 可以将其分解成
  • touch、mkdir、rmdir、cp、mv、rm命令的常用参数的使用

    touch 可创建多个新文件或更新文件的修改日期 touch m t 时间 修改文件的时间 并可以指定修改时间 touch a 将文件的存取时间改为当前时间 mkdir 用于创建一个目录 mkdir p 用于创建目录时 如果父目录不存在 则
  • 基于MATLAB用图解法解方程(附图像与代码)

    目录 一 一元方程图解法 例题1 二 二元方程图解法 例题2 三 多项式型方程 例题 3 一 一元方程图解法 例题1 用图解法求 解 MATLAB代码 clc clear ezplot exp 3 t sin 4 t 2 4 exp 0 5
  • C# 笔记4——如何实现单击放大全屏和退出全屏

    C 笔记4 如何实现单击放大全屏和退出全屏 由于工作需求 需要实现单击放大和退出全屏功能 想了一下 即单击放大时候把播放视频的picturebox的大小设置和屏幕宽高相同 位置设置为屏幕左上角 0 0 即可 单击退出全屏时候把控件大小和位置
  • 扩散模型与生成模型详解

    扩散模型与其他生成模型 什么是扩散模型 扩散模型的简介 生成建模是理解自然数据分布的开创性任务之一 VAE GAN和Flow系列模型因其实用性能而在过去几年中占据了该领域的主导地位 尽管取得了商业上的成功 但它们的理论和设计缺陷 棘手的似然
  • 中国传统节日端午节网页HTML代码 学生网页课程设计期末作业下载 春节大学生网页设计制作成品下载 DW春节节日网页作业代码下载

    HTML5期末大作业 节日网站设计 中国传统节日端午节网页HTML代码 7页 HTML CSS JavaScript 学生DW网页设计作业成品 web课程设计网页规划与设计 计算机毕设网页设计源码 常见网页设计作业题材有 个人 美食 公司
  • visio技巧(曲线、连接点、自制模具)

    一 画曲线 1 1 铅笔 任意多边形 弧形都可以画曲线 但曲度不好更改 1 2 鼠标选中连接线 在画布上画一个直角线 选中该线 点击右键 选曲线连接线 随意拉动该线上的连接点可以调整成任意曲度 二 增加 移动 删除图形上的连接点 1 1 增
  • 【Python 基础篇】Python代码 之 程序结构

    目录 前言 一 顺序结构 1 1 分支结构 1 2 双向分支 1 3 多路分支 1 4 if语句补充 二 顺序结构 三 循环结构 while while else for in for else 四 流程控制语句 break continu
  • SparkStreaming知识总结

    一 流式计算的概述 1 1 什么是流式计算 1 数据流与静态数据的区别 数据流指的就是不断产生的数据 是源源不断 不会停止 静态数据指的就是存储在磁盘中的固定的数据 2 流式计算的概念 就是对数据流进行计算 由于数据是炼苗不断的产生的 所以