
0268 - How to Develop, Deploy, and Use an HBase Endpoint Coprocessor

Fayson's GitHub:


1. Purpose of This Document


In an earlier article, "How to Invoke HBase's Endpoint Coprocessor from Java", Fayson showed how to call the Coprocessor that ships with HBase. This article explains how to develop an HBase Endpoint-type coprocessor.

The example coprocessor in this article implements Count, Max, Min, Sum, and Average over specified columns. The previous article called a Coprocessor that was configured globally; here Fayson demonstrates another approach: attaching a Coprocessor to a specific table programmatically.

  • Contents

1. Environment preparation

2. Generating serialization classes with Protobuf

3. Endpoint Coprocessor server-side implementation

4. Endpoint Coprocessor client-side implementation

5. Deployment and invocation

  • Test environment

1. CM and CDH version: 5.14.3

2. Kerberos is not enabled on the cluster

2. Environment Preparation


HMaster and RegionServer each create an RpcServer instance internally, and RPC calls flow between these two and the client. Starting with HBase 0.95, Google Protobuf was introduced as the intermediate data format, and a service-based RPC implementation was built on top of the RPC interfaces that Protobuf provides.

Protocol Buffers is a lightweight, efficient, language-neutral, platform-neutral, and extensible format for serializing structured data. It works well both for data storage and as an RPC data exchange format, and is widely used in communication protocols and storage systems.

Here Fayson uses Protobuf to generate the data exchange classes for HBase RPC. HBase uses Protobuf version 2.5.0, so the same version of Protobuf is installed.

1. Download the Protobuf 2.5.0 package from the following address:

 


2. Pick a server and install Protobuf on it

[root@ip-172-31-5-38 ~]# wget 
 


3. Run the following command to install the packages required to build Protobuf

yum install -y autoconf automake libtool curl make unzip gcc-c++
 


4. Extract protobuf-2.5.0.tar.gz, change into the extracted directory, and run the following commands to build and install

[root@ip-172-31-5-38 ~]# tar -zxvf protobuf-2.5.0.tar.gz
[root@ip-172-31-5-38 ~]# cd protobuf-2.5.0
[root@ip-172-31-5-38 protobuf-2.5.0]# ./configure --prefix=/usr/local/protobuf
[root@ip-172-31-5-38 protobuf-2.5.0]# make && make install
 


5. Configure the Protobuf environment variables by adding the following to /etc/profile

export PROTOBUF_HOME=/usr/local/protobuf
export PATH=$PROTOBUF_HOME/bin:$PATH
 


Run the following command to make the environment variables take effect

[root@ip-172-31-5-38 protobuf-2.5.0]# source /etc/profile
 


6. Prepare the HBase test table; the create-table statement and test data are as follows

create 'fayson_coprocessor', {NAME => 'info'}
put 'fayson_coprocessor','001','info:sales',12.3
put 'fayson_coprocessor','002','info:sales',24.5
put 'fayson_coprocessor','003','info:sales',10.5
put 'fayson_coprocessor','004','info:sales',11.5
put 'fayson_coprocessor','005','info:sales',10.5
put 'fayson_coprocessor','001','info:age',22
put 'fayson_coprocessor','002','info:age',33
put 'fayson_coprocessor','003','info:age',26
put 'fayson_coprocessor','004','info:age',28
put 'fayson_coprocessor','005','info:age',56
 

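If you would rather load the same test data from Java instead of the hbase shell, the following is a minimal sketch (the class name LoadTestData and the connection settings are illustrative assumptions; adjust them to your own cluster). The values are written as strings, which matches what the shell puts above store and what the coprocessor later parses with Double.parseDouble.

package com.cloudera.hbase.coprocessor.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadTestData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        //Assumed ZooKeeper client port; also point hbase.zookeeper.quorum at your own cluster
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("fayson_coprocessor"))) {
            String[][] rows = {
                {"001", "12.3", "22"}, {"002", "24.5", "33"}, {"003", "10.5", "26"},
                {"004", "11.5", "28"}, {"005", "10.5", "56"}
            };
            for (String[] r : rows) {
                Put put = new Put(Bytes.toBytes(r[0]));
                //Store the values as strings, exactly like the shell puts above
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sales"), Bytes.toBytes(r[1]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(r[2]));
                table.put(put);
            }
        }
    }
}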


3. Generating the Serialization Classes with Protobuf


1. Prepare the MyFirstCoprocessor.proto file with the following content

[root@ip-172-31-5-171 hbase-coprocessor]# vim MyFirstCoprocessor.proto 
syntax = "proto2";
option java_package = "com.cloudera.hbase.coprocessor.server";
option java_outer_classname = "MyFirstCoprocessor";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message MyCoprocessRequest {
 required string family = 1;
 required string columns = 2;
}
message MyCoprocessResponse {
 required int64 count = 1;
 required double maxnum = 3;
 required double minnum = 4;
 required double sumnum = 5;
}
service AggregationService {
 rpc getAggregation(MyCoprocessRequest)
 returns (MyCoprocessResponse);
}
 


2. Run the following command to generate the Java classes

[root@ip-172-31-5-38 hbase-coprocessor]# protoc --java_out=./ MyFirstCoprocessor.proto 
[root@ip-172-31-5-38 hbase-coprocessor]# ll
total 4
drwxr-xr-x 3 root root 22 May 14 16:34 com
-rw-r--r-- 1 root root 609 May 14 16:33 MyFirstCoprocessor.proto
[root@ip-172-31-5-38 hbase-coprocessor]# 
 



The Java classes are generated in the current directory, under the package path specified by java_package.
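As a quick sanity check of the generated code, the following minimal sketch (the class name ProtoSmokeTest is an illustrative assumption) builds a MyCoprocessRequest with the builder API that protoc produces, round-trips it through the Protobuf wire format, and prints the fields back.

import com.cloudera.hbase.coprocessor.server.MyFirstCoprocessor;

public class ProtoSmokeTest {
    public static void main(String[] args) throws Exception {
        //Build a request with the generated builder API
        MyFirstCoprocessor.MyCoprocessRequest request =
                MyFirstCoprocessor.MyCoprocessRequest.newBuilder()
                        .setFamily("info")
                        .setColumns("sales:MAX,sales:MIN,sales:SUM,sales:COUNT")
                        .build();
        //Round-trip through the Protobuf wire format
        byte[] bytes = request.toByteArray();
        MyFirstCoprocessor.MyCoprocessRequest parsed =
                MyFirstCoprocessor.MyCoprocessRequest.parseFrom(bytes);
        System.out.println(parsed.getFamily() + " / " + parsed.getColumns());
    }
}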


4. Endpoint Coprocessor Server-Side Implementation


1. Create a Java example project with Maven; the dependencies in pom.xml are as follows

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.0-cdh5.11.2</version>
</dependency>
<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.6.0-cdh5.11.2</version>
</dependency>
<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.2.0-cdh5.11.2</version>
</dependency>
<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-examples</artifactId>
 <version>1.2.0-cdh5.11.2</version>
</dependency>
<dependency>
 <groupId>com.google.protobuf</groupId>
 <artifactId>protobuf-java</artifactId>
 <version>2.5.0</version>
</dependency>
 


2. Copy the Java classes generated by Protobuf into the corresponding package directory

The package directory must match the java_package specified in MyFirstCoprocessor.proto.

3. In the com.cloudera.hbase.coprocessor.server package, create the MyFirstCoprocessorEndPoint implementation class with the following content

package com.cloudera.hbase.coprocessor.server;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * package: com.cloudera.hbase.coprocessor.server
 * describe: Endpoint Coprocessor that runs on the HBase RegionServers and implements Count, MAX, MIN and SUM aggregation over the specified columns
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/5/13
 * creat_time: 11:11 PM
 * WeChat official account: Hadoop实操
 */
public class MyFirstCoprocessorEndPoint extends MyFirstCoprocessor.AggregationService implements Coprocessor, CoprocessorService {
 protected static final Log log = LogFactory.getLog(MyFirstCoprocessorEndPoint.class);
 private RegionCoprocessorEnvironment env;
 @Override
 public void getAggregation(RpcController controller, MyFirstCoprocessor.MyCoprocessRequest request, RpcCallback<MyFirstCoprocessor.MyCoprocessResponse> done) {
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes(request.getFamily()));
 //Columns are passed in the form sales:MAX,sales:MIN,sales:AVG,sales:SUM,sales:COUNT
 String colums = request.getColumns();
 //Keep track of every column that needs to be scanned
 Map<String, List<String>> columnMaps = new HashedMap();
 for (String columnAndType : colums.split(",")) {
 String column = columnAndType.split(":")[0];
 String type = columnAndType.split(":")[1];
 List<String> typeList = null;
 if (columnMaps.containsKey(column)) {
 typeList = columnMaps.get(column);
 } else {
 typeList = new ArrayList<>();
 //Add the column to the Scan
 scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(column));
 }
 typeList.add(type);
 columnMaps.put(column, typeList);
 }
 InternalScanner scanner = null;
 MyFirstCoprocessor.MyCoprocessResponse response = null;
 Double max = null;
 Double min = null;
 Double sumVal = null;
 long counter = 0L;
 try {
 scanner = this.env.getRegion().getScanner(scan);
 List<Cell> results = new ArrayList<>();
 boolean hasMore = false;
 do {
 hasMore = scanner.next(results);
 if (results.size() > 0) {
 ++counter;
 }
 log.info("counter:" + counter);
 log.info("results size:" + results.size());
 for (Cell cell : results) {
 String column = Bytes.toString(CellUtil.cloneQualifier(cell));
 log.info("Column Name: " + column);
 log.info("Cell Value:" + new String(CellUtil.cloneValue(cell)));
 Double temp = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
 if (columnMaps.containsKey(column)) {
 List<String> types = columnMaps.get(column);
 for (String type : types) {
 switch (type.toUpperCase()) {
 case "MIN":
 min = min != null && (temp == null || compare(temp, min) >= 0) ? min : temp;
 log.info("MIN Value: " + min.doubleValue());
 break;
 case "MAX":
 max = max != null && (temp == null || compare(temp, max) <= 0) ? max : temp;
 break;
 case "SUM":
 if (temp != null) {
 sumVal = add(sumVal, temp);
 }
 break;
 default:
 break;
 }
 }
 }
 }
 results.clear();
 } while (hasMore);
 response = MyFirstCoprocessor.MyCoprocessResponse.newBuilder()
 .setMaxnum(max!=null?max.doubleValue():Double.MAX_VALUE)
 .setMinnum(min!=null?min.doubleValue():Double.MIN_NORMAL)
 .setCount(counter)
 .setSumnum(sumVal!=null?sumVal.doubleValue():Double.MIN_NORMAL).build();
 } catch (IOException e) {
 e.printStackTrace();
 ResponseConverter.setControllerException(controller, e);
 } finally {
 if (scanner != null) {
 try {
 scanner.close();
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 }
 done.run(response);
 }
 public static int compare(Double l1, Double l2) {
 if (l1 == null ^ l2 == null) {
 return l1 == null ? -1 : 1; // either of one is null.
 } else if (l1 == null)
 return 0; // both are null
 return l1.compareTo(l2); // natural ordering.
 }
 public double divideForAvg(Double d1, Long l2) {
 return l2 != null && d1 != null?d1.doubleValue() / l2.doubleValue():0.0D / 0.0;
 }
 public Double add(Double d1, Double d2) {
 return d1 != null && d2 != null ? Double.valueOf(d1.doubleValue() + d2.doubleValue()) : (d1 == null ? d2 : d1);
 }
 @Override
 public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
 if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
 this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
 } else {
 throw new CoprocessorException("Must be loaded on a table region!");
 }
 }
 @Override
 public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
 }
 @Override
 public Service getService() {
 return this;
 }
}
 


5. Endpoint Coprocessor Client-Side Implementation


1. Write the MyFirstCoprocessExample.java class; the code is as follows:

package com.cloudera.hbase.coprocessor.client;
import com.cloudera.hbase.coprocessor.server.MyFirstCoprocessor;
import com.cloudera.hbase.coprocessor.server.MyFirstCoprocessorEndPoint;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.DoubleColumnInterpreter;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
 * package: com.cloudera.hbase.coprocessor.client
 * describe: Invokes the coprocessor running on the HBase RegionServers
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/5/14
 * creat_time: 6:36 PM
 * WeChat official account: Hadoop实操
 */
public class MyFirstCoprocessExample {
 public static void main(String[] args) {
 String table_name = "fayson_coprocessor";
 //Initialize the HBase configuration
 Configuration configuration = HBaseConfiguration.create();
 configuration.set("hbase. zookeeper .property.clientPort", "2181");
 configuration.setStrings("hbase.zookeeper.quorum", "ip-172-31-5-38.ap-southeast-1.compute.internal,ip-172-31-8-230.ap-southeast-1.compute.internal,ip-172-31-5-171.ap-southeast-1.compute.internal");
 try {
 //Create an HBase Connection
 Connection connection = ConnectionFactory.createConnection(configuration);
 TableName tableName = TableName.valueOf(table_name);
 if(!connection.getAdmin().tableExists(tableName)) {
 System.out.println(table_name + " does not exist....");
 System.exit(0);
 }
 Table table = connection.getTable(tableName);
 //Remove the coprocessor from the table
 deleteCoprocessor(connection, table, MyFirstCoprocessorEndPoint.class);
 //Add the coprocessor to the specified table
 String hdfspath = "hdfs://nameservice3/hbase/coprocessor/hbase-demo-1.0-SNAPSHOT.jar";
 setupToExistTable(connection, table, hdfspath, MyFirstCoprocessorEndPoint.class);
 //Invoke the region-side coprocessor from the client
 execFastEndpointCoprocessor(table, "info", "sales:MAX,sales:MIN,sales:AVG,sales:SUM,sales:COUNT");
 //Close the connection
 connection.close();
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 /**
 * Remove the given coprocessors from an HBase table
 * @param connection
 * @param table
 * @param cls
 */
 public static void deleteCoprocessor(Connection connection, Table table, Class<?>... cls) {
 System.out.println("begin delete " + table.getName().toString() + " Coprocessor......");
 try {
 HTableDescriptor hTableDescriptor = table.getTableDescriptor();
 for(Class cass : cls) {
 hTableDescriptor.removeCoprocessor(cass.getCanonicalName());
 }
 connection.getAdmin().modifyTable(table.getName(), hTableDescriptor);
 } catch (IOException e) {
 e.printStackTrace();
 }
 System.out.println("end delete " + table.getName().toString() + " Coprocessor......");
 }
 /**
 * Add the given coprocessors to an existing table
 * @param connection
 * @param table
 * @param jarPath
 * @param cls
 */
 public static void setupToExistTable(Connection connection, Table table, String jarPath, Class<?>... cls) {
 try {
 if(jarPath != null && !jarPath.isEmpty()) {
 Path path = new Path(jarPath);
 HTableDescriptor hTableDescriptor = table.getTableDescriptor();
 for(Class cass : cls) {
 hTableDescriptor.addCoprocessor(cass.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null);
 }
 connection.getAdmin().modifyTable(table.getName(), hTableDescriptor);
 }
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 /**
 * The most efficient approach, an optimization of the second approach from the earlier article
 * Uses HBase's coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback) method to run the aggregations on every region of the table
 * @param table HBase table
 * @param family column family to aggregate over
 * @param columns columns and aggregate types, e.g. sales:MAX,sales:MIN,sales:SUM,sales:COUNT
 * @return the total row count of the table
 */
 public static long execFastEndpointCoprocessor(Table table, String family, String columns) {
 long start_t = System.currentTimeMillis();
 //Accumulator for the total rowCount
 AtomicLong totalRowCount = new AtomicLong();
 AtomicDouble maxValue = new AtomicDouble(Double.MIN_VALUE);
 AtomicDouble minValue = new AtomicDouble(Double.MAX_VALUE);
 AtomicDouble sumValue = new AtomicDouble();
 try {
 Batch.Callback<MyFirstCoprocessor.MyCoprocessResponse> callback = new Batch.Callback<MyFirstCoprocessor.MyCoprocessResponse>() {
 @Override
 public void update(byte[] bytes, byte[] bytes1, MyFirstCoprocessor.MyCoprocessResponse myCoprocessResponse) {
 //Update the count
 totalRowCount.getAndAdd(myCoprocessResponse.getCount());
 //Update the maximum
 if(myCoprocessResponse.getMaxnum() > maxValue.doubleValue()) {
 maxValue.compareAndSet(maxValue.doubleValue(), myCoprocessResponse.getMaxnum());
 }
 //Update the minimum
 if(myCoprocessResponse.getMinnum() < minValue.doubleValue()) {
 minValue.compareAndSet(minValue.doubleValue(), myCoprocessResponse.getMinnum());
 }
 //Update the sum
 sumValue.getAndAdd(myCoprocessResponse.getSumnum());
 }
 };
 table.coprocessorService(MyFirstCoprocessor.AggregationService.class, null, null, new Batch.Call<MyFirstCoprocessor.AggregationService, MyFirstCoprocessor.MyCoprocessResponse>() {
 @Override
 public MyFirstCoprocessor.MyCoprocessResponse call(MyFirstCoprocessor.AggregationService aggregationService) throws IOException {
 MyFirstCoprocessor.MyCoprocessRequest request = MyFirstCoprocessor.MyCoprocessRequest.newBuilder().setFamily(family).setColumns(columns).build();
 BlockingRpcCallback<MyFirstCoprocessor.MyCoprocessResponse> rpcCallback = new BlockingRpcCallback<>();
 aggregationService.getAggregation(null, request, rpcCallback);
 MyFirstCoprocessor.MyCoprocessResponse response = rpcCallback.get();
 return response;
 }
 }, callback);
 } catch (Throwable throwable) {
 throwable.printStackTrace();
 }
 System.out.println("耗时:" + (System.currentTimeMillis() - start_t));
 System.out.println("totalRowCount:" + totalRowCount.longValue());
 System.out.println("maxValue:" + maxValue.doubleValue());
 System.out.println("minValue:" + minValue.doubleValue());
 System.out.println("sumValue:" + sumValue.doubleValue());
 System.out.println("avg:" + new DoubleColumnInterpreter().divideForAvg(sumValue.doubleValue(), totalRowCount.longValue()));
 return totalRowCount.longValue();
 }
}
 



6. Deployment and Invocation


1. Build the project with Maven

mvn clean package
 


2. Upload the compiled jar to the /hbase/coprocessor directory in HDFS

[root@ip-172-31-5-38 ~]# export HADOOP_USER_NAME=hbase 
[root@ip-172-31-5-38 ~]# hadoop fs -mkdir -p /hbase/coprocessor
[root@ip-172-31-5-38 ~]# hadoop fs -put hbase-demo-1.0-SNAPSHOT.jar /hbase/coprocessor
[root@ip-172-31-5-38 ~]# hadoop fs -ls /hbase/coprocessor
 



The client example attaches the Coprocessor to the target table programmatically, so there is no need to configure a global Coprocessor in HBase.
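To confirm that the coprocessor really was attached to the table (rather than relying on a global configuration), you can inspect the table descriptor. Below is a minimal sketch, assuming the Table object from the client example above (the helper class name CoprocessorCheck is illustrative):

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;

public class CoprocessorCheck {
    //Print whether the given class is registered as a coprocessor on the table, then list all of them
    public static void printCoprocessors(Table table, Class<?> cls) throws IOException {
        HTableDescriptor descriptor = table.getTableDescriptor();
        System.out.println("Attached: " + descriptor.hasCoprocessor(cls.getCanonicalName()));
        for (String coprocessor : descriptor.getCoprocessors()) {
            System.out.println("  " + coprocessor);
        }
    }
}

Calling CoprocessorCheck.printCoprocessors(table, MyFirstCoprocessorEndPoint.class) right after setupToExistTable(...) should list the fully qualified class name of the endpoint.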

3. Run the MyFirstCoprocessExample class and check the output


The aggregated values match the data we wrote.
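For reference, with the five test rows written earlier, the expected figures for the info:sales column are: count = 5, max = 24.5, min = 10.5, sum = 12.3 + 24.5 + 10.5 + 11.5 + 10.5 = 69.3, and avg = 69.3 / 5 = 13.86.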

7. Summary


  • When developing an HBase Coprocessor, Protobuf is used to generate the RPC request/response classes; we only need to implement the business logic on top of the generated classes.
  • This article mainly shows how to add a Coprocessor to a specific HBase table programmatically. This approach is more flexible and does not require restarting the HBase service.
  • Upload the compiled Coprocessor jar to HDFS and make sure the directory has the correct owner.
  • HBase also ships with the AggregateImplementation class for column aggregation, but the built-in implementation cannot aggregate several columns at once; computing multiple aggregates requires multiple RPC requests, and since data keeps being written to HBase, the separate aggregations may not be consistent with each other. This example performs all of the aggregations in a single RPC, which reduces the number of RPC requests and ensures that, for the same query conditions, the results stay consistent.

GitHub:


To establish a heart for Heaven and Earth, to secure life and livelihood for the people, to carry on the lost teachings of the ancient sages, and to open up peace for all generations to come.


Follow the official account Hadoop实操 to be the first to get more practical Hadoop content; forwarding and sharing are welcome.

This is an original article. Reposting is welcome; please credit the WeChat official account Hadoop实操.
