PySpark + Cassandra:获取分区键的不同值

2024-03-05

我正在尝试获取 pyspark 中 cassandra 表的分区键的不同值。然而,pyspark似乎不理解我并完全迭代所有数据(很多)而不是查询索引。

这是我使用的代码,对我来说看起来非常简单:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark! This town not big enough for the two of us.") \
    .getOrCreate()

ct = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="avt_sensor_data", keyspace="ipe_smart_meter")\
    .load()

all_sensors = ct.select("machine_name", "sensor_name")\
    .distinct() \
    .collect()

列“machine_name”和“sensor_name”一起构成分区键(完整架构见下文)。在我看来,这应该是超级快的,事实上,如果我在 cql 中执行这个查询,只需要几秒钟:

select distinct machine_name,sensor_name from ipe_smart_meter.avt_sensor_data;

然而,spark 作业大约需要 10 个小时才能完成。从 Spark 告诉我的计划来看,它看起来确实想迭代所有数据:

== Physical Plan ==
*HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
+- Exchange hashpartitioning(machine_name#0, sensor_name#1, 200)
   +- *HashAggregate(keys=[machine_name#0, sensor_name#1], functions=[], output=[machine_name#0, sensor_name#1])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@2ee2f21d [machine_name#0,sensor_name#1] ReadSchema: struct<machine_name:string,sensor_name:string>

我不是专家,但这对我来说看起来不像“使用 cassandra 索引”。

我究竟做错了什么?有没有办法告诉 Spark 委托从 cassandra 获取不同值的任务?任何帮助将不胜感激!

如果这有帮助,这里是底层 cassandra 表的架构描述:

CREATE KEYSPACE ipe_smart_meter WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}  AND durable_writes = true;

CREATE TABLE ipe_smart_meter.avt_sensor_data (
    machine_name text,
    sensor_name text,
    ts timestamp,
    id bigint,
    value double,
    PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = '[PRODUCTION] Table for raw data from AVT smart meters.'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

看来自动 cassandra 服务器端下推谓词仅在选择、过滤或排序时才起作用。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

所以,如果你的distinct(),spark 获取所有行,然后,distinct().

解决方案1

你说你的cqlselect distinct...已经是超快了。我猜分区键的数量相对较少(machine_name 和sensor_name 的组合),但“ts”却有很多。

因此,最简单的解决方案就是使用 cql(例如,卡桑德拉驱动程序 https://datastax.github.io/python-driver/).

解决方案2

由于 cassandra 是一个查询优先的数据库,只需再创建一个表,该表仅包含不同查询所需的分区键。

CREATE TABLE ipe_smart_meter.avt_sensor_name_machine_name (
    machine_name text,
    sensor_name text,
    PRIMARY KEY ((machine_name, sensor_name))
);

然后,每次在原始表中插入一行时,都将 machine_name 和sensor_name 插入到新表中。 由于它只有分区键,因此这是一个自然鲜明表供您查询。只需获取所有行即可。也许超快。无需明确的过程。

解决方案3

我认为解决方案2是最好的。但是,如果您不想对一条记录进行两次插入,另一种解决方案是更改表并创建一张物化视图表。

CREATE TABLE ipe_smart_meter.ipe_smart_meter.avt_sensor_data (
    machine_name text,
    sensor_name text,
    ts timestamp,
    id bigint,
    value double,
    dist_hint_num smallint,
    PRIMARY KEY ((machine_name, sensor_name), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
;

CREATE MATERIALIZED VIEW IF NOT EXISTS ipe_smart_meter.avt_sensor_data_mv AS
  SELECT
    machine_name
    ,sensor_name
    ,ts
    ,dist_hint_num
  FROM ipe_smart_meter.avt_sensor_data
  WHERE
    machine_name IS NOT NULL
    AND sensor_name IS NOT NULL
    AND ts IS NOT NULL
    AND dist_hint_num IS NOT NULL
  PRIMARY KEY ((dist_hint_num), machine_name, sensor_name, ts)
  WITH
  AND CLUSTERING ORDER BY (machine_name ASC, sensor_name DESC, ts DESC)
;

The dist_hint_num列用于限制查询迭代和分布记录的分区总数。

例如,从 0 到 15。 随机整数random.randint(0, 15)或基于哈希的整数hash_func(machine_name + sensor_name) % 16没问题。 然后,当您查询如下时。 cassandra 仅从 16 个分区获取所有记录,这可能比您当前的情况更有效。

但是,无论如何,必须读取所有记录,然后distinct()(发生随机播放)。不节省空间。我认为这不是一个好的解决方案。

functools.reduce(
    lambda df, dist_hint_num: df.union(
        other=spark_session.read.format(
            'org.apache.spark.sql.cassandra',
        ).options(
            keyspace='ipe_smart_meter',
            table='avt_sensor_data_mv',
        ).load().filter(
            col('dist_hint_num') == expr(
                f'CAST({dist_hint_num} AS SMALLINT)'
            )
        ).select(
            col('machine_name'),
            col('sensor_name'),
        ),
    ),
    range(0, 16),
    spark_session.createDataFrame(
        data=(),
        schema=StructType(
            fields=(
                StructField(
                    name='machine_name',
                    dataType=StringType(),
                    nullable=False,
                ),
                StructField(
                    name='sensor_name',
                    dataType=StringType(),
                    nullable=False,
                ),
            ),
        ),
    ),
).distinct().persist().alias(
    'df_all_machine_sensor',
)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark + Cassandra:获取分区键的不同值 的相关文章

随机推荐

  • Spring MVC 控制器返回 HTML

    我在尝试将 HTML 返回到 Spring MVC 控制器时遇到问题 它看起来像这样 RequestMapping value QUESTION GROUP CREATE URL method RequestMethod POST publ
  • Node.js 上的多个服务器

    我需要在同一个 Nodejs 实例上模拟四个服务器 具有不同的主机和端口 一个例子可以是 域1 8000 域2 8010 域名3 8020 域4 8030 有人可以帮我吗 谢谢 我添加了一个示例 其中包含使用节点的 2 个服务器的可能解决方
  • Java 8 中的多个 null 检查

    我有下面的代码 对于多个空检查来说有点难看 String s null if str1 null s str1 else if str2 null s str2 else if str3 null s str3 else s str4 所以
  • Scala 类型系统的优点

    我正在探索 Scala 语言 我经常听到的一个说法是 Scala 有一个stronger类型系统优于 Java 我认为人们的意思是 scalac拒绝某些有缺陷的程序javac会愉快地编译 只会导致运行时错误 某些不变量可以在 Scala 程
  • Windows批处理文件:多个if条件

    有没有办法说类似的话 if 1 1 or 1 2 在批处理文件中 或者 如果我可以指定一组候选值 例如 if 1 in 1 2 3 4 20 事实证明 and 很简单 只是不是你期望的语法 这3个例子就说明了这一点 换句话说 If 1 1
  • iPhone - UIImageView 中的中心 UIImage

    我有一个 UIImageView 我从 URL 获取 UIImage 图像显示在 UIImageView 中 但我无法使其正确居中 UIImage 为 80 x 68 像素 UIImaveView 的大小为 90 x 90 当我在 UIIm
  • 如何使用来自不同 ViewController 的 JSON 响应填充 UITableView?

    我在 stackoverflow com 上进行了谷歌搜索 但无法摆脱这种情况 如何使用 GET 的响应来填充 TableView 我发送 GET 并在 DetailViewController 我的主控制器 中的方法中解析响应 并希望使用
  • 选择 LINQ 中 JOIN 后的所有列

    我有两张桌子 Table1 and Table2 我想执行左外连接 var myOutput from object1 in Table1 join object2 in Table2 on object1 Property1 equals
  • 批处理文件日/月/年语法?

    我找不到用于提取当前日 月 年的批处理文件语法的简单细分 我有以下语法来声明用作目录名称的变量 set folder date 10 4 date 7 2 date 4 2 任何人都可以阐明 或发布链接 波浪号 双百分比的含义吗 我似乎无法
  • 在 Android 4.x 上 touchmove 后 Touchend 未触发?

    我正在用 Javascript 编写一些代码 如下所示 var el document getElementById some div el ontouchstart function e el innerHTML touch start
  • 处理具有共同属性但不同对象类型的对象的方法

    我有大量自动生成的对象 尽管它们都是不同的 不相关的类 但所有对象都共享一些基本属性 名称 id 等 我无法控制这些对象的生成 因此不幸的是我无法采取实现接口的理想方法 我想创建一种方法 在其中传递这些对象中的任意一个 并使用这些公共属性执
  • 在 Intellij 中查找 lombok 生成的构造函数的用法

    我有一个带有 lombok 注释的类 Value or Data 并且我正在寻找一种简单的方法来查找自动生成的构造函数的用法 我现在能做的就是找到构造函数的一种用法 将光标放在那里并运行 查找用法 命令 然后我得到我想要的结果 不过我想直接
  • Zurb Foundation _global.scss js 元样式?

    我当时正在开发一个 Foundation 5 项目 结果发现该项目有一个过时的 global scss 组件 我试图得到范围滑块 http foundation zurb com docs components range slider h
  • 如何使用 pymongo 获取 mongo 实例中所有数据库的列表

    如何使用 pymongo 将 mongo 实例中所有数据库的列表获取到变量 例如使用 pymongo 将以下命令发送到 mongo 实例 db adminCommand listDatabases 1 Use 数据库名称 https api
  • 装饰器在 Nest 控制器中返回 404

    我正在使用 NestJS 开发后端 顺便说一句 这很棒 我有一个 标准获取实体情况的单个实例 类似于下面的示例 Controller user export class UserController constructor private
  • 为什么在全新项目(xcode 3.1.4)上不调用 dealloc?

    我开始学习 iPhone 编程 这显然是一个非常简单的问题 我在 xcode 3 1 4 中工作 现在 当我创建一个基于窗口的应用程序的新项目并修改 dealloc 在 AppDelegate m 文件中 以便它实际上在控制台上生成打印语句
  • 使用带有无限参数的函数进行柯里化

    假设我有以下 add 函数 它接受无限数量的参数 function add var total 0 var args Array prototype slice call arguments 0 for var i 0 i
  • 如何在Javascript中计算两个日期之间的年和月?

    有没有办法计算 Javascript 中两个不同日期之间的年数 也考虑闰年 和月份数 这是我知道的获得年份和月份的最佳方法 Assumes Date From df and Date To dt are valid etc var df n
  • 集成 Paytm 支付网关 Android

    我正在尝试将 paytm 的支付网关集成到我的 Android 应用程序中 我似乎在他们的网站上找不到任何适当的文件或程序 我已经通过谷歌检查了这方面的每一条线索 但没有帮助 尝试通过邮件和电话联系 Paytm 团队 没有回复 现在我被困在
  • PySpark + Cassandra:获取分区键的不同值

    我正在尝试获取 pyspark 中 cassandra 表的分区键的不同值 然而 pyspark似乎不理解我并完全迭代所有数据 很多 而不是查询索引 这是我使用的代码 对我来说看起来非常简单 from pyspark sql import