Scala Spark,列表缓冲区为空

2023-11-21

在注释 1 中的这段代码中,列表缓冲区项的长度正确显示,但在第二条注释中代码永远不会执行。为什么会出现这种情况呢?

val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)

var wktReader: WKTReader = new WKTReader(); 
val dataSet = sc.textFile("dataSet.txt")

val items = new ListBuffer[String]() 
dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok
}

println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
items.foreach { x => print(x)} //2. this code doesn't be executed

日志在这里:

16/11/20 01:16:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    16/11/20 01:16:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
    16/11/20 01:16:53 INFO Executor: Starting executor ID driver on host localhost
    16/11/20 01:16:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58608.
    16/11/20 01:16:53 INFO NettyBlockTransferService: Server created on 192.168.56.1:58608
    16/11/20 01:16:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
    16/11/20 01:16:53 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:58608 with 347.1 MB RAM, BlockManagerId(driver, 192.168.56.1, 58608)
    16/11/20 01:16:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 58608)
    Starting app
    16/11/20 01:16:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 139.6 KB, free 347.0 MB)
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 346.9 MB)
    16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.56.1:58608 (size: 15.9 KB, free: 347.1 MB)
    16/11/20 01:16:58 INFO SparkContext: Created broadcast 0 from textFile at main.scala:25
    16/11/20 01:16:58 INFO FileInputFormat: Total input paths to process : 1
    16/11/20 01:16:58 INFO SparkContext: Starting job: foreach at main.scala:28
    16/11/20 01:16:58 INFO DAGScheduler: Got job 0 (foreach at main.scala:28) with 1 output partitions
    16/11/20 01:16:58 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at main.scala:28)
    16/11/20 01:16:58 INFO DAGScheduler: Parents of final stage: List()
    16/11/20 01:16:58 INFO DAGScheduler: Missing parents: List()
    16/11/20 01:16:58 INFO DAGScheduler: Submitting ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25), which has no missing parents
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 346.9 MB)
    16/11/20 01:16:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2034.0 B, free 346.9 MB)
    16/11/20 01:16:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.56.1:58608 (size: 2034.0 B, free: 347.1 MB)
    16/11/20 01:16:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
    16/11/20 01:16:59 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (dataSet.txt MapPartitionsRDD[1] at textFile at main.scala:25)
    16/11/20 01:16:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    16/11/20 01:16:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5427 bytes)
    16/11/20 01:16:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    16/11/20 01:16:59 INFO HadoopRDD: Input split: file:/D:/dataSet.txt:0+291
    16/11/20 01:16:59 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
    16/11/20 01:16:59 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
    16/11/20 01:16:59 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
    16/11/20 01:16:59 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
    16/11/20 01:16:59 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    len = 1
    len = 2
    len = 3
    len = 4
    len = 5
    len = 6
    len = 7
    16/11/20 01:16:59 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
    16/11/20 01:16:59 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 417 ms on localhost (1/1)
    16/11/20 01:16:59 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    16/11/20 01:16:59 INFO DAGScheduler: ResultStage 0 (foreach at main.scala:28) finished in 0,456 s
    16/11/20 01:16:59 INFO DAGScheduler: Job 0 finished: foreach at main.scala:28, took 0,795126 s
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    16/11/20 01:16:59 INFO SparkContext: Invoking stop() from shutdown hook
    16/11/20 01:16:59 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
    16/11/20 01:16:59 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/11/20 01:16:59 INFO MemoryStore: MemoryStore cleared
    16/11/20 01:16:59 INFO BlockManager: BlockManager stopped
    16/11/20 01:16:59 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/11/20 01:16:59 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/11/20 01:16:59 INFO SparkContext: Successfully stopped SparkContext
    16/11/20 01:16:59 INFO ShutdownHookManager: Shutdown hook called
    16/11/20 01:16:59 INFO ShutdownHookManager: Deleting directory

Apache Spark 不提供共享内存,因此这里:

dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok
}

你修改一个本地副本 of items在各自的执行者上。原本的items驱动程序上定义的列表不会被修改。结果是这样的:

items.foreach { x => print(x) }

执行,但没有任何内容可打印。

请检查了解闭包

虽然这里建议这样做,但您可以将项目替换为累加器

val acc = sc.collectionAccumulator[String]("Items")
dataSet.foreach(e => acc.add(e))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Scala Spark,列表缓冲区为空 的相关文章

