Cassandra/Spark 显示大表的条目计数不正确

2024-01-25

我正在尝试使用 Spark 处理大型 cassandra 表(约 4.02 亿条目和 84 列),但得到的结果不一致。最初的要求是将一些列从该表复制到另一个表。复制数据后,我注意到新表中的一些条目丢失了。为了验证我是否对大型源表进行了计数,但每次我都得到不同的值。我在一个较小的表(约 700 万条记录)上尝试了查询,结果很好。

最初,我尝试使用 pyspark 进行计数。这是我的 pyspark 脚本:

spark = SparkSession.builder.appName("Datacopy App").getOrCreate() 
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache() 
df.createOrReplaceTempView("data") 
query = ("select count(1) from data " ) 
vgDF = spark.sql(query) 
vgDF.show(10)

Spark提交命令如下:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py

上述 Spark 提交过程大约需要 90 分钟才能完成。我运行了三遍,这是我得到的计数:

  • Spark迭代1:402273852
  • Spark迭代2:402273884
  • Spark迭代3:402274209

Spark在整个过程中没有显示任何错误或异常。我在 cqlsh 中运行相同的查询三次,并再次得到不同的结果:

  • Cqlsh迭代1:402273598
  • Cqlsh迭代2:402273499
  • Cqlsh迭代3:402273515

我无法找出为什么我从同一查询中得到不同的结果。 Cassandra 系统日志 (/var/log/cassandra/system.log) 仅显示一次以下错误消息:

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

版本:

  • 卡桑德拉 3.9。
  • 火花2.1.0。
  • Datastax 的 Spark-cassandra-connector 2.0.1
  • 斯卡拉版本 2.11

Cluster:

  • Spark 设置有 3 个工作节点和 1 个主节点。
  • 3 个工作节点还安装了 cassandra 集群。
  • 每个工作节点有 8 个 CPU 核心和 40 GB RAM。

任何帮助将不胜感激。


Spark Cassandra 连接器默认读取一致性为“LOCAL_ONE”,默认写入一致性为“LOCAL_QUORUM”,因此可以在使用默认值进行完全修复之前读取部分数据。对于写入数据失败的节点,您可以读取“ONE”,但这不是错误,因为其他 2 个副本成功。因此,您应该将两个级别设置为 QUORUM,或者将其中之一设置为 ALL

config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM").
config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM").

默认的 CQL shell 级别也是 ONE,因此您还应该增加它:

cqlsh> CONSISTENCY QUORUM
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Cassandra/Spark 显示大表的条目计数不正确 的相关文章

