仅从某些 Cassandra 分区检索数据时,Spark (Scala) 中的 DataFrames 是否有 joinWithCassandraTable 的替代方案?

2024-01-08

当使用 RDD 从大型 C* 表中提取少量分区时,我们可以使用:

val rdd = …  // rdd including partition data
val data = rdd.repartitionByCassandraReplica(keyspace, tableName)
    .joinWithCassandraTable(keyspace, tableName)

我们是否有使用 DataFrames 的同样有效的方法?

更新(2017 年 4 月 26 日):

为了更具体,我准备了一个例子。

我在 Cassandra 有 2 个表:

CREATE TABLE ids (
   id text,
   registered timestamp,
   PRIMARY KEY (id)
)

CREATE TABLE cpu_utils (
   id text,
   date text,
   time timestamp,
   cpu_util int,
   PRIMARY KEY (( id, date ), time)
)

第一个包含有效 ID 列表,第二个包含 CPU 利用率数据。我想有效地获得每个的平均CPU利用率id在表中ids有一天,说“2017-04-25”。

据我所知,使用 RDD 最有效的方法如下:

val sc: SparkContext = ...
val date = "2017-04-25"
val partitions = sc.cassandraTable(keyspace, "ids")
  .select("id").map(r => (r.getString("id"), date))

val data = partitions.repartitionByCassandraReplica(keyspace, "cpu_utils")
  .joinWithCassandraTable(keyspace, "cpu_utils")
  .select("id", "cpu_util").values
  .map(r => (r.getString("id"), (r.getDouble("cpu_util"), 1)))

// aggrData in form: (id, (avg(cpu_util), count))
// example row: ("718be4d5-11ad-4849-8aab-aa563c9c290e",(6,723))
val aggrData = data.reduceByKey((a, b) => (
  1d * (a._1 * a._2 + b._1 * b._2) / (a._2 + b._2), 
  a._2 + b._2))

aggrData.foreach(println)

这种方法大约需要 5 秒才能完成(在我的本地计算机上使用 Spark 设置,在某些远程服务器上使用 Cassandra 设置)。使用它,我对表 cpu_utils 中不到 1% 的分区执行操作。

对于数据框,这是我当前使用的方法:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val date = "2017-04-25"

val partitions = sqlContext.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "ids", "keyspace" -> keyspace)).load()
  .select($"id").withColumn("date", lit(date))

val data: DataFrame = sqlContext.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "cpu_utils", "keyspace" -> keyspace)).load()
  .select($"id", $"cpu_util", $"date")

val dataFinal = partitions.join(data, partitions.col("id").equalTo(data.col("id")) and partitions.col("date").equalTo(data.col("date")))
  .select(data.col("id"), data.col("cpu_util"))
  .groupBy("id")
  .agg(avg("cpu_util"), count("cpu_util"))

dataFinal.show()

然而,这种方法似乎将整个表 cpu_utils 加载到内存中,因为这里的执行时间相当长(几乎 1 分钟)。

我问是否存在一种更好的使用 Dataframes 的方法,即使不能比上面提到的 RDD 方法表现得更好,至少也能达到这样的效果?

P.s.:我使用的是 Spark 1.6.1。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

仅从某些 Cassandra 分区检索数据时,Spark (Scala) 中的 DataFrames 是否有 joinWithCassandraTable 的替代方案? 的相关文章

