看来自动 cassandra 服务器端下推谓词仅在选择、过滤或排序时才起作用。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
所以,如果你的distinct()
,spark 获取所有行,然后,distinct()
.
解决方案1
你说你的cqlselect distinct...
已经是超快了。我猜分区键的数量相对较少(machine_name 和sensor_name 的组合),但“ts”却有很多。
因此,最简单的解决方案就是使用 cql(例如,卡桑德拉驱动程序 https://datastax.github.io/python-driver/).
解决方案2
由于 cassandra 是一个查询优先的数据库,只需再创建一个表,该表仅包含不同查询所需的分区键。
CREATE TABLE ipe_smart_meter.avt_sensor_name_machine_name (
machine_name text,
sensor_name text,
PRIMARY KEY ((machine_name, sensor_name))
);
然后,每次在原始表中插入一行时,都将 machine_name 和sensor_name 插入到新表中。
由于它只有分区键,因此这是一个自然鲜明表供您查询。只需获取所有行即可。也许超快。无需明确的过程。
解决方案3
我认为解决方案2是最好的。但是,如果您不想对一条记录进行两次插入,另一种解决方案是更改表并创建一张物化视图表。
CREATE TABLE ipe_smart_meter.ipe_smart_meter.avt_sensor_data (
machine_name text,
sensor_name text,
ts timestamp,
id bigint,
value double,
dist_hint_num smallint,
PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
;
CREATE MATERIALIZED VIEW IF NOT EXISTS ipe_smart_meter.avt_sensor_data_mv AS
SELECT
machine_name
,sensor_name
,ts
,dist_hint_num
FROM ipe_smart_meter.avt_sensor_data
WHERE
machine_name IS NOT NULL
AND sensor_name IS NOT NULL
AND ts IS NOT NULL
AND dist_hint_num IS NOT NULL
PRIMARY KEY ((dist_hint_num), machine_name, sensor_name, ts)
WITH
AND CLUSTERING ORDER BY (machine_name ASC, sensor_name DESC, ts DESC)
;
The dist_hint_num
列用于限制查询迭代和分布记录的分区总数。
例如,从 0 到 15。 随机整数random.randint(0, 15)
或基于哈希的整数hash_func(machine_name + sensor_name) % 16
没问题。
然后,当您查询如下时。 cassandra 仅从 16 个分区获取所有记录,这可能比您当前的情况更有效。
但是,无论如何,必须读取所有记录,然后distinct()
(发生随机播放)。不节省空间。我认为这不是一个好的解决方案。
functools.reduce(
lambda df, dist_hint_num: df.union(
other=spark_session.read.format(
'org.apache.spark.sql.cassandra',
).options(
keyspace='ipe_smart_meter',
table='avt_sensor_data_mv',
).load().filter(
col('dist_hint_num') == expr(
f'CAST({dist_hint_num} AS SMALLINT)'
)
).select(
col('machine_name'),
col('sensor_name'),
),
),
range(0, 16),
spark_session.createDataFrame(
data=(),
schema=StructType(
fields=(
StructField(
name='machine_name',
dataType=StringType(),
nullable=False,
),
StructField(
name='sensor_name',
dataType=StringType(),
nullable=False,
),
),
),
),
).distinct().persist().alias(
'df_all_machine_sensor',
)