Spark Mongo 连接器,MongoShardedPartitioner 不起作用

2024-05-02

出于测试目的,我配置了一个 4 节点集群,每个节点都有一个 Spark Worker 和一个 MongoDB Shard。这些是详细信息:

  • 四台 Debian 9 服务器(名为 Visa0、Visa 1、Visa 2、Visa)
  • 4 个节点上的 Spark(v2.4.0) 集群(visa1:主节点,visa0..3:从节点)
  • MongoDB (v3.2.11) 分片集群有 4 个节点(visa1..3 上的配置服务器副本集,visa1 上的 mongos,分片服务器:visa0..3 )
  • 我正在使用通过“spark-shell --packages”安装的 MongoDB Spark 连接器 org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

配置 SparkSession 时MongoShardedPartitioner,尽管数据帧架构已正确获取,但从数据库加载的每个数据帧都是空的。

这是在配置完成时复制的spark-defaults.conf文件或与.config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")在 SparkSession 构建器中。

With MongoShardedPartitioner, df.count() == 0:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>                                                                             
>>> df2.count()
0  

但无需指定分区程序即可正常工作:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162  

问题:

  • 我如何知道默认配置的是哪个分区程序?
  • How can MongoShardedPartitioner在这个场景中使用?

提前致谢

2019 年 1 月 13 日:建议的解决方法

正如下面的回答,似乎MongoShardedPartitioner不支持哈希索引作为分片索引。但是,我需要一个哈希索引来将块均匀地分布在我的节点上,与时间无关(我猜使用 _id 会按时间顺序分布)。

我的解决方法是使用计算出的日期存储桶的 md5 哈希值在数据库中创建一个新字段,对其建立索引(作为普通索引),并将其用作分片索引。

现在,代码可以正常工作:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> 
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
...   .getOrCreate()
>>> 
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()

2019-01-13 11:19:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162   

很抱歉得知您的连接器有问题。

我如何知道默认配置的是哪个分区程序?

有关分区器的信息可以在Spark 连接器文档站点 https://docs.mongodb.com/spark-connector/current/configuration/#Input-configuration。请在以下位置提交票据文档 jira 项目 https://jira.mongodb.org/projects/DOCS如果您觉得有什么遗​​漏或不清楚,它确实可以帮助未来的用户!

默认分区器是一个薄包装器MongoSamplePartitioner https://docs.mongodb.com/spark-connector/current/configuration/#conf-mongosamplepartitioner。它根据集合的统计采样将集合分成一定大小的分区。

How can MongoShardedPartitioner在这个场景中使用?

The MongoShardedPartitioner https://docs.mongodb.com/spark-connector/current/configuration/#conf-mongoshardedpartitioner使用shardKey生成分区。默认情况下它将使用_id作为钥匙。您可能需要配置该值。

Note:散列分片键是not的支持MongoShardedPartitioner由于目前无法根据哈希值查询集合 - 因此在检索分区时它将无法返回结果。我已经添加DOCS-12345 https://jira.mongodb.org/browse/DOCS-12345更新文档。

您的设置似乎存在问题MongoShardedPartitioner无法按预期对集合进行分区并返回 0 个结果。模式推断仍然有效,因为它查询集合的方式不同。如果不是配置/散列分片键问题,则问题请在Spark jira 项目 https://jira.mongodb.org/projects/SPARK我可以帮助您找出原因并发布修复程序。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Mongo 连接器,MongoShardedPartitioner 不起作用 的相关文章

随机推荐