自定义 HBase-MapReduce

2023-10-31

1 hdfs -> table

需求: 从hdfs读取数据,插入到hbase的表中
mapper

public class FruitMapper extends Mapper<LongWritable, Text,LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

reducer

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String[] split = value.toString().split("\t");
            Put put=new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
            context.write(NullWritable.get(), put);
        }
    }
}

driver

public class FruitDriver implements Tool {
    private Configuration conf;
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(conf);
        job.setJarByClass(FruitDriver.class);

        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf=configuration;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FruitDriver(), args);
        System.exit(run);
    }
}

打包上传并执行

  1. mr1.jar: 打包好的jar包,改了个名字
  2. com.cssl.mr1.FruitDriver: main函数所在的全类名
  3. /input_fruit/fruit.tsv: hdfs文件所在位置
  4. fruit1: hbase上的表名
yarn jar mr1.jar com.cssl.mr1.FruitDriver /input_fruit/fruit.tsv fruit1

2 table -> table

需求: 将hbase表中的数据导入到hbase的另一张表中

mapper

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        Put outV = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                outV.add(cell);
            }
        }
        context.write(key, outV);
    }
}

reducer

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }

    }
}

driver
这样写是直接可以在本地运行的,需要把hbase的配置文件拷贝到项目的资源目录(有域名映射的话,Windows也需要配)
在这里插入图片描述

public class Fruit2Driver implements Tool {
    private Configuration configuration;
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Fruit2Driver.class);

        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        TableMapReduceUtil.initTableReducerJob("fruit2",
                Fruit2Reducer.class,
                job);

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration=configuration;
    }

    @Override
    public Configuration getConf() {
        return this.configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
        System.exit(run);
    }
}

3 自定义协处理器

目标:当客户端往 "student" 中插入数据, 会同时向 "chen:student" 中插入数据

1)编写协处理器

/*
* 协处理器
* 1) 创建类: 继承BaseRegionObserver
* 2) 重写方法: postPut
* 3) 实现逻辑
*   增加student的数据,同时增加chen:student数据
* 4) 将项目打包后上传到hbase中,让hbase可以识别我们的协处理器
* */
public class InsertHbaseCoprocessor extends BaseRegionObserver {
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        //获取表
        Table table = e.getEnvironment().getTable(TableName.valueOf("chen:student"));
        //增加数据
        table.put(put);
        //关闭表
        table.close();
    }
}

2)打包上传到hbase的lib目录并重启hbase

3)删除原来的student表,创建新表并指定协处理器

//新增指定协处理器的表
@Test
public void addCoprocessorTable() throws IOException {
    Admin admin = connection.getAdmin();
    TableName tableName = TableName.valueOf("student");
    if (!admin.tableExists(tableName)) {
        //创建表描述对象
        HTableDescriptor hTableDescriptor=new HTableDescriptor(tableName);
        //指定协处理器
        hTableDescriptor.addCoprocessor("com.cssl.InsertHbaseCoprocessor");
        //增加列族
        hTableDescriptor.addFamily(new HColumnDescriptor("info"));
        admin.createTable(hTableDescriptor);
        System.out.println("创建成功~~~");
    }
}

4)测试插入并查看结果


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

自定义 HBase-MapReduce 的相关文章