随机推荐

  • 为什么单击对附加元素不起作用?

    我想使用 jQuery 将一些 html 元素从一个容器无休止地移动到另一个容器append函数 但是当我单击已附加的元素时 单击事件将不再触发 基于与我类似的一些线程 我发现附加元素已从其事件侦听器中剥离 我怎样才能避免这种情况 有人可以
  • 将本地核心数据移动到 iCloud

    如何在已使用本地存储 Core Data 的应用程序中启用 iCloud Core Data 我尝试过使用NSPersistentStoreUbiquitousContentNameKey在我的持久存储选项中 不幸的是 此选项启用 iClo
  • 为什么找不到主类?

    我有一个非常简单的代码 package mygame public class RunGame public static void main String args System out println args 0 我可以编译该代码 但
  • 如何向 Swift Playground 添​​加 UIButton?

    所以我打开了 Playground 我只想添加一个简单的 UIButton 或简单的 UIView 用于测试目的 但我无法让它显示 这是我到目前为止所拥有的 import UIKit var uiButton UIButton button
  • 使弹性项目采用内容宽度,而不是父容器的宽度

    我有一个容器 div with display flex 它有一个孩子 a 我怎样才能让孩子出现 内联 具体来说 如何使子级的宽度由其内容决定 而不扩展到父级的宽度 我尝试过的 我将孩子设置为display inline flex 但它仍然
  • .NET SDK 安装不正确

    我在安装 NET SDK 时遇到问题 首先当我进入 Visual Studio 2019 时 它说我缺少 dotnet Runtime sdk 所以我按照它的要求安装了它并重新启动了我的计算机 然后我再次进入 Visual Studio 2
  • 拦截 iOS 上的崩溃

    描述 我想捕获 iOS 应用程序中发生的所有异常并将它们记录到文件中 并最终将它们发送到应用程序使用的后端服务器 我一直在阅读有关此主题的内容 并发现了设备发送的信号的用法并对其进行了处理 但我不确定它是否会违反应用程序商店审查指南 或者可
  • 带悬停的 CSS 动态导航 - 如何使其在 iOS Safari 中工作?

    在我的网站中 我使用纯 CSS 动态菜单 这在桌面浏览器中没问题 但在 iOS iphone ipad 等 上不行 因为触摸界面不支持 hover选择器 我的问题是 在 iOS 上支持此功能的最佳方式是什么 理想情况下 可以通过修补一些 C
  • 将键值对添加到 JavaScript 中的对象数组中?

    如果我有一个这样的数组 var myarray myarray push Name Adam Age 33 myarray push Name Emily Age 32 这给了我一个数组 我可以在其中提取值 例如myarray 0 Name
  • 为什么在使用花括号初始化列表时首选 std::initializer_list 构造函数?

    考虑代码 include
  • 在 Android 中解析大型 XML 文件

    我正在尝试解析一个相当大的 XML 文件 1MB 但我遇到了一些困难 我首先尝试将 xml 文件添加到 res xml 并使用 XmlResourceParser 解析它 但出现异常 数据超出 UNCOMPRESS DATA MAX 经过一
  • 如何在 firebug 和 chrome 调试器中查看附加到 :hover 和其他伪类的样式

    我知道一定有办法做到这一点 而且我一直在解决这个问题 但是 有什么方法可以查看 和 或编辑 应用于元素的伪类样式吗 例如 我想编辑 myclass hover or someid active在调试器中 附 我真的更关心如何在 chrome
  • 在 Web 服务中接收 JSON 数组作为参数

    我正在使用在 Visual Basic NET 3 5 中编程的 Web 服务来接收从其他应用程序发送的 JSON 数组 我正在发送一个如下所示的 JSON 字符串 idRecoleccion 1 PIN 553648138 idRecol
  • Jquery 悬停时淡出

    我需要一些 jquery 的帮助才能实现淡出效果 这是我的代码 http jsfiddle net PPpnT 25 http jsfiddle net PPpnT 25 当您将鼠标悬停在图像上时 图像需要淡出并显示下面的红色 当您将鼠标移
  • 在 Jenkins 中找不到私有 git 子模块

    问题 我正在尝试在 Jenkins 中构建我的应用程序 它位于 Github 上的私有存储库上 还有一个私有子模块 我可以通过设置凭证来克隆 Jenkins 中的私有存储库 但 Jenkins 无法克隆子模块 这是失败构建的输出 Start
  • 如何在 MySQL 中将字符串列读取为列表?

    我有一张桌子 它有两列class id and student 学生列是学生列表 学生列的数据类型是varchar 我想编写一个 SQL 查询 返回行 其中学生列是较大列表的子集 例如 A B C D E F G class id stud
  • Moodle 中个人资料图片的路径?

    我正在 Moodle Web 应用程序中编程一些东西 并正在考虑检索用户个人资料图像的路径 我以为我可以在数据库中的某个位置找到路径 但我只找到 mdl user picture 和 mdl user imagealt 所以实际上我知道谁上
  • 便携式图书馆中的计时器

    我在便携式库 Windows 应用商店中找不到计时器 针对 net 4 5 和 Windows Store aka Metro 有人知道如何创建某种计时事件吗 我需要某种秒表 所以应该每秒刷新一次左右 Update 我们已在 Visual
  • Flutter 应用在​​ Android 12 上启动时崩溃

    我在 Google Play 上发布了一个应用程序 并且经常使用 在上次更新中 我将compileSdkVersion和targetSdkVersion都发布到了31 我发现 Android 版本低于 12 的用户在使用应用程序时没有任何问
  • 仅从某些 Cassandra 分区检索数据时,Spark (Scala) 中的 DataFrames 是否有 joinWithCassandraTable 的替代方案?

    当使用 RDD 从大型 C 表中提取少量分区时 我们可以使用 val rdd rdd including partition data val data rdd repartitionByCassandraReplica keyspace