Age range tag (年龄段)
import bean.HBaseMeta
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import java.util.Properties
// Note: a join condition does not have to be a plain equality;
// it can also be a range condition (one value between two others), which is what we use below.
object AgeTag {
//inType=HBase##zkHosts=192.168.10.20##zkPort=2181##hbaseTable=tbl_users##family=detail##selectFields=id,birthday
// Compute the age-range tag for each user based on birthday
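// Note: bean.HBaseMeta is defined elsewhere in the project. Judging only from how it is
// used below (HBaseMeta.ZKHOSTS as an option key, hbaseMeta.zkHosts as a value, ...),
// it is assumed to look roughly like the following sketch (names inferred here, not the actual source):
//   object HBaseMeta {
//     val ZKHOSTS = "zkHosts"; val ZKPORT = "zkPort"; val HBASETABLE = "hbaseTable"
//     val FAMILY = "family"; val SELECTFIELDS = "selectFields"
//   }
//   case class HBaseMeta(inType: String, zkHosts: String, zkPort: String,
//                        hbaseTable: String, family: String, selectFields: String)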
def main(args: Array[String]): Unit = {
//1 Create the SparkSession
val spark: SparkSession = SparkSession.builder.appName("AgeTag").master("local[*]").getOrCreate
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2 Connect to the MySQL database that stores the tag rules
val url = "jdbc:mysql://bd001:3306/tags_new?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&user=root&password=123456"
val table = "tbl_basic_tag"
val properties = new Properties()
val mysqlConn: DataFrame = spark.read.jdbc(url, table, properties)
//Implicit conversions (needed for the 'column syntax and Dataset encoders)
import spark.implicits._
//Spark SQL built-in functions (regexp_replace, udf, ...)
import org.apache.spark.sql.functions._
//3 Read the level-4 tag rule (id=99), which describes where the user data lives in HBase
val fourDS: Dataset[Row] = mysqlConn.select('rule).where("id=99")
val fourMap: Map[String, String] = fourDS.map(row => {
//split on ## first ...
row.getAs[String]("rule").split("##")
//... then on = to obtain key/value pairs
.map(line => {
val arr: Array[String] = line.split("=")
(arr(0), arr(1)) // key -> value
})
}).collectAsList().get(0).toMap
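// After parsing, fourMap contains the key/value pairs of the rule string quoted above, i.e.
// Map("inType" -> "HBase", "zkHosts" -> "192.168.10.20", "zkPort" -> "2181",
//     "hbaseTable" -> "tbl_users", "family" -> "detail", "selectFields" -> "id,birthday")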
//Convert fourMap into an HBaseMeta instance
val hbaseMeta: HBaseMeta = getHBaseMeta(fourMap)
//println(hbaseMeta.selectFields)
//4 Read the level-5 tags (the age-range rules under pid=99), to prepare for matching
val fiveRow: Dataset[Row] = mysqlConn.select('id, 'rule).where("pid=99")
val fiveDF: DataFrame = fiveRow.map(row => {
val id: String = row.getAs("id").toString
var start = ""
var end = ""
//rule format: 19800101-19891231
val rule: String = row.getAs[String]("rule")
//split the rule into start and end on '-'
val arr: Array[String] = rule.split("-")
if (arr != null && arr.length == 2) {
start = arr(0)
end = arr(1)
}
(id, start, end)
}).toDF("id", "start", "end")
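// fiveDF now holds one row per age-range tag, e.g. (with a hypothetical tag id) ("100", "19800101", "19891231")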
//5 Read the user data from HBase
val hbaseDatas: DataFrame = spark.read.format("tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hbaseMeta.zkHosts)
.option(HBaseMeta.ZKPORT, hbaseMeta.zkPort)
.option(HBaseMeta.HBASETABLE, hbaseMeta.hbaseTable)
.option(HBaseMeta.FAMILY, hbaseMeta.family)
.option(HBaseMeta.SELECTFIELDS, hbaseMeta.selectFields)
.load()
//hbaseDatas.show(20)
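// hbaseDatas contains exactly the fields requested via selectFields: id (the user id) and birthday.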
/*
Date-format handling:
in HBase the birthday column is formatted as 1989-12-31,
but the level-5 rules use 19891231,
so yyyy-MM-dd has to be converted to yyyyMMdd before matching.
*/
//Use the built-in regexp_replace function to strip the '-' characters from the date
val hbaseDF: DataFrame = hbaseDatas.select('id.as("userId"),
regexp_replace('birthday, "-", "").as("birthday"))
//6 Match against the level-5 rules: a user's birthday must fall between a rule's start and end.
// String comparison is safe here because both sides use the fixed-width yyyyMMdd format.
val newAgeTag: DataFrame = hbaseDF.join(fiveDF, hbaseDF.col("birthday")
.between(fiveDF.col("start"), fiveDF.col("end")))
// keep the HBase user id and the matched level-5 tag id
.select('userId.as("userId"), 'id.as("tagsId"))
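// newAgeTag has one row per (userId, tagsId) match, e.g. a user born 1989-12-31
// is assigned the tag whose rule is 19800101-19891231.
//newAgeTag.show(10)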
//UDF that merges the historical tag ids with the newly computed one
val getAllTags = udf((historyTagId: String, newTagsId: String) => {
if (historyTagId == null || historyTagId == "") {
newTagsId
} else if (newTagsId == null || newTagsId == "") {
historyTagId
} else {
//concatenate history and new tags (repeated runs may produce duplicates) ...
val allTags: String = historyTagId + "," + newTagsId
//... then split on ',', deduplicate and rebuild the string
allTags.split(",").distinct.mkString(",")
}
})
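// Merge behaviour (with hypothetical tag ids): history "6,7" plus new tag "7" yields "6,7";
// an empty history plus new tag "8" yields "8".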
//7 Avoid overwriting existing tags:
//read the current result table, merge the new tag ids with the historical ones, then write back
//a Read the historical tags (already computed) from the test result table
val historyTag: DataFrame = spark.read.format("