随机推荐

  • CasperJS 中的文件 IO

    是否可以在 CasperJS 脚本中读 写文件 var fs require fs var data fs readFileSync testdata data utf 8 console log data Calling casperjs
  • 将海量数据插入Mysql数据库的最快方法[重复]

    这个问题在这里已经有答案了 我实际上有一个包含 100 000 条记录的列表 我想将其插入 MySQL 数据库中 我尝试将它们插入foreach和简单的INSERT INTO然而 即使插入 100 行也需要花费很多时间 比如 1 秒 行 有
  • 我怎样才能执行这个聚合?

    我装了两张表 一张是客户 另一张是订单 select from customers id Name age adress salary 2 102 jpj 24 zzzz 10000 3 103 ftd 20 kkkk 20000 4 10
  • 在PHP中对多维数组进行排序的最快方法

    Array 0 gt Array t gt 81881 b gt 99494 1 gt Array 我有一个像上面这样的多维数组 这个数组中的条目最大可达 20k 我想对这个数组进行排序 到 t 索引而不调用任何外部函数 任何提高效率的建议
  • 由于“INFO Spawnerr:为“app_name”制作调度程序时出现未知错误:EACCES”,无法使用 nohup 启动服务

    我正在尝试与主管一起启动服务 但收到错误消息 信息spawnerr 为 app name 制作调度程序时出现未知错误 EACCES 这是我的supervisord conf 文件 supervisord logfile tmp superv
  • 在类定义之外定义显式专用类的成员函数

    我看到与模板相关的错误 编译器是 Visual Studio 2012 但我不明白 这是代码 归结为要点 Templated class generic template
  • 使用 Windows Azure 队列锁定队列中的消息

    我正在使用 Windows Azure 消息队列 我想知道是否有一种方法可以在收到消息时锁定队列中的消息 当您从队列中检索消息时 它被标记为无形的直到您删除它 或达到超时期限 当它被标记为不可见时 其他人都看不到该消息 我想这已经接近 锁定
  • java中使用数字作为包名

    我已经检查了以下帖子 https docs oracle com javase specs jls se7 html jls 6 html jls 6 2 https docs oracle com javase specs jls se7
  • 在 j2me 中读取收件箱中的短信

    我如何阅读收件箱中的短信 我想阅读短信 没有短信等 我可以在 j2me 中做吗 如果可以的话怎么做 我想在诺基亚和索尼上运行该应用程序 我相信您无法直接从收件箱读取短信 但是 您可以运行 j2me 应用程序并等待传入 的短信 换句话说 如果
  • Firebase Auth:手动检测当前用户最近是否经过身份验证

    默认情况下 如果用户 X 年前登录 Firebase 将要求他 她通过以下方式重新进行身份验证reauthenticateWithCredential以完成该动作 如果用户在继续下一步操作之前已经符合重新身份验证的资格 是否可以查询 Fir
  • 尝试避免使用 sapply 进行 for 循环(对于 gsub)

    尽量避免使用for使用以下代码循环sapply 如果可能的话 带循环的解决方案对我来说非常适合 我只是想学习更多 R 并探索尽可能多的方法 目标 有一个向量i和两个向量sf 搜索 和rp 代替 对于每个i需要循环sf并替换为rp哪里匹配 i
  • 带有 Ignited-Datatables 库的服务器端 DataTables

    如何使用 Ignited Datatables 库来服务器端 DataTables 我的应用程序使用 CodeIgniter 我使用的库是Ignited datatables 库 https github com IgnitedDatata
  • SOLR计数多值字段查询

    是否可以创建一个 solr 查询 其中仅返回在多值字段中具有多个条目的文档 例如 docs id 1 myfield hello word hello stackoverflow id 2 myfield hello word 我天真的示例
  • 将数值向量中的 NA 替换为从邻居计算出的值

    我正在尝试写一个替换 函数将给定数值向量中的每个缺失值替换为算术平均值 of 它的前面和后面的元素 例如 如果c 5 NA 6 2 3 5 6 4 NA 2 NA 5 给出 那么结果应该是c 5 5 5 6 2 3 5 6 4 3 2 3
  • 如何在 printf 函数中使用宏

    所以我在我的头文件中定义了这个宏和其他一些宏 define COL1WIDTH 16 我想用它来打印这样的东西 word 25 Dir1 FileB 129 Sat Jan 1 00 00 02 2011 12 1 x4 2 x2 3 x2
  • ASP.NET 中的 jQuery ajax 带有 customErrors mode="On"

    知道如何检索服务器端在执行时抛出的原始异常使用 jQuery 调用 ajax 并使用 自定义错误模式 开 在 web config 中 如果 mode Off 我可以使用此函数获取错误 error function xhr status e
  • C++ 中的重定向

    include
  • TDD 如何应用于基于 Django 类的通用视图?

    由于 Django 中基于类的通用视图涉及框架的一些工作 我发现很难以 TDD 风格使用它们 现在 我使用 TestClient 从 http 模拟堆栈访问视图 但我更愿意在使用 TestClient 进行 功能 测试之前正确地对特定方法
  • 何时在 Linq 中使用 Cast() 和 OfType()

    我知道有两种将类型转换为IEnumerable从一个Arraylist在 Linq 中并想知道在什么情况下使用它们 e g IEnumerable
  • Cassandra/Spark 显示大表的条目计数不正确

    我正在尝试使用 Spark 处理大型 cassandra 表 约 4 02 亿条目和 84 列 但得到的结果不一致 最初的要求是将一些列从该表复制到另一个表 复制数据后 我注意到新表中的一些条目丢失了 为了验证我是否对大型源表进行了计数 但