Spark:如何通过 python-api 使用 HBase 过滤器,例如 QualiferFilter

2024-03-03

我想通过使用像 python-api 中的 QualiferFilter 这样的过滤器从 HBase 获取行。
我知道如何从 HBase 获取行,就像在代码下一样。

host = 'localhost'
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "user",
                "hbase.mapreduce.scan.columns": "u:uid",
                "hbase.mapreduce.scan.row.start": "1", "hbase.mapreduce.scan.row.stop": "100"}
rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                             "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                         "org.apache.hadoop.hbase.client.Result",
                         keyConverter=keyConv, valueConverter=valueConv, conf=conf)

但是,我也想通过使用过滤器来获取行。
我需要添加什么类型的代码?


您好,您可以检查此代码......

def  doYourStuff(row):
     text = row.split("\n")
     data = {} 
     for row in text:
        if json.loads(row)["qualifier"] == "message":
              data["message"] = json.loads(row)["value"]
        if json.loads(row)["qualifier"] == "domain":
              data["domain"] = json.loads(row)["value"]
        data["rowKey"] = json.loads(row)["row"]
      return DoWhatYouWantToDo(data)

    def save_record(rdd):
        host = '172.31.@@.@@'
        table = 'TableName'
        keyConv1 = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv1 = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
        conf = {"hbase.zookeeper.quorum": host,
                "hbase.mapred.outputtable": table,
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
         rdd.saveAsNewAPIHadoopDataset(
            keyConverter=keyConv1, valueConverter=valueConv1,conf=conf)


    hbaseRdd = hbaseRdd.map(lambda x: x[1])  # message_rdd = hbase_rdd.map(lambda x:x[0]) will give only row-key

    processedRdd = hbaseRdd.map(lambda x: doYourStuff(x))
    save_record(processedRdd)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:如何通过 python-api 使用 HBase 过滤器,例如 QualiferFilter 的相关文章

随机推荐