当使用 RDD 从大型 C* 表中提取少量分区时,我们可以使用:
val rdd = … // rdd including partition data
val data = rdd.repartitionByCassandraReplica(keyspace, tableName)
.joinWithCassandraTable(keyspace, tableName)
我们是否有使用 DataFrames 的同样有效的方法?
更新(2017 年 4 月 26 日):
为了更具体,我准备了一个例子。
我在 Cassandra 有 2 个表:
CREATE TABLE ids (
id text,
registered timestamp,
PRIMARY KEY (id)
)
CREATE TABLE cpu_utils (
id text,
date text,
time timestamp,
cpu_util int,
PRIMARY KEY (( id, date ), time)
)
第一个包含有效 ID 列表,第二个包含 CPU 利用率数据。我想有效地获得每个的平均CPU利用率id在表中ids有一天,说“2017-04-25”。
据我所知,使用 RDD 最有效的方法如下:
val sc: SparkContext = ...
val date = "2017-04-25"
val partitions = sc.cassandraTable(keyspace, "ids")
.select("id").map(r => (r.getString("id"), date))
val data = partitions.repartitionByCassandraReplica(keyspace, "cpu_utils")
.joinWithCassandraTable(keyspace, "cpu_utils")
.select("id", "cpu_util").values
.map(r => (r.getString("id"), (r.getDouble("cpu_util"), 1)))
// aggrData in form: (id, (avg(cpu_util), count))
// example row: ("718be4d5-11ad-4849-8aab-aa563c9c290e",(6,723))
val aggrData = data.reduceByKey((a, b) => (
1d * (a._1 * a._2 + b._1 * b._2) / (a._2 + b._2),
a._2 + b._2))
aggrData.foreach(println)
这种方法大约需要 5 秒才能完成(在我的本地计算机上使用 Spark 设置,在某些远程服务器上使用 Cassandra 设置)。使用它,我对表 cpu_utils 中不到 1% 的分区执行操作。
对于数据框,这是我当前使用的方法:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val date = "2017-04-25"
val partitions = sqlContext.read.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "ids", "keyspace" -> keyspace)).load()
.select($"id").withColumn("date", lit(date))
val data: DataFrame = sqlContext.read.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "cpu_utils", "keyspace" -> keyspace)).load()
.select($"id", $"cpu_util", $"date")
val dataFinal = partitions.join(data, partitions.col("id").equalTo(data.col("id")) and partitions.col("date").equalTo(data.col("date")))
.select(data.col("id"), data.col("cpu_util"))
.groupBy("id")
.agg(avg("cpu_util"), count("cpu_util"))
dataFinal.show()
然而,这种方法似乎将整个表 cpu_utils 加载到内存中,因为这里的执行时间相当长(几乎 1 分钟)。
我问是否存在一种更好的使用 Dataframes 的方法,即使不能比上面提到的 RDD 方法表现得更好,至少也能达到这样的效果?
P.s.:我使用的是 Spark 1.6.1。