随机推荐

  • HTML5 自定义数据属性在 IE 6 中“有效”吗?

    自定义数据属性 http dev w3 org html5 spec Overview html embedding custom non visible data 当我说 工作 时 我的意思是 如果我有这样的 HTML div 将执行以下
  • 可以同时使用AVCaptureVideoDataOutput和AVCaptureMovieFileOutput吗?

    我想用我的代码同时录制视频和抓取帧 我在用AVCaptureVideoDataOutput用于抓取框架和AVCaptureMovieFileOutput用于视频录制 但在同时工作但单独工作时无法工作并收到错误代码 12780 我搜索了这个问
  • 当新的视图控制器被推送时,如何通过画外音读出标题?

    在设置应用程序中 如果我双击 常规 行 它推动通用视图控制器 然后它说 一般的 当前视图控制器的名称 Then 设置 后退按钮 所选项目的名称 但是在我的应用程序中带有自定义self navigaitonItem titleView它只说
  • 是否可以使用 Doxygen、Sandcastle 或其他文档生成器来记录 XML?

    我目前正在使用 Sandcastle Doxygen 和 JavaDoc 为我编写的代码生成文档 是否可以使用这些包来记录 XML 模式 如果没有 是否有任何 最好是免费的 软件包可以做到这一点 我可以自己编写文档作为 Doxygen 或
  • 下拉列表所选项目文本始终返回第一个项目文本

    我正在使用此代码从数据库填充下拉列表 public void fillcountry BL obj new BL DataSet ds obj dss select from Country drplistcountry DataSourc
  • 如何在DataGridViewComboBoxColumn中设置SelectedIndex?

    我正在使用 datagridview 因为我正在使用 datagridviewcomboboxcolumn 组合框列正在显示文本 但问题是我想默认选择组合框列的第一项 我该怎么做 DataGridViewComboBoxColumn dgv
  • android 放大后如何画圆

    我正在使用图像视图并使用layerDrawable 绘制覆盖图像 我使用了两个位图 original 和 myBitmap 缩放后 我无法在正确的位置绘制圆圈 它是在不同的位置绘制的 这是我正在使用的代码 ImageView view Im
  • 为什么Hibernate无法解析STRING?

    我刚刚下载了新版本的 Hibernate 4 0 1 最终版本 很奇怪的是 当我输入以下代码时 Hibernate STRING Eclipse IDE 显示 STRING 关键字错误 我确信我的构建路径中有 hibernate jar 并
  • 分析图像的颜色

    我剪掉了部分图像 并通过 12 个轨迹栏定义了 2 个颜色范围 H S L 我还有一个 精度 速度 滑块 范围从 1 到 10 我需要分析图像有多少像素属于每个指定的颜色范围 根据精度 速度滑块 我跳过一些行 像素 它工作得很好 但太慢了
  • PHP 中的 == 运算符具有传递性吗?

    在 JavaScript 中 运算符不一定是传递的 js gt 0 0 true js gt 0 true js gt 0 false PHP 中也是这样吗 你能给个例子吗 No the 运算符不具有传递性 完全相同的场景在 PHP 中给出
  • 将字符串拆分为基于单词长度的列表 C#

    我有一串用空格分隔的单词 如何根据单词长度将字符串拆分为单词列表 Example input aa aaa aaaa bb bbb bbbb cc ccc cccc cccc bbb bb aa output List 1 aa bb cc
  • 为操作编写基于 Python 的自定义梯度函数? (没有 C++ 实现)

    我正在尝试为 my op 编写一个自定义梯度函数 为了示例的目的 该函数仅包含对 tf identity 的调用 理想情况下 它可以是任何图形 import tensorflow as tf from tensorflow python f
  • 如何使用 R 允许用户进行多个输入?

    例如 如果我需要用户指定矩阵的行数和列数 提示 行数 用户输入 数字 我需要 R 等待 输入 然后将 一个数字 保存到变量v1中 下一个 提示 列数 用户输入 另一个号码 同时将 另一个数字 保存到变量v2中 最后 我将有两个变量 v1 v
  • DENSE_RANK 根据特定顺序

    您好 我有一个数据表 我想根据排序的日期顺序输出从第一组名称开始的名称的密集排名 例如 DROP TABLE MyTable SELECT INTO MyTable FROM VALUES 2015 12 23 ccc 2015 12 21
  • 抑制 Java 中已弃用的导入警告

    在 Java 中 如果导入已弃用的类 import SomeDeprecatedClass 您会收到此警告 The type SomeDeprecatedClass is deprecated 有没有办法抑制这个警告 为了避免警告 做not
  • 如何在express Node js中获取删除请求的参数

    我对nodejs比较陌生 如何获取Delete请求传递的value参数 我正在使用节点express js 感谢advs 您可以使用 req body 来获取您发送的值 eg router delete test function req
  • HTML Canvas - 圆圈周围的虚线描边

    我确实知道在画布上渲染点划线没有原生支持 但我已经看到人们能够为此提供支持的聪明方法 我想知道是否有任何方法可以翻译它以允许在形状 特别是圆形 周围渲染点状笔划 最简单的方法使用context setLineDash ctx setLine
  • nodejs socket.io 无法连接到服务器?

    我整个下午都在盯着node js 和socket io 示例 我正在尝试拼凑一个简单的页面 它会告诉我有多少用户连接到了服务器 我已阅读以下文档http socket io 以及这里的一些教程 问题准确地概述了我正在尝试做的事情 我也发现了
  • 用户看不到 mysql 工作台中的数据库

    我正在尝试使用 Spring Roo 来学习本教程 所以 我所做的是 转到 mysql 工作台 创建一个名为 pizzashop 的新模式 通过服务器管理帐户访问本地主机 在 用户和权限 中创建一个名为 pizzashop 和密码 pizz
  • Scala Spark,列表缓冲区为空

    在注释 1 中的这段代码中 列表缓冲区项的长度正确显示 但在第二条注释中代码永远不会执行 为什么会出现这种情况呢 val conf new SparkConf setAppName app setMaster local val sc ne