出于测试目的,我配置了一个 4 节点集群,每个节点都有一个 Spark Worker 和一个 MongoDB Shard。这些是详细信息:
- 四台 Debian 9 服务器(名为 Visa0、Visa 1、Visa 2、Visa)
- 4 个节点上的 Spark(v2.4.0) 集群(visa1:主节点,visa0..3:从节点)
- MongoDB (v3.2.11) 分片集群有 4 个节点(visa1..3 上的配置服务器副本集,visa1 上的 mongos,分片服务器:visa0..3 )
- 我正在使用通过“spark-shell --packages”安装的 MongoDB Spark 连接器
org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
配置 SparkSession 时MongoShardedPartitioner
,尽管数据帧架构已正确获取,但从数据库加载的每个数据帧都是空的。
这是在配置完成时复制的spark-defaults.conf
文件或与.config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")
在 SparkSession 构建器中。
With MongoShardedPartitioner
, df.count() == 0:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>
>>> df2.count()
0
但无需指定分区程序即可正常工作:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162
问题:
- 我如何知道默认配置的是哪个分区程序?
- How can
MongoShardedPartitioner
在这个场景中使用?
提前致谢
2019 年 1 月 13 日:建议的解决方法
正如下面的回答,似乎MongoShardedPartitioner
不支持哈希索引作为分片索引。但是,我需要一个哈希索引来将块均匀地分布在我的节点上,与时间无关(我猜使用 _id 会按时间顺序分布)。
我的解决方法是使用计算出的日期存储桶的 md5 哈希值在数据库中创建一个新字段,对其建立索引(作为普通索引),并将其用作分片索引。
现在,代码可以正常工作:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>>
>>>
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
... .getOrCreate()
>>>
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-13 11:19:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162