Spark 将每个操作执行两次

2024-01-11

我创建了一个简单的 Java 应用程序,它使用 Apache Spark 从 Cassandra 检索数据,对其进行一些转换并将其保存在另一个 Cassandra 表中。

我正在使用 Apache Spark 1.4.1,它配置为独立集群模式,具有单个主服务器和从服务器,位于我的计算机上。

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
    "WHERE CAST(store_id as string) = '" + storeId + "'");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
    "WHERE CAST(store_id as string) = '" + storeId + "' AND product_id = " + productId + "");

// We need only the customers who did not order the product
// We cache the DataFrame because we use it twice.
DataFrame customersWhoHaventOrderedTheProduct = customers
    .join(customersWhoOrderedTheProduct
    .select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter")
    .where(customersWhoOrderedTheProduct.col("email").isNull())
    .drop(customersWhoOrderedTheProduct.col("email"))
    .cache();

int numberOfCustomers = (int) customersWhoHaventOrderedTheProduct.count();

Date reportTime = new Date();

// Prepare the Broadcast values. They are used in the map below.
Broadcast<String> bStoreId = sparkContext.broadcast(storeId, classTag(String.class));
Broadcast<String> bReportName = sparkContext.broadcast(MessageBrokerQueue.report_did_not_buy_product.toString(), classTag(String.class));
Broadcast<java.sql.Timestamp> bReportTime = sparkContext.broadcast(new java.sql.Timestamp(reportTime.getTime()), classTag(java.sql.Timestamp.class));
Broadcast<Integer> bNumberOfCustomers = sparkContext.broadcast(numberOfCustomers, classTag(Integer.class));

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();

正如你所看到的,我cache the customersWhoHaventOrderedTheProductDataFrame,之后我执行count并打电话toJavaRDD.

By my calculations these actions should be executed only once. But when I go in the Spark UI for the current job I see the following stages: enter image description here

正如您所看到的,每个动作都会执行两次。

难道我做错了什么?有什么设置我错过了吗?

任何想法都将不胜感激。


EDIT:

我打电话后System.out.println(storeCustomerReport.toJavaRDD().toDebugString());

这是调试字符串:

(200) MapPartitionsRDD[43] at toJavaRDD at DidNotBuyProductReport.java:93 []
  |   MapPartitionsRDD[42] at createDataFrame at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[41] at map at DidNotBuyProductReport.java:90 []
  |   MapPartitionsRDD[40] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[39] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[38] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ZippedPartitionsRDD2[37] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[31] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[30] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[29] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[28] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[27] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[3] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
  |   MapPartitionsRDD[36] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[35] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[34] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[33] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[32] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[5] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:15 []

EDIT 2:

因此,经过一些研究并结合试验和错误,我设法优化了这项工作。

我创建了一个 RDDcustomersWhoHaventOrderedTheProduct我在调用之前缓存它count()行动。 (我将缓存从DataFrame to the RDD).

之后我用这个RDD来创建storeCustomerReport DataFrame.

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd = customersWhoHaventOrderedTheProduct.javaRDD().cache();

现在各个阶段如下所示:

正如你所看到的两个count and cache现在已经消失了,但仍然有两个“javaRDD”操作。我不知道他们从哪里来,正如我所说的toJavaRDD我的代码中只出现过一次。


看起来您正在下面的代码段中应用两个操作

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")

One at sqlContext.createDataFrame()另一个在storeCustomerReport.write()而这两者都需要customersWhoHaventOrderedTheProduct.toJavaRDD().

持久化 生成的 RDD 应该可以解决这个问题。

JavaRDD cachedRdd = customersWhoHaventOrderedTheProduct.toJavaRDD().persist(StorageLevel.DISK_AND_MEMORY) //Or any other storage level
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 将每个操作执行两次 的相关文章

  • Spring应用中Eureka健康检查的问题

    我正在开发一个基于 Spring 的应用程序 其中包含多个微服务 我的一个微服务充当尤里卡服务器 到目前为止一切正常 在我所有其他微服务中 用 EnableEurekaClient 我想启用这样的健康检查 应用程序 yml eureka c
  • Junit:如何测试从属性文件读取属性的方法

    嗨 我有课ReadProperty其中有一个方法ReadPropertyFile返回类型的Myclass从属性文件读取参数值并返回Myclass目的 我需要帮助来测试ReadPropertyFile方法与JUnit 如果可能的话使用模拟文件
  • 动态选择端口号?

    在 Java 中 我需要获取端口号以在同一程序的多个实例之间进行通信 现在 我可以简单地选择一些固定的数字并使用它 但我想知道是否有一种方法可以动态选择端口号 这样我就不必打扰我的用户设置端口号 这是我的一个想法 其工作原理如下 有一个固定
  • 如何使用assertEquals 和 Epsilon 在 JUnit 中断言两个双精度数?

    不推荐使用双打的assertEquals 我发现应该使用带有Epsilon的形式 这是因为双打不可能100 严格 但无论如何我需要比较两个双打 预期结果和实际结果 但我不知道该怎么做 目前我的测试如下 Test public void te
  • 过滤两次 Lambda Java

    我有一个清单如下 1 2 3 4 5 6 7 和 预期结果必须是 1 2 3 4 5 6 7 我知道怎么做才能到7点 我的结果 1 2 3 4 5 6 我也想知道如何输入 7 我添加了i gt i objList size 1到我的过滤器
  • HSQL - 识别打开连接的数量

    我正在使用嵌入式 HSQL 数据库服务器 有什么方法可以识别活动打开连接的数量吗 Yes SELECT COUNT FROM INFORMATION SCHEMA SYSTEM SESSIONS
  • Pig Udf 显示结果

    我是 Pig 的新手 我用 Java 编写了一个 udf 并且包含了一个 System out println 其中的声明 我必须知道在 Pig 中运行时该语句在哪里打印 假设你的UDF 扩展了 EvalFunc 您可以使用从返回的 Log
  • 如何获取之前的URL?

    我需要调用我的网络应用程序的 URL 例如 如果有一个从 stackoverflow com 到我的网站 foo com 的链接 我需要 Web 应用程序 托管 bean 中的 stackoverflow 链接 感谢所有帮助 谢谢 并不总是
  • 帮助将图像从 Servlet 获取到 JSP 页面 [重复]

    这个问题在这里已经有答案了 我目前必须生成一个显示字符串文本的图像 我需要在 Servlet 上制作此图像 然后以某种方式将图像传递到 JSP 页面 以便它可以显示它 我试图避免保存图像 而是以某种方式将图像流式传输到 JSP 自从我开始寻
  • 如何在用户输入数据后重新运行java代码

    嘿 我有一个基本的java 应用程序 显示人们是成年人还是青少年等 我从java开始 在用户输入年龄和字符串后我找不到如何制作它它们被归类为 我希望它重新运行整个过程 以便其他人可以尝试 的节目 我一直在考虑做一个循环 但这对我来说没有用
  • Spring Boot Data JPA 从存储过程接收多个输出参数

    我尝试通过 Spring Boot Data JPA v2 2 6 调用具有多个输出参数的存储过程 但收到错误 DEBUG http nio 8080 exec 1 org hibernate engine jdbc spi SqlStat
  • 如何对不同的参数类型使用相同的java方法?

    我的问题 我有 2 个已定义的记录 创建对象请求 更新对象请求 必须通过实用方法进行验证 由于这两个对象具有相同的字段 因此可以对这两种类型应用相同的验证方法 现在我只是使用两种方法进行重载 但它很冗长 public record Crea
  • 尝试将 Web 服务部署到 TomEE 时出现“找不到...的 appInfo”

    我有一个非常简单的项目 用于培训目的 它是一个 RESTful Web 服务 我使用 js css 和 html 创建了一个客户端 我正在尝试将该服务部署到 TomEE 这是我尝试部署时遇到的错误 我在这里做错了什么 刚刚遇到这个问题 我曾
  • java for windows 中的文件图标叠加

    我正在尝试像 Tortoise SVN 或 Dropbox 一样在文件和文件夹上实现图标叠加 我在网上查了很多资料 但没有找到Java的解决方案 Can anyone help me with this 很抱歉确认您的担忧 但这无法在 Ja
  • java.io.Serialized 在 C/C++ 中的等价物是什么?

    C C 的等价物是什么java io Serialized https docs oracle com javase 7 docs api java io Serializable html 有对序列化库的引用 用 C 序列化数据结构 ht
  • 如何使用 jUnit 将测试用例添加到套件中?

    我有 2 个测试类 都扩展了TestCase 每个类都包含一堆针对我的程序运行的单独测试 如何将这两个类 以及它们拥有的所有测试 作为同一套件的一部分执行 我正在使用 jUnit 4 8 在 jUnit4 中你有这样的东西 RunWith
  • Cucumber 0.4.3 (cuke4duke) 与 java + maven gem 问题

    我最近开始为 Cucumber 安装一个示例项目 并尝试使用 maven java 运行它 我遵循了这个指南 http www goodercode com wp using cucumber tests with maven and ja
  • 如何使用mockito模拟构建器

    我有一个建造者 class Builder private String name private String address public Builder setName String name this name name retur
  • 如何将双精度/浮点四舍五入为二进制精度?

    我正在编写对浮点数执行计算的代码的测试 不出所料 结果很少是准确的 我想在计算结果和预期结果之间设置一个容差 我已经证实 在实践中 使用双精度 在对最后两位有效小数进行四舍五入后 结果始终是正确的 但是usually四舍五入最后一位小数后
  • 如何防止在Spring Boot单元测试中执行import.sql

    我的类路径中有一个 import sql 文件 其中包含一些 INSERT 语句 当使用 profile devel 运行我的应用程序时 它的数据被加载到 postgres 数据库中 到目前为止一切正常 当使用测试配置文件执行测试时 imp

随机推荐

  • 无法解析方法,为什么?

    方法setDateListener DateListener dl 无法解决 它是公共的 我在包含该方法的 DatePickerFragment java 类的对象上使用它 这里是onCreateView 片段中的方法setDateList
  • C++ 刷新缓冲区

    我知道这里有很多缓冲区问题 但我似乎找不到明确的答案 std cout lt lt write to screen lt lt std endl 我知道这段代码会因为 endl 而写入屏幕并刷新缓冲区 但如果我这样写 std cout lt
  • 如何创建参数化 SQL 查询?我为什么要?

    我听说 每个人 都在使用参数化 SQL 查询来防止 SQL 注入攻击 而不必验证每一条用户输入 你怎么做到这一点 使用存储过程时会自动获取此信息吗 所以我的理解这是非参数化的 cmdText String Format SELECT foo
  • Function 对象是否必要

    创建如下函数是常见且容易的 var f function alert something 那么为什么会有函数对象 like var f new Function alert something 后者很难写 读 我只能想到一种情况 即有人在网
  • Ant在表单中设计DatePicker

    我在用着DatePicker组件来自antd在表单内并想要更改默认值onChange and value中的道具数DatePicker但它不起作用
  • Oauth2 与 Postman 和 IdentityServer4

    我正在尝试在我的 Identity Server 4 上使用 Postman 进行身份验证 它适用于 Net Code 2 但我最近更新到 Net Core 3 并进行了调整 我可以打开登录页面 可以登录 但无法正确重定向 停留在登录页面上
  • Telerik 和 jquery

    我正在开发一个从客户那里收到的应用程序 他们使用了一些 Telerik Web 控件 Telerik 显然包含它自己的 jquery 1 3 2 版本 而我使用的是 1 4 1 我遇到了一些奇怪的 javascript 问题 我想排除旧的
  • 具有默认参数值的 C# 方法不会生成无参数重载?

    最近 我想向扩展方法添加一个可选参数 原来的方法是这样的 public static class Extensions public static bool Foo this IFoo target target DoIt true 这显然
  • QString::split() 和“\r”、“\n”和“\r\n”约定

    我明白那个QString split应该用来获得QStringList从多行QString 但是如果我有一个文件并且我不知道它是来自 Mac Windows 还是 Unix 我不确定是否QString split n 在所有情况下都会很好地
  • 基类的类型不完整

    我有一个基类Point我从中继承Point3D 然而 由于某种原因 班级Point必须始终返回Point3D为操作add 所以我将其包含在我的包含中 这是我的班级Point ifndef POINT H define POINT H inc
  • CodeIgniter 罐身份验证

    我正在利用这个名为坦克验证 http www konyukhov com soft tank auth 希望这个问题不会太小众 因为它特定于这个库 我正在尝试如何在用户注册后立即登录 看来这个库不提供这个功能 我不想开始搞乱这个库 但也许有
  • 从 Django 视图启动 Scrapy

    我使用Scrapy的经验有限 每次使用它总是通过终端的命令 如何从 django 模板获取表单数据 要抓取的 url 以与 scrapy 通信以开始抓取 到目前为止 我只想到从django的视图中获取表单返回的数据 然后尝试进入scrapy
  • require.js +backbone.js:如何构建具有初始化功能的模块?

    我有一个包含三个页面的应用程序 它们是单页界面 这些页面具有相似但不相同的功能 所以我想要有提供通用功能的 javascript 模块 然后每个页面可以定制 覆盖部分通用功能 我使用的是backbone js 所以我要做的是 加载包含常见模
  • 使用Python登录Google帐户进入网站[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在制作一个网站 该网站制作分组中人数的图表 来自 www codecamy com 为了实现这一目标 我制定了一个计划 我将有一个服务
  • HTTP 状态代码 200,但页面未加载 Node.js Socket.io -- Node.js Socket.io 教程、Daniel Nill、fs.readFile()、socket.html

    了解 node js 和 socket io 并进行操作Daniel Nill 的本教程 http www danielnill com blog nodejs tutorial with socketio 服务器启动没有问题 但是 当我导
  • 如何从codeigniter中的视图调用控制器函数?

    使用 codeigniter 我有一个控制器 如下所示
  • Android AccessibilityNodeInfo刷新()和回收()

    我已经阅读了 android 文档https developer android com reference android view accessibility AccessibilityNodeInfo html https devel
  • 当我在 OnChange 事件中调用 Delete 时,为什么会出现 RichEdit 行插入错误?

    我已经用谷歌搜索并检查了很多地方来寻找解决方案 但我发现的所有情况都不同或涉及比简单地添加或删除行更高级的东西 基本上 我想进行一种滚动丰富的编辑 替代方法是将插入符号移动到底部 我已经找到了解决方案 我正在向其中添加行并检查Lines C
  • 如何将变量/参数写入标准输出?

    我正在尝试调试我的第一个Bicep https learn microsoft com en us azure azure resource manager bicep overview模板 如何将变量或参数值写入标准输出 就像是 var
  • Spark 将每个操作执行两次

    我创建了一个简单的 Java 应用程序 它使用 Apache Spark 从 Cassandra 检索数据 对其进行一些转换并将其保存在另一个 Cassandra 表中 我正在使用 Apache Spark 1 4 1 它配置为独立集群模式