Spark scala和java的api使用

2023-05-16

Spark scala和java的api使用

1、利用scala语言开发spark的worcount程序(本地运行)


package com.zy.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//todo:利用scala语言来实现spark的wordcount程序
object WordCount {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName和master  local[2]表示本地采用2个线程去运行任务
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件
    val data: RDD[String] = sc.textFile("D:\\words.txt")

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //按照单词出现的次数降序排列
    val sortRDD: RDD[(String, Int)] = result.sortBy(x => x._2, false)


    //7、收集数据,打印输出
    val finalResult: Array[(String, Int)] = sortRDD.collect()
    finalResult.foreach(println)

    //8、关闭sc
    sc.stop()
  }
}  

 

2、利用scala语言开发spark的wordcount程序(集群运行)


package com.zy.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//todo:利用scala语言开发spark的wordcount程序(集群运行)
object WordCount_Online {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件 args(0)为文件地址参数
    val data: RDD[String] = sc.textFile(args(0))

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //7、把结果数据保存到hdfs上  args(1)是保存到hdfs的目录参数
    result.saveAsTextFile(args(1))

    //8、关闭sc
    sc.stop()
  }

}  

最后打成jar包 到集群上执行


spark-submit --master spark://node1:7077 --class cn.itcast.spark.WordCount_Online --executor-memory 1g --total-executor-cores 2 original-spark_xxx-1.0-SNAPSHOT.jar /words.txt /out  

 

3、利用java语言开发spark的wordcount程序(本地运行)


package com.zy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//todo:利用java语言开发spark的wordcount程序(本地运行)
public class WordCount_Java {
    public static void main(String[] args) {
        //1、创建SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、创建JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、读取数据文件
        JavaRDD<String> data = jsc.textFile("D:\\words.txt");

        //4、切分每一行获取所有的单词
        JavaRDD<String> words = data.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        //6、相同单词出现1累加
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照单词出现的次数降序排列 (单词,次数)------>(次数,单词).sortByKey------->(单词,次数)

        JavaPairRDD<Integer, String> reverseRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        });

        JavaPairRDD<String, Integer> sortedRDD = reverseRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        });


        //7、收集数据打印输出
        List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
        for (Tuple2<String, Integer> tuple : finalResult) {
            System.out.println("单词:" + tuple._1 + " 次数:" + tuple._2);
        }

        //8、关闭jsc
        jsc.stop();
    }
}  

 

