Combining HBase with MapReduce (Part 2): WordCount on Fields of an HBase Table

Contents

  • 1. Sample Data
  • 2. Dependencies in pom.xml
  • 3. The Util Helper Class
  • 4. Importing Data: ImportData
  • 5. WordCount over the HBase Table
  • 6. Configuring the Jobs
  • 7. Results
  • References

1. Sample Data

1_song1_2016-1-11 song1 singer1 man slow pc
2_song2_2016-1-11 song2 singer2 woman slow ios
3_song3_2016-1-11 song3 singer3 man quick andriod
4_song4_2016-1-11 song4 singer4 woman slow ios
5_song5_2016-1-11 song5 singer5 man quick pc
6_song6_2016-1-11 song6 singer6 woman quick ios
7_song7_2016-1-11 song7 singer7 man quick andriod
8_song8_2016-1-11 song8 singer8 woman slow pc
9_song9_2016-1-11 song9 singer9 woman slow ios
10_song4_2016-1-11 song4 singer4 woman slow ios
11_song6_2016-1-11 song6 singer6 woman quick ios
12_song6_2016-1-11 song6 singer6 woman quick ios
13_song3_2016-1-11 song3 singer3 man quick andriod
14_song2_2016-1-11 song2 singer2 woman slow ios
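
Each line is one playback record. The first whitespace-separated token (for example 1_song1_2016-1-11, built from a record id, the song name, and a date) is later used as the HBase row key, and the remaining five tokens are stored as the columns name, singer, gender, ryghme, and terminal in the info column family, as done by ImportData below.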

2. Dependencies in pom.xml

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.6</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>3.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.5.10</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
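
A few notes on these dependencies: the hbase-mapreduce artifact provides TableMapper, TableReducer, and TableMapReduceUtil used in the following sections, while hbase-client and hbase-common cover the plain client API used in the Util class. The slf4j-log4j12 binding is excluded from hadoop-common, presumably to avoid clashing with the log4j dependency declared at the end.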

3. The Util Helper Class

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class Util {
    public static Connection getConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        return ConnectionFactory.createConnection(conf);
    }

    public static void create(Connection conn, String tableName, String[] families) throws IOException {
        if (families.length == 0) {
            System.out.println("please provide at least one column family.");
            return;
        }
        if (families.length > 3) {
            System.out.println("please reduce the number of column families.");
            return;
        }
        
        Admin admin = conn.getAdmin();
        TableName tableName2 = TableName.valueOf(tableName);

        if (admin.tableExists(tableName2)) {
            System.out.println("table exists!");
            admin.close();
            return;
        }

        TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(tableName2);
        for (String family : families) {
            ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(family);
            tableDescBuilder.setColumnFamily(columnFamily);
        }
        admin.createTable(tableDescBuilder.build());
        System.out.println("create table success!");
        admin.close();
    }

    public static void delete(Connection conn, String tableName) throws IOException {
        Admin admin = conn.getAdmin();
        TableName tableName2 = TableName.valueOf(tableName);
        if (admin.tableExists(tableName2)) {
            admin.disableTable(tableName2);
            admin.deleteTable(tableName2);
        }
        admin.close();
    }

    public static void scan(Connection conn, String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        System.out.println("scan: ");
        for (Result res = scanner.next(); res != null; res = scanner.next()) {
            for (Cell cell : res.listCells()) {
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String data = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println(String.format("row: %s, family: %s, column: %s, data: %s", row, columnFamily,
                        column, data));
            }
        }
        scanner.close();
        table.close();
    }
}
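
Util.getConnection() relies on an hbase-site.xml being available on the classpath so that HBaseConfiguration.create() can pick up the ZooKeeper quorum. If that file is not available, the settings can also be supplied in code. The following is a minimal sketch of such a variant; the localhost quorum and client port are assumptions for a local standalone HBase instance, not values taken from this article.

// Variant of Util.getConnection() that sets the ZooKeeper quorum explicitly
// instead of relying on hbase-site.xml. Host and port below are assumptions
// for a local standalone HBase instance.
public static Connection getConnection() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    return ConnectionFactory.createConnection(conf);
}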

4. Importing Data: ImportData

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ImportData {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class MyReducer extends TableReducer<Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            String[] columns = {"name", "singer", "gender", "ryghme", "terminal"};
            String[] splitStr = key.toString().split("\\s+");
            Put put = new Put(Bytes.toBytes(splitStr[0]));
            for (int i = 1; i < splitStr.length; i++) {
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(columns[i - 1]), Bytes.toBytes(splitStr[i]));
            }
            context.write(key, put);
        }
    }
}
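
To make the reducer's parsing concrete, the standalone trace below runs the first sample record through the same split logic: the leading token becomes the row key and each remaining token becomes a cell in the info column family under the corresponding qualifier from columns. This class is only an illustration and is not part of the job.

public class ParseTrace {
    public static void main(String[] args) {
        // First record from the sample data above.
        String line = "1_song1_2016-1-11 song1 singer1 man slow pc";
        String[] columns = {"name", "singer", "gender", "ryghme", "terminal"};
        String[] split = line.split("\\s+");
        System.out.println("row key: " + split[0]);              // 1_song1_2016-1-11
        for (int i = 1; i < split.length; i++) {
            // Each remaining token maps to one column in the "info" family.
            System.out.println("info:" + columns[i - 1] + " = " + split[i]);
        }
    }
}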

5. WordCount over the HBase Table

When HBase serves as the data source, the custom Mapper must extend TableMapper; under the hood the records are read through TableInputFormat. Accordingly, when configuring the job, the static method TableMapReduceUtil.initTableMapperJob is called to declare the HBase table that supplies the input and the custom Mapper class. Symmetrically, because the counts are written back into an HBase table, the reducer extends TableReducer and is registered with TableMapReduceUtil.initTableReducerJob, as shown in the job configuration in Section 6.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WordCount {
    public static class MyMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            List<Cell> cells = value.listCells();
            for (Cell cell : cells) {
                context.write(new Text(Bytes.toString(CellUtil.cloneValue(cell))), new IntWritable(1));
            }     
        }
    }

    public static class MyReducer extends TableReducer<Text, IntWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(Integer.toString(count)));
            context.write(key, put);
        }
    }
}
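
Because the job configuration in the next section restricts the scan to the info:name column, every row contributes exactly one (song name, 1) pair in the map phase, so the reduced count is the number of plays per song. After the job has run, a single count can be read back with a plain Get. The snippet below is a minimal read-back sketch (it additionally needs the org.apache.hadoop.hbase.client.Get and Table imports); the expected value of 3 for song6 follows from the sample data above.

// Read one counted value back from the "namelist" table (sketch, assuming the
// word-count job has already run).
try (Connection conn = Util.getConnection();
     Table table = conn.getTable(TableName.valueOf("namelist"))) {
    Result result = table.get(new Get(Bytes.toBytes("song6")));
    String rank = Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("rank")));
    System.out.println("song6 -> " + rank); // "3" for the sample data above
}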

6. Configuring the Jobs

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.Logger;

public class App {
    private Logger logger1 = Logger.getLogger(App.class);

    public static void main(String[] args) throws Exception {
        String file = "file:///root/CodeProject/mapreduce-hbase/play_records.txt";
        Connection conn = Util.getConnection();
        Util.delete(conn, "music");
        Util.delete(conn, "namelist");
        Util.create(conn, "music", new String[] { "info" });
        Util.create(conn, "namelist", new String[] { "details" });
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "import-data");
        job.setJarByClass(App.class);
        job.setMapperClass(ImportData.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(2);
        TableMapReduceUtil.initTableReducerJob("music", ImportData.MyReducer.class, job);
        FileInputFormat.addInputPath(job, new Path(file));
        int res1 = job.waitForCompletion(true) ? 0 : 1;
        if (res1 == 0) {
            Job countJob = Job.getInstance(conf, "word-count");
            countJob.setJarByClass(App.class);
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
            TableMapReduceUtil.initTableMapperJob(TableName.valueOf("music"), scan, WordCount.MyMapper.class, Text.class, IntWritable.class, countJob);
            TableMapReduceUtil.initTableReducerJob("namelist", WordCount.MyReducer.class, countJob);
            int res2 = countJob.waitForCompletion(true) ? 0 : 1;
            if (res2 == 0) {
                Util.scan(conn, "namelist");
            }
            conn.close();
            System.exit(res2);
        }
        conn.close();
        System.exit(res1);
    }
}
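
For the small sample file this setup is sufficient, but for larger tables the Scan handed to initTableMapperJob is usually tuned for sequential MapReduce access. The following sketch shows two commonly used settings; the caching value of 500 is an illustrative assumption rather than a requirement.

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
// Fetch more rows per RPC from the region servers during the map phase.
scan.setCaching(500);
// A MapReduce scan reads each row only once, so filling the block cache is wasteful.
scan.setCacheBlocks(false);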

7. Results

(Screenshot of the job output: the final scan of the namelist table.)
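
For the sample data above, the counts written to the namelist table should be: song1 1, song2 2, song3 2, song4 2, song5 1, song6 3, song7 1, song8 1, song9 1, each stored in the details:rank column and printed by Util.scan in the form "row: songX, family: details, column: rank, data: N".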

References

Wu Zhangyong, Yang Qiang. 大数据Hadoop3.X分布式处理实战 (Big Data Hadoop 3.X Distributed Processing in Action).