随机推荐

  • MQTT vs webSocket协议

    边缘服务器采用了容器和微服务架构 其中重要的一个方面就是要选择一个高效率的消息系统 用于微服务之间的消息交换 为什么选择websocket 协议 modular 2 edge 设计了自己的消息系统base service 它采纳了webso
  • aps是什么意思_APS系统是什么意思?起什么作用

    原标题 APS系统是什么意思 起什么作用 APS系统是什么意思 起什么作用 随着企业规模不断扩大 在经营管理方面会面临各种各样的问题 为了帮助解决此类问题 很多公司都会引入APS高级排程系统帮助进行生产管理的优化 那APS系统是什么意思 起
  • JMeter压测常见面试问题

    1 JMeter可以模拟哪些类型的负载 JMeter可以模拟各种类型的负载 包括但不限于Web应用程序 API 数据库 FTP SMTP JMS SOAP RESTful Web服务等 这使得JMeter成为一个功能强大且灵活的压力测试工具
  • Linux网络设备之注销

    在注销网络设备时 会调用pci driver gt remove函数 以e100网卡驱动为例 实际调用e100 remove 该函数调用函数unregister netdev进行设备注销操作 函数调用关系图如下 注销分为两步 1 回滚注册操
  • 仿阿姨帮

    实例简介 仿阿姨帮 58到家上门O2O系统源码 BAOCMS二次开发 七牛云 是一款PHP MYSQL开发制作的在线上门O2O系统 PC WAP 微信端等功能 在BAOCMS基础上二次开发的东西内核是BAOCMS 最新版内核 修复了所有的功
  • 标准25码 Barcode 25

    Code25 码 标准 25 码 Interleaved 2 of 5 Code25 计算 2of5i xsl
  • windows10安装Trading View出错解决办法

    一 直接从https cn tradingview com desktop 下载安装 出现报错 解析应用包时出错 二 解决办法 1 找下载的安装文件 TradingView appinstaller 2 用记事本打开 找到 https tv
  • mybaties踩坑之错误的@ID 注解引用, 导致org.apache.ibatis.type.TypeException

    在定义完实体类以后 使用mybaties的主键查询实体类 一查就报错 说无法将字符串格式转换为时间类型 仔细检查了一遍 类型并没有问题 于是尝试将Timestamp的字段去掉再看 仍然报无法将字符串格式转换为Integer类型 基本可以确定
  • 视频质量诊断和图像质量诊断 视频质量分析算法

    目前我们常说的视频质量诊断应用 主要分两种 一种是将视频质量诊断系统设在监控中心 通过中心矩阵或流媒体服务器来获取前端所有摄像机的视频信号 通过轮巡方式对各路视频进行检测 这种方式受限于网络带宽和服务器自身性能 上传画面质量无法保障 非真正
  • ROS小车+Velodyne16线+legoloam仿真

    系统使用Ubuntu18 04 ros1 一 建立catkin工作空间 mkdir p catkin ws src cd catkin ws src catkin init workspace 上述三步建立了catkin工作空间并将其初始化
  • 2021河南高驻马店高考成绩查询,河南驻马店2020高考喜报,驻高包揽市理科前三强,一本上线数稳增...

    原标题 河南驻马店2020高考喜报 驻高包揽市理科前三强 一本上线数稳增 读过笔者这篇河南驻马店叱咤风云5所高中 皆是省重点 堪称重点本科人才摇篮的人知道 以驻马店高中 驻高 为代表的几所省级示范高中 在去年的高考中 取得了相当不错的成绩
  • 北京大学肖臻老师《区块链技术与应用》公开课笔记8——BTC挖矿篇

    北京大学肖臻老师 区块链技术与应用 公开课笔记 比特币挖矿篇 对应肖老师视频 click here 全系列笔记请见 全系列笔记请见 click here About Me 点击进入我的Personal Page 在之前的文章 已经基本上介绍
  • c语言 乘除法优先级,运算符运算符优先级 - C语言教程

    运算符优先级 运算符的优先级确定表达式中项的组合 这会影响到一个表达式如何计算 某些运算符比其他运算符有更高的优先级 例如 乘除运算符具有比加减运算符更高的优先级 例如 x 7 3 2 在这里 x 被赋值为 13 而不是 20 因为运算符
  • elasticSearch 实现对nested对象的查询

    1 下面我是对一个nested对象进行查询时候执行的结果 希望对您有帮助 GET my store search query bool must nested path owner query match owner name keywor
  • 前端入门:HTML5+CSS3+JAAVASCRIPT

    1 初识HTML HTML Hyper Text Markup Language 超文本标记语言 超文本包括 文字 图片 音频 视频 动画等 1 1 W3C标准 1 2 HTML基本结构 示例
  • 无服务器计算系统,无服务器计算三大问题及解决办法

    遵循这些建议 以消除非计算瓶颈 避免供应商节流和排队 以及保持无服务器功能的响应 无服务器计算现在十分流行 所有人要么在调查它 要么已经部署它了 不要落在最后 否则你会错过的 有什么好大惊小怪的 无服务器计算提供了一种基础结构 让服务器资源
  • 浅谈exp与expdp的区别

    1 把用户usera的对象导到用户userb 用法区别在于fromuser usera touser userb remap schema usera usera 例如 imp system passwd fromuser usera to
  • Node=>Express自定义中间件 学习5

    手动模拟一个类似于express urlencoded这样的中间件 来解析POST提交到服务器的表单数据 定义中间件 监听req的data事件 监听req的end事件 使用querystring模块解析请求体数据 讲解洗出来的数据对象挂碍为
  • 2021-06-12

    同样的SQL语句在数据库中能查出结果 但是在java项目中 执行结果查不到数据 今天在调试的时候发现同样的SQL语句在数据库中能查出结果 但是在java项目中 执行结果查不到数据 原因是查询语句中有一个条件字段有空格 在PLSQL中查询时会
  • 自定义 HBase-MapReduce

    自定义 HBase MapReduce 1 hdfs gt table 2 table gt table 3 自定义协处理器 1 hdfs gt table 需求 从hdfs读取数据 插入到hbase的表中 mapper public cl