维表join的几种方式
一 将维表预加载到内存关联
实现方式:
定义一个类实现RichFlatMapFunction在open()方法中读取全部数据加载到内存中。
优缺点:
因为存在内存中,所以仅支持小数据量维表;因为open方法中读取,所以维表变化需要重启作业。
![](https://img-blog.csdnimg.cn/20200314222814396.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xvbmdsb3ZlZmlsbQ==,size_16,color_FFFFFF,t_70)
二 通过Distributed Cache分发本地维度文件到task manager后加载到内存关联
实现方式:
通过env.registerCachedFile注册文件
实现RichFunction在open()方法中通过RuntimeContext获取cache文件
解析使用文件数据
优缺点:
不需要外部数据库
支持的数据量小,更新维表配置文件需要重启作业
public class BatchDemoDisCache {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1:注册一个文件,可以使用hdfs或者s3上的文件
env.registerCachedFile("d:\\discache.txt","a.txt");
DataSource<String> data = env.fromElements("a", "b", "c", "d");
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
private ArrayList<String> dataList = new ArrayList<String>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:使用文件
File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
List<String> lines = FileUtils.readLines(myFile);
for (String line : lines) {
this.dataList.add(line);
System.out.println("discache:" + line);
}
}
@Override
public String map(String value) throws Exception {
//在这里就可以使用dataList
return value;
}
});
result.print();
}
}
三 热存储关联查询
实现方式:
将维度数据导入到热存储redis hbase es等,通过异步IO的方式查询,利用cache机制将维度数据缓存在内存。
优缺点:
维度数据不受限与内存,支持较多维度数据
维度更新结果可能有延迟,而且对外部存储的压力较大
四 广播维表
实现方式:
利用broadcast State将维度数据流广播到下游做join
将维度数据发送到kakfa作为广播原始流S1
定义状态描述符MapStateDescriptor 调用S1.broadcast()获得broadCastStream S2
调用非广播流S3.connect(S2),得到BroadcastConnectedStream S4
应用混合流的S4.process(),并在KeyedBroadcastProcessFunction/BroadcastProcessFunction实现关联处理逻辑
优缺点:
维度数据实时更新
数据保存在内存中,支持维表数据量比较小。
![](https://img-blog.csdnimg.cn/2020031422264748.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xvbmdsb3ZlZmlsbQ==,size_16,color_FFFFFF,t_70)