posted @ 2018-02-12 13:22 青衫仗剑 阅读( ...) 评论( ...) 编辑 收藏
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark scala和java的api使用 的相关文章

  • 获取 2 个字母的州/省缩写

    因此 我使用 GeoNames API 获取国家 地区和州 省信息 并使用这些信息填充表单中的选择下拉列表 此表单将其信息提交给 SOAP Web 服务 并且 SOAP 服务器仅理解 2 个字母形式的国家 地区和州 省数据 IE CA 代表
  • Java AES 128 加密方式与 openssl 不同

    我们遇到了一种奇怪的情况 即我们在 Java 中使用的加密方法会向 openssl 生成不同的输出 尽管它们在配置上看起来相同 使用相同的键和 IV 文本 敏捷的棕色狐狸跳过了懒狗 加密为 Base64 字符串 openssl A8cMRI
  • 当从服务类中调用时,Spring @Transactional 不适用于带注释的方法

    在下面的代码中 当方法内部 是从内部调用的方法外部 应该在交易范围内 但事实并非如此 但当方法内部 直接从调用我的控制器class 它受到事务的约束 有什么解释吗 这是控制器类 Controller public class MyContr
  • hibernate锁等待超时超时;

    我正在使用 Hibernate 尝试模拟对数据库中同一行的 2 个并发更新 编辑 我将 em1 getTransaction commit 移至 em1 flush 之后我没有收到任何 StaleObjectException 两个事务已成
  • Calendar.getInstance(TimeZone.getTimeZone("UTC")) 不返回 UTC 时间

    我对得到的结果真的很困惑Calendar getInstance TimeZone getTimeZone UTC 方法调用 它返回 IST 时间 这是我使用的代码 Calendar cal Two Calendar getInstance
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • 使用 SQLITE 按最近的纬度和经度坐标排序

    我必须获得一个 SQLite SQL 语句 以便在给定初始位置的情况下按最近的纬度和经度坐标进行排序 这是我在 sqlite 数据库中的表的例句 SELECT id name lat lng FROM items EXAMPLE RESUL
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • 以编程方式在java的resources/source文件夹中创建文件?

    我有两个资源文件夹 src 这是我的 java 文件 资源 这是我的资源文件 图像 properties 组织在文件夹 包 中 有没有办法以编程方式在该资源文件夹中添加另一个 properties 文件 我尝试过这样的事情 public s
  • 编辑文件名在 JComboBox 中的显示方式,同时保持对文件的访问

    我对 Java 很陌生 对堆栈溢出也很陌生 我正在尝试利用 JMF API 创建一个用 Java 编码的简单媒体播放器 到目前为止 我已经能够设置一个简单的队列 播放列表来使用JComboBox called playListHolder
  • Javafx过滤表视图

    我正在尝试使用文本字段来过滤表视图 我想要一个文本字段 txtSearch 来搜索 nhs 号码 名字 姓氏 和 分类类别 我尝试过在线实施各种解决方案 但没有运气 我对这一切仍然很陌生 所以如果问得不好 我深表歉意 任何帮助将不胜感激 我
  • 有没有办法通过API调用访问私有数据集

    我正在使用 CKAN 2 8 运行 Mirth 3 6 1 作为新手 我遇到了一个问题 有没有办法通过 API 请求访问 CKAN 中私有数据集中的资源 我好像做不到 我有一个拥有公共数据集的组织 我可以通过 API 路由器通过 Mirth
  • Netty:阻止调用以获取连接的服务器通道?

    呼吁ServerBootstrap bind 返回一个Channel但这不是在Connected状态 因此不能用于写入客户端 Netty 文档中的所有示例都显示写入Channel从它的ChannelHandler的事件如channelCon
  • SoftLayer_Account::getOperatingSystemReloadImages

    我想在 OSReload 期间使用 API 获取可用操作系统列表 我发现提到了 SoftLayer Account getOperatingSystemReloadImages 方法 但找不到该方法的用法 谁能帮我解决这个问题 谢谢 我找不
  • javafx android 中的文本字段和组合框问题

    我在简单的 javafx android 应用程序中遇到问题 问题是我使用 gradle javafxmobile plugin 在 netbeans ide 中构建了非常简单的应用程序 其中包含一些文本字段和组合框 我在 android
  • Eclipse 中 Spring MVC 模型对象的 (jsp /jstl) 视图中的代码辅助

    在 Spring MVC 中 当将对象放置在视图模型中时 如下所示 public String getUser Model model fetch user model addAttribute user user return viewN
  • 为什么C++代码执行速度比java慢?

    我最近用 Java 编写了一个计算密集型算法 然后将其翻译为 C 令我惊讶的是 C 的执行速度要慢得多 我现在已经编写了一个更短的 Java 测试程序和一个相应的 C 程序 见下文 我的原始代码具有大量数组访问功能 测试代码也是如此 C 的
  • 如何使用 JSch 将多行命令输出存储到变量中

    所以 我有一段很好的代码 我很难理解 它允许我向我的服务器发送命令 并获得一行响应 该代码有效 但我想从服务器返回多行 主要类是 JSch jSch new JSch MyUserInfo ui new MyUserInfo String
  • Trie 数据结构 - Java [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 是否有任何库或文档 链接提供了在 java 中实现 Trie 数据结构的更多信息 任何帮助都会很棒 Thanks 你可以阅读Java特里树
  • 在 RESTful Web 服务中实现注销

    我正在开发一个需要注销服务的移动应用程序 登录服务是通过数据库验证来完成的 现在我陷入了注销状态 退一步 您没有提供有关如何在应用程序中执行身份验证的详细信息 并且很难猜测您在做什么 但是 需要注意的是 在 REST 应用程序中 不能有会话

随机推荐

  • rabbit-mq 本地环境搭建

    一 xff1a 安装RabbitMQ需要先安装Erlang语言开发包 xff0c 直接下载地址 xff1a http erlang org download otp win64 18 3 exe 尽量安装时不选择C盘 xff0c 避免操作系
  • oracle小数点前面没有0,纠结解惑

    一1 天前台人找到我 xff0c 说我们安装的数据库有问题 xff0c 为什么小数点前面是0就不显示呢 xff0c 我去看了一下 xff0c command窗口要显示 SQL gt create table ml test num numb
  • 网监后台管理系统设计思路

    本次做的是网监系统saas服务平台的后台管理系统 xff0c 不涉及复杂功能逻辑 就是从菜单 模板 系 统 组织架构 角色 用户的设计思路 产品需求 xff1a 在各个省市网监系统的数量不断增长 xff0c 且系统逻辑和功能模块大致相同 x
  • 分类算法 c4.5 详解

    C4 5是一系列用在机器学习和数据挖掘的分类问题中的算法 它的目标是监督学习 xff1a 给定一个数据集 xff0c 其中的每一个元组都能用一组属性值来描述 xff0c 每一个元组属于一个互斥的类别中的某一类 C4 5的目标是通过学习 xf
  • 正则表达式匹配Html代码中图片路劲

    正则表达匹配图片路径 public static string GetHtmlImageUrlList string sHtmlText 定义正则表达式用来匹配 img 标签 Regex regImg 61 new Regex 64 34
  • (GPU版)Pytorch+pycharm+jupyter安装记录(截至23年3月14日)

    由于搞了一台旧主机 xff0c 主机上没有pytorch等软件程序 xff0c 所以重新装一遍 xff0c 顺便记录一下 xff01 一 安装显卡GPU的驱动程序 xff0c 搞定CUDA先 WIN 43 R打开命令行 xff0c 输入命令
  • Alibaba官方最新发布的这份Java学习导图+彩版手册,真不是吹的

    时间飞逝 xff0c 转眼间毕业七年多 xff0c 从事 Java 开发也六年了 我在想 xff0c 也是时候将自己的 Java 整理成一套体系 这一次的知识体系面试题涉及到 Java 知识部分 性能优化 微服务 并发编程 开源框架 分布式
  • linux下使用第三方商店安装应用

    安装 snap store 进行下载 xff0c 相当与第三方应用商店 xff0c 但是往往比某一个官方软件源里面的应用要丰富或更实用 到 snap docs 中选择你的 linux 版本进入安装文档 xff0c 根据指示一步一步安装即可
  • Centos7离线安装sqlserver2017

    Centos7离线安装sqlserver2017 根据操作系统版本选择下载匹配的sqlserver版本 可以在这里找一下https packages microsoft com config 我选择是先在一台有网的机器上下载好rpm安装包之
  • HC-05蓝牙模块配置

    目录 1 连接蓝牙模块a 蓝牙模块通过USB转TTL连接电脑b 打开串口助手 xff0c 波特率设置为38400c 检验是否连接成功 2 配置波特率3 修改密码4 设置主从模式5 设置蓝牙连接模式6 查询自身地址7 添加配对蓝牙地址8 测试
  • Windows沙盒技术调研

    转载自 xff1a 移动云开发者社区 一 Windows沙盒技术介绍 Windows沙盒提供了轻型桌面环境 xff0c 可安全地隔离运行应用程序 沙盒环境中Windows软件保持 34 沙盒 34 状态 xff0c 并独立于主机运行 沙盒是
  • OS + Linux Shell bash / sh / ksh / csh / tcsh / adb shell

    s Android adb shell ADB Android debug bridge Android手机实际是基于Linux系统的 Google提供的ADB工具包带有fastboot exe rar http dl iteye com
  • kali利用CVE_2019_0708(远程桌面代码执行漏洞)攻击win7

    一 漏洞说明 2019年5月15日微软发布安全补丁修复了CVE编号为CVE 2019 0708的Windows远程桌面服务 xff08 RDP xff09 远程代码执行漏洞 该漏洞在不需身份认证的情况下即可远程触发 危害与影响面极大 目前
  • 数据库系统原理1

    第一章 数据库管理技术发展的不同阶段形成不同的特点 数据描述经历了三个阶段对应于三个数据模型 第二章 数据库系统的生命周期 xff0c 书中可能和我们学习软工的时候有些出入 xff0c 其实就是不同时间有不同的理解 xff0c 横看成岭侧成
  • ssh详解

    SSH ssh secure shell protocol 22 tcp 安全的 具体的软件实现 xff1a OpenSSH ssh协议的开源实现 xff0c CentOS dropbear xff1a 另一个开源实现 SSH协议版本 v1
  • spring框架的简单配置步骤——小马同学@Tian

    spring框架配置步骤 1 导入jar包 本教程使用spring5 1 5 xff0c 在pom xml中进行导入依赖 Maven方式 xff1a span class token tag span class token tag spa
  • PSReadLine - Powershell 的强化工具

    PSReadLine Powershell 的强化工具 UPDATE 2022 3 4 根据其 Github README 的说明 xff0c If you are using Windows PowerShell on Windows 1
  • 美化 PowerShell

    美化 PowerShell UPDATE 2022 3 4 本文使用的 oh my posh 基于 V2 版本 xff0c 而更新且功能更强大的新版本已经发布 xff0c 如需使用请参考其官方文档 1 准备工作 Step1 下载并安装 Po
  • nltk下载语料库

    1 首先我们使用命令pip list查看是否安装了nltk模块 xff0c 如果没有 xff0c 则执行命令pip3 install nltk进行安装 2 之后 xff0c 我们在Jupyter Notebook中进行语料库的安装 impo
  • Spark scala和java的api使用

    Spark scala和java的api使用 1 利用scala语言开发spark的worcount程序 xff08 本地运行 xff09 package com zy spark import org apache spark rdd R