spark boot封装,多线程高效执行

2023-11-13

1.简介

众所周知,spark是一个分布式计算引擎,可以将计算数据分不到不同的节点进行计算,但是往往我们的业务都是比较复杂,每天定时跑的时候不只是一个job,可能是有很多的job,但是引擎本身是串行化的,而且对于经验不深的同学,一个业务可能在一个scala文件写上上前行代码,这样就很难维护,所以这里为大家提供一个简易的spark框架。框架同时采用多线程的方式,可以提高多个job的执行效率。

2.原理

框架借助java spring的思想,使用了简易的bean管理器,一共提供了三种bean类型。

  1. 仓库bean:
  2. 计算bean:
  3. 存储bean:

仓库bean需要实现DataWarehouseService接口,并且加上注解@WarehouseService进行描述读取的数据仓库的作用。

计算bean需要实现DataCalcService接口,并且加上注解@CalcService表明计算的类型,往往一个仓库bean下面可能会有多个计算bean。

存储bean需要实现DataStorageService接口,并且加上注解@StorageService表明存储的类型,如S3、DB

流程如下:

3.使用介绍

 所有仓库bean、计算bean、存储bean都需要写到service包下面,框架才能自动读取,无论service包下的层级多深,都能够扫描到。


1.仓库bean

package com.moon.service.warehouse

import com.moon.core.DataWarehouseService
import com.moon.core.annots.WarehouseService
import com.moon.core.enums.DataWarehouseTypeE
import com.moon.service.storage.S3StorageServiceImpl
import com.moon.utils.CommonUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory

/**
 * device data house
 */
@WarehouseService(dataWarehouseType = DataWarehouseTypeE.DEVICE_DATA, serviceDesc = "read device data from s3")
class DeviceDataWarehouseServiceImpl extends DataWarehouseService {

  val log = LoggerFactory.getLogger(getClass)

  val s3Service = new S3StorageServiceImpl()

  /**
   * load data from s3
   *
   * @return
   */
  def loadWarehouse(sparkSession: SparkSession): DataFrame = {
    var df = CommonUtils.readFromS3(sparkSession, "s3://xxxx")
    if (df == null) return null

    df
  }
}

 2.计算bean

package com.moon.service.calculate.device

import com.moon.core.DataCalcService
import com.moon.core.annots.CalcService
import com.moon.core.enums.{DataWarehouseTypeE, StorageTypeE}
import com.moon.domain.CalcResult
import com.moon.utils.{DateUtils, UUIDUtil}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, lit, udf}
import org.slf4j.LoggerFactory

@CalcService(dataWarehouseType=DataWarehouseTypeE.DEVICE_DATA,storageType=StorageTypeE.DB,
  tableName = "test_table_name",serviceDesc="save device detail data")
class ProductDeviceDetailServiceImpl extends DataCalcService{

  val log = LoggerFactory.getLogger(getClass)

  /**
   *
   * @param df raw data for yesterday
   * @return
   */
  override def dataCalc(df: DataFrame): CalcResult = {
    log.info("start device detail service...")

    val ret = df.select("productId","productName","macAddress")
      .filter(col("productId").isNotNull).distinct()
      .filter(col("macAddress").isNotNull).distinct()

    val newRet = ret.withColumn("executeDay", lit(DateUtils.yesterday()))
    val calcResult : CalcResult = new CalcResult

    log.info("finished device detail service...")

    calcResult.setResult(newRet)
    calcResult
  }
}

3.存储bean

package com.moon.service.storage

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.moon.core.DataStorageService
import com.moon.core.annots.StorageService
import com.moon.core.enums.StorageTypeE
import org.apache.spark.sql.{DataFrame, SaveMode}

import java.time.{LocalDate, Period}

@StorageService(storageType = StorageTypeE.S3, serviceDesc = "save data to s3")
class S3StorageServiceImpl extends DataStorageService {

  val threadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build()

  def storageData(df: DataFrame, tableName: String): Unit = {
    val startTime = System.currentTimeMillis()
    val today = LocalDate.now()
    val yesterday = today.minus(Period.ofDays(1))
    val year = yesterday.getYear
    val month = if (yesterday.getMonthValue.toString.length == 1) s"0${yesterday.getMonthValue}" else yesterday.getMonthValue.toString


    df.write.mode(SaveMode.Append).option("encoding", "UTF-8").option("useSSL","false").parquet("s3://xxxxx")
    val endTime = System.currentTimeMillis()
    println("save " + tableName + " spend time is : " + (endTime - startTime))
  }

}

整个源代码已经上传到资源,scala版本使用:2.11

点击下载源码

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spark boot封装,多线程高效执行 的相关文章

  • Java new Date() 打印

    刚刚学习 Java 我知道这可能听起来很愚蠢 但我不得不问 System out print new Date 我知道参数中的任何内容都会转换为字符串 最终值是 new Date 返回对 Date 对象的引用 那么它是如何打印这个的呢 Mo
  • Java EE:如何获取我的应用程序的 URL?

    在 Java EE 中 如何动态检索应用程序的完整 URL 例如 如果 URL 是 localhost 8080 myapplication 我想要一个可以简单地将其作为字符串或其他形式返回给我的方法 我正在运行 GlassFish 作为应
  • Play框架运行应用程序问题

    每当我尝试运行使用以下命令创建的新 Web 应用程序时 我都会收到以下错误Play http www playframework org Error occurred during initialization of VM Could no
  • 在 HTTPResponse Android 中跟踪重定向

    我需要遵循 HTTPost 给我的重定向 当我发出 HTTP post 并尝试读取响应时 我得到重定向页面 html 我怎样才能解决这个问题 代码 public void parseDoc final HttpParams params n
  • 制作一个交互式Windows服务

    我希望我的 Java 应用程序成为交互式 Windows 服务 用户登录时具有 GUI 的 Windows 服务 我搜索了这个 我发现这样做的方法是有两个程序 第一个是服务 第二个是 GUI 程序并使它们进行通信 服务将从 GUI 程序获取
  • Android MediaExtractor seek() 对 MP3 音频文件的准确性

    我在使用 Android 时无法在eek 上获得合理的准确度MediaExtractor 对于某些文件 例如this one http www archive org download emma solo librivox emma 01
  • 反射找不到对象子类型

    我试图通过使用反射来获取包中的所有类 当我使用具体类的代码 本例中为 A 时 它可以工作并打印子类信息 B 扩展 A 因此它打印 B 信息 但是当我将它与对象类一起使用时 它不起作用 我该如何修复它 这段代码的工作原理 Reflection
  • Liferay ClassNotFoundException:DLFileEntryImpl

    在我的 6 1 0 Portal 实例上 带有使用 ServiceBuilder 和 DL Api 的 6 1 0 SDK Portlet 这一行 DynamicQuery query DynamicQueryFactoryUtil for
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 斯坦福 NLP - 处理文件列表时 OpenIE 内存不足

    我正在尝试使用斯坦福 CoreNLP 中的 OpenIE 工具从多个文件中提取信息 当多个文件 而不是一个 传递到输入时 它会给出内存不足错误 All files have been queued awaiting termination
  • 如何为俚语和表情符号构建正则表达式 (regex)

    我需要构建一个正则表达式来匹配俚语 即 lol lmao imo 等 和表情符号 即 P 等 我按照以下示例进行操作http www coderanch com t 497238 java java Regular Expression D
  • 从 127.0.0.1 到 2130706433,然后再返回

    使用标准 Java 库 从 IPV4 地址的点分字符串表示形式获取的最快方法是什么 127 0 0 1 到等效的整数表示 2130706433 相应地 反转所述操作的最快方法是什么 从整数开始2130706433到字符串表示形式 127 0
  • 为什么HashMap不能保证map的顺序随着时间的推移保持不变

    我在这里阅读有关 Hashmap 和 Hashtable 之间的区别 http javarevisited blogspot sg 2010 10 difference Between hashmap and html http javar
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • Java Integer CompareTo() - 为什么使用比较与减法?

    我发现java lang Integer实施compareTo方法如下 public int compareTo Integer anotherInteger int thisVal this value int anotherVal an
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 如何从指定日期获取上周五的日期? [复制]

    这个问题在这里已经有答案了 如何找出上一个 上一个 星期五 或指定日期的任何其他日期的日期 public getDateOnDay Date date String dayName 我不会给出答案 先自己尝试一下 但是 也许这些提示可以帮助
  • 玩!框架:运行“h2-browser”可以运行,但网页不可用

    当我运行命令时activator h2 browser它会使用以下 url 打开浏览器 192 168 1 17 8082 但我得到 使用 Chrome 此网页无法使用 奇怪的是它以前确实有效 从那时起我唯一改变的是JAVA OPTS以启用
  • 如何实现仅当可用内存较低时才将数据交换到磁盘的写缓存

    我想将应用程序生成的数据缓存在内存中 但如果内存变得稀缺 我想将数据交换到磁盘 理想情况下 我希望虚拟机通知它需要内存并将我的数据写入磁盘并以这种方式释放一些内存 但我没有看到任何方法以通知我的方式将自己挂接到虚拟机中before an O
  • 节拍匹配算法

    我最近开始尝试创建一个移动应用程序 iOS Android 它将自动击败比赛 http en wikipedia org wiki Beatmatching http en wikipedia org wiki Beatmatching 两

随机推荐

  • JAVA开发环境JDK安装及配置

    一 安装JDK 获取JDK的安装包 1 通过官网下载 2 打开安装包 开始安装JDK和JRE 1 打开JDK安装包 2 点击下一步开始JDK安装 3 更改安装路径 接下来以我的电脑为例安装到E盘 其他盘同理 4 将文件夹路径改到E盘新建的文
  • 用js动态创建svg

    吃水不忘挖井人 svg基础教程https www bilibili com video BV1Pt411y7V6 p 1 要实现的效果 svg文件的写法
  • 【LSTM回归】基于粒子群优化注意力机制的长短时记忆神经网络PSO-attention-LSTM实现数据回归预测附matlab代码

    作者简介 热爱科研的Matlab仿真开发者 修心和技术同步精进 matlab项目合作可私信 个人主页 Matlab科研工作室 个人信条 格物致知 更多Matlab完整代码及仿真定制内容点击 智能优化算法 神经网络预测 雷达通信 无线传感器
  • MySQL之分表分库分区

    数据库分表可以解决单表海量数据的查询性能问题 分库可以解决单台数据库的并发访问压力问题 分表 分表分为水平分表和垂直分表 水平分表原理 分表策略通常是用户ID取模 如果不是整数 可以首先将其进行hash获取到整 水平分表遇到的问题 1 跨表
  • Hadoop序列化案例

    Hadoop序列化案例 统计每一个手机号耗费的总上行流量 总下行流量 总流量 数据 1 13736230513 192 196 100 1 www baidu com 2481 24681 200 2 13846544121 192 196
  • Kafka3.0.0版本——Leader Partition自动平衡

    目录 一 Leader Partition自动平衡的概述 二 Leader Partition自动平衡的相关配置参数 三 Leader Partition自动平衡的示例 一 Leader Partition自动平衡的概述 正常情况下 Kaf
  • 代码审计练习题

    代码审计练习题 源码 方法 简单记录一下姿势 源码 判断var1和var2是否为对象 用弱不等号判断 分别判断md5
  • 将CSDN文章导出为.md、HTML、pdf格式

    将CSDN文章导出为 md HTML pdf格式 一 将CSDN文章导出为 md文件 二 将CSDN文章导出为HTML文件 三 把 md文件转换为pdf格式 一 将CSDN文章导出为 md文件 1 打开一篇CSDN文章 点击上方的 导出 按
  • 后端(五):JVM

    目录 JVM 中的内存区域划分 JVM 的类加载机制 1 加载 2 验证 3 准备 4 解析 5 初始化 JVM 中的垃圾回收策略 找 确认垃圾 1 引用计数 2 可达行分析 释放 垃圾 对象 1 标记清除 2 复制算法 3 标记整理 分代
  • Ubuntu18.04搭建VSCode编译环境

    确认ubuntu 18 04 uname a 添加root帐户密码 sudo passwd root 第一步 配置C 编译环境 安装gcc 和 g gcc v g v sudo apt install gcc sudo apt instal
  • 【maven】maven settings.xml 中 mirror 和 repository 的区别

    一 概述 maven的settings xml文件里面有proxy server repository mirror的配置 在配置仓库地址的时候容易混淆 proxy是服务器不能直接访问外网时需要设置的代理服务 不常用 server是服务器要
  • [论文阅读] (13)英文论文模型设计(Model Design)如何撰写及精句摘抄——以入侵检测系统(IDS)为例

    娜璋带你读论文 系列主要是督促自己阅读优秀论文及听取学术讲座 并分享给大家 希望您喜欢 由于作者的英文水平和学术能力不高 需要不断提升 所以还请大家批评指正 非常欢迎大家给我留言评论 学术路上期待与您前行 加油 前一篇从个人角度介绍英文论文
  • 一个对前端程序员比较友好的mock数据工具网址

    由前大搜车公司出品的mock网站 如下 https www easy mock com login 妈妈再也不用担心我从网上找不到假数据了 更多 如何写一个自己的小程序并上线 Github搭建个人博客 2019最新版 亲测 qq加油小程序
  • FreeRTOS操作系统的学习(一)

    操作系统的定义 管理和控制计算机硬件与软件资源的计算机程序 直接运行在 裸机 上的最基本的系统软件 任何其他软件都必须在操作系统的支持下才能运行 其介于APP和硬件之间 2 为什么要使用操作系统 1 与裸机相比 大大提高了CPU的灵活性 2
  • SpringBoot如何将项目打成jar包,并运行jar包呢?

    转自 SpringBoot如何将项目打成jar包 并运行jar包呢 下文笔者讲述springboot将项目打成jar包的方法分享及运行jar包的方法分享 如下所示 实现思路 1 pom中进行相应的build配置 2 运行maven inst
  • 位运算说明

    文章目录 参考文档 表格 来自百度百科 按位与运算符 按位或运算符 异或运算符 取反运算符 左移运算符 lt lt 右移运算符 gt gt 无符号右移运算符 gt gt gt 复合赋值运算符 不同长度的数据进行位运算 参考文档 百度百科 h
  • Windows记事本编码反汇编分析

    转载自 liam page 网上有一个流传多年的段子 这个段子大致是说 若你在简体中文版本的 Windows 系统下 用系统自带的记事本程序 以默认的 ANSI 编码保存 联通 两个字 那么重新打开后 联通 二字就消失了 如果我没记错的话
  • 【JDBC】idea添加mysql-jar包(很轻松)

    添加jar包 官网下载jar包 idea导入jar包 检查 官网下载jar包 官网地址 MySQL Download Connector J 下载完之后解压 打开文件夹 直到见到我们需要的jar包 idea导入jar包 我们复制刚才下载好的
  • 【ERROR】AssertionError: The NVIDIA driver on your system is too old (found version). Please upd

    错误信息 AssertionError The NVIDIA driver on your system is too old found version 10000 Please update your GPU driver by dow
  • spark boot封装,多线程高效执行

    1 简介 众所周知 spark是一个分布式计算引擎 可以将计算数据分不到不同的节点进行计算 但是往往我们的业务都是比较复杂 每天定时跑的时候不只是一个job 可能是有很多的job 但是引擎本身是串行化的 而且对于经验不深的同学 一个业务可能