jav中spark迁移hive到mongo(更新数据)

2023-10-31

业务中的数据库使用的mongo,离线使用spark计算的每天统计指标需要累加到历史指标中,然后将结果写到mongo库中。如果mongo库中已经有这条记录则覆盖,mongo库中没有此记录则为新增。

我们如果根据 MongoSpark.save(),这样的操作无法做到如果mongo库中已经有这条记录则覆盖,mongo库中没有此记录则为新增。

我们需要这样做

1、保证数据形式是Dateset,因为这样的格式才会更新

2、将将结果Dataset<Row>进行Append的形式写入mongo即可,因为mongo的主键是_id,所以要将Row的主键改成_id的列名。

具体操作如下:

Dataset<Row> dataset = sparkSession.sql(sql);
            MongoSpark.save(dataset);
            //因为mongo的主键是_id,所以将mongo的_id的值换成hive的id值
            Dataset<Row>mongoData = dataset.withColumnRenamed("id", "_id");
            Map<String, String> writeOverrides = new HashMap<>();
            writeOverrides.put("collection", targetTable);
            WriteConfig writeConfig =  WriteConfig.create(jc).withOptions(writeOverrides);
            //如果目标位置已经存在数据,那么将数据追加;相同_id的数据会直接覆盖
            MongoSpark.save(mongoData.write().mode(SaveMode.Append), writeConfig);

我们来看看SaveMode.Append这个方法,进入方法内部可以看到这些 

具体的含义分别是这些

 

 就上就可以实现

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

jav中spark迁移hive到mongo(更新数据) 的相关文章

随机推荐