总则
概述
为规范TBDS HBase数据库开发工作,特制定本《TBDS HBase数据库开发规范》。
适用范围
本规范适用于 TBDS HBase 组件的开发。
基础开发指南
HBase介绍
TBDS HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。
TBDS HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。HBase是Google Bigtable的开源实现,类似Google Bigtable利用GFS作为其文件存储系统,HBase利用Hadoop HDFS作为其文件存储系统;Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;Google Bigtable利用 Chubby作为协同服务,HBase利用Zookeeper作为对应。其提供多种的数据访问方式:
Native Java API:最常规和高效的访问方式,适合Hadoop MapReduce Job并行批处理HBase表数据。
HBase Shell:HBase的命令行工具,最简单的接口,适合HBase管理使用。
Thrift Gateway:利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据。
REST Gateway:支持REST 风格的Http API访问HBase, 解除了语言限制。
Pig:可以使用Pig Latin流式编程语言来操作HBase中的数据,和Hive类似,本质最终也是编译成MapReduce Job来处理HBase表数据,适合做数据统计。
命名规范
数据库命名规范
采用26个英文字母(区分大小写)和0-9的自然数(经常不需要)加上下划线组成,命名简洁明确,多个单词用下划线'_'分隔,一个业务项目一个数据库,多个业务项目慎用同一个数据库。
数据库表命名规范
数据表命名规范
(1)采用26个英文字母(区分大小写)和0-9的自然数(经常不需要)加上下划线''组成,命名简洁明确,多个单词用下划线''分隔。
(2)全部小写命名,禁止出现大写。
(3)禁止使用数据库关键字,如:name,time,datetime,password等。
(4)表名称不应该取得太长(一般不超过三个英文单词)。
(5)表的名称一般使用名词或者动宾短语。
(6)用单数形式表示名称,例如,使用employee,而不是employees。
(7)明细表的名称为:主表的名称+字符dtl(detail缩写)例如:采购订单的名称为:po_order,则采购订单的明细表为:po_orderdtl。
(8)表必须填写描述信息。
字段命名规范
(1)采用26个英文字母(区分大小写)和0-9的自然数(经常不需要)加上下划线''组成,命名简洁明确,多个单词用下划线''分隔。
(2)全部小写命名,禁止出现大写。
(3)字段必须填写描述信息。
(4)禁止使用数据库关键字,如:name,time,datetime, password等。
(5)字段名称一般采用名词或动宾短语。
(6)采用字段的名称必须是易于理解,一般不超过三个英文单词。
(7)在命名表的列时,不要重复表的名称。
(8)不要在列的名称中包含数据类型。
(9)字段命名使用完整名称,禁止缩写。
HBase表设计规范
命名空间
采用英文单词、阿拉伯数字的组合形式,其中,单词必须大写,并且首字符必须为英文字符,不能是数字。不建议用连接符(下划线)拼接多个单词,简单语义的可采用单个单词,复杂语义的可采用多个单词的首字母拼接。长度尽量限制在4~8字符之间。命名空间一般可与项目名称、组织机构名称等保持一致。根据项目名称构建命名空间:DLQX(电力气象首字母拼接形式),简短明了。不建议过长的命名空间名称,譬如不推荐采用以下形式:USER_INFO_MANAGE等。
表名称
采用英文单词、阿拉伯数字、连接符(_)的组合形式,其中,单词必须大写,并且首字符必须为英文字符,不能是数字,可用连接符拼接多个单词。长度尽量限制在8~16字符之间。尽量采用具有明确意义的英文单词,而不建议采用汉字的拼音字母或者拼音首字母组合。符合规范的表名称:USER_INFO_MANAGE、WEATHER_DATA、T_ELECTRIC_GATHER等。
列族名称
采用英文单词、阿拉伯数字的组合形式,其中,单词必须大写,并且首字符必须为英文字符,不能是数字。长度尽量限制在1~6字符之间,过长的列族名称将占用更多的存储空间。符合规范的列族名称:D1、D2、DATA等。不推荐的列族名称:USER_INFO、D_1等。
列名称
采用英文单词、阿拉伯数字、连接符(_)的组合形式,其中,单词必须大写,并且首字符必须为英文字符,不能是数字,可用连接符拼接多个单词。长度尽量限制在1~16字符之间。尽量采用具有明确意义的英文单词,而不建议采用汉字的拼音字母或者拼音首字母组合。符合规范的列名称:USER_ID、DATA_1、REMARK等。不推荐的列名称:UserID、1_DATA等。
内部存储机制
HBase中的所有数据文件都存储在Hadoop HDFS文件系统上,主要包括上述提出的两种文件类型:
HFile, HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile。
HLog File,HBase中WAL(Write Ahead Log) 的存储格式,物理上是Hadoop的Sequence File。
HFile
首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。正如图中所示的,Trailer中有指针指向其他数据块的起始点。File Info中记录了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。Data Index和Meta Index块记录了每个Data块和Meta块的起始点。
Data Block是HBase I/O的基本单元,为了提高效率,HRegionServer中有基于LRU的Block Cache机制。每个Data块的大小可以在创建一个Table的时候通过参数指定,大号的Block有利于顺序Scan,小号Block利于随机查询。每个Data块除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏。后面会详细介绍每个KeyValue对的内部构造。
HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。
HLogFile
HLog文件的结构,其实HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是“写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
HLog Sequence File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。
HBase Phoenix SQL语法
Phoenix将HBase的数据模型映射到关系型世界,支持所有标准SQL查询构造,包括SELECT,FROM,WHERE,GROUP BY,HAVING,ORDER BY等。
它还支持一整套DML命令以及通过DDL命令创建表和版本化增量更改。Phoenix的目标:通过定义明确的行业标准API,成为Hadoop的OLTP和运营分析的可信数据平台。
数据类型
Apache Phoenix SQL支持如下这些数据类型:
整数:integer
小数:float, double, decimal, decimal(10,2)
字符串:char(10), varchar, varchar(255),值必须用单引号,不能用双引号。
布尔:boolean
时间:time, date, timestamp。
字节数组:binary, varbinary。
数组:varchar array, char(10) array[5], integer[] integer[100]
Phoenix SQL语法
DDL支持:CREATE TABLE,DROP TABLE,ALTER TABLE。
DML支持:UPSERT VALUES用于逐行插入,UPSERT SELECT用于在相同或不同的表之间传输大量数据,DELETE用于删除行。
Join连接并不完全受支持。不支持FULL OUTER JOIN和CROSS JOIN。
Phoenix SQL在CRUD操作语法解释如下。
CREATE
创建一个简单的user表,id为主键,d为列族(如果不指定列族,内部映射到'0'列族)。可定义DDL属性'DEFAULT_COLUMN_FAMILY=列族名'来覆盖默认的列族名。
create table user(
id integer not null primary key,
d.username varchar,
d.address varchar
);
UPSERT
注意,Phoenix没有insert,其insert与update合起来叫做:upsert。
现在更新/插入两行。这里,显式地为id列设置值。在内部,这个SQL调用被转换为一个HBase put。
upsert into user values(1,'张三','城市1');
upsert into user values(2,'李四','城市2');
SELECT
查询指定的用户
select username from user;
ALTER
修改表结构。
alter table user add zipcode integer;
DELETE
删除行。
delete from user where id=1;
下面演示常见的Phoenix SQL语法的使用。
--创建电商表
CREATE TABLE t1(
pkey integer NOT NULL PRIMARY KEY,
p.name varchar,
p.price double,
c.name varchar,
c.address varchar
);
-- 插入数据
-- 1) 商品数据
UPSERT INTO t1(pkey, p.name, p.price) values(1, '衬衣',168.50);
UPSERT INTO t1(pkey, p.name, p.price) values(2, '电脑',5168.50);
UPSERT INTO t1(pkey, c.name, c.address) values(3, '张三','城市1');
UPSERT INTO t1(pkey, c.name, c.address) values(4, '李四','城市2');
UPSERT INTO t1 values(5,'图书', 68.80, '王老五','城市3');
UPSERT INTO t1(pkey, p.name, p.price) values(3,'手机',3568.00);
UPSERT INTO t1(pkey, p.price) values(3,2538.00);
-- 动态修改表结构
ALTER TABLE t1 ADD c.phone varchar;
UPSERT INTO t1(pkey, c.name, c.address, c.phone) values(5, '赵小六','城市4','13566668888');
-- 删除
DELETE FROM t1 WHERE pkey=2;
DELETE FROM t1 WHERE p.name='手机';
DELETE FROM t1;
-- 查询
SELECT * FROM t1;
SELECT pkey, p.name,c.name FROM t1;
-- 删除表
DROP TABLE t1;
执行批处理
我们也可以创建自己的SQL脚本并使用命令行工具执行它们。可以通过Phoenix的bin目录下的psql.py脚本加载CSV数据或者执行包含sql脚本的文件。
现在我们来看一个例子。导航到Phoenix安装位置的bin/目录。
1)首先,创建一个us_population.sql文件,包含一个表定义:
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
2)然后创建一个us_population.csv文件,其中包含要放入该表的一些数据:
NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332
3)最后,创建一个us_population_queries.sql文件,包含希望在该数据上运行的查询。
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;
4)在命令行终端,执行如下的命令:
$ ./psql.py localhost:2181 us_population.sql us_population.csv us_population_queries.sql
这条命令同时做了三件事:创建表、插入数据、查询结果。典型的upsert速率是每秒20K - 50K行(取决于行有多宽)。
开发规范
Configuration实例的创建
该类应该通过调用HBaseConfiguration的create()方法来实例化。否则,将无法正确加载HBase中的相关配置项。
正确示例:
//该部分,应该是在类成员变量的声明区域声明
private Configuration hbaseConfig = null;
//最好在类的构造函数中,或者初始化方法中实例化该类
hbaseConfig = HBaseConfiguration.create();
错误示例:
hbaseConfig = new Configuration();
共享Configuration实例
HBase客户端代码通过创建一个与Zookeeper之间的HConnection,来获取与一个HBase集群进行交互的权限。一个Zookeeper的HConnection连接,对应着一个Configuration实例,已经创建的HConnection实例,会被缓存起来。也就是说,如果客户端需要与HBase集群进行交互的时候,会传递一个Configuration实例过去,HBase Client部分通过已缓存的HConnection实例,来判断属于这个Configuration实例的HConnection实例是否存在,如果不存在,就会创建一个新的,如果存在,就会直接返回相应的实例。
因此,如果频频地创建Configuration实例,会导致创建很多不必要的HConnection实例,很容易达到Zookeeper的连接数上限。
建议在整个客户端代码范围内,都共用同一个Configuration对象实例。
Table实例的创建
public abstract class TableOperationImpl {
private static Configuration conf = null;
private static Connection connection = null;
private static Table table = null;
private static TableName tableName = TableName.valueOf("sample_table");
public TableOperationImpl() {
init();
}
public void init() {
conf = ConfigurationSample.getConfiguration();
try {
connection = ConnectionFactory.createConnection(conf);
table = conn.getTable(tableName);
} catch (IOException e) {
e.printStackTrace();
}
}
public void close() {
if (table != null) {
try {
table.close();
} catch (IOException e) {
System.out.println("Can not close table.");
} finally {
table = null;
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
System.out.println("Can not close connection.");
} finally {
connection = null;
}
}
}
public void operate() {
init();
process();
close();
}
}
不允许多个线程在同一时间共用同一个Table实例
Table是一个非线程安全类,因此,同一个Table实例,不应该被多个线程同时使用,否则可能会带来并发问题。
Table实例缓存
如果一个Table实例可能会被长时间且被同一个线程固定且频繁地用到,例如,通过一个线程不断的往一个表内写入数据,那么这个Table在实例化后,就需要缓存下来,而不是每一次插入操作,都要实例化一个Table对象(尽管提倡实例缓存,但也不是在一个线程中一直沿用一个实例,个别场景下依然需要重构,可参见下一条规则)。
正确示例:
说明:
注意该实例中提供的以Map形式缓存Table实例的方法,未必通用。这与多线程多Table实例的设计方案有关。如果确定一个Table实例仅仅可能会被用于一个线程,而且该线程也仅有一个Table实例的话,就无须使用Map。这里提供的思路仅供参考。
//该Map中以TableName为Key值,缓存所有已经实例化的Table
private Map < String, Table > demoTables = new HashMap < String, Table > ();
//所有的Table实例,都将共享这个Configuration实例
private Configuration demoConf = null;
/**
<初始化一个HTable类>
<功能详细描述>
@param tableName
@return
@throws IOException
@see [类、类#方法、类#成员]
*/
private Table initNewTable(String tableName) throws IOException {
try (Connection conn = ConnectionFactory.createConnection(demoConf)) {
return conn.getTable(tableName);
}
}
/**
<获取Table实例>
<功能详细描述>
@see [类、类#方法、类#成员]
*/
private Table getTable(String tableName) {
if (demoTables.containsKey(tableName)) {
return demoTables.get(tableName);
} else {
Table table = null;
try {
table = initNewTable(tableName);
demoTables.put(tableName, table);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return table;
}
}
/**
* 写数据
*/
public void putData(List < Put > dataList, String tableName) {
Table table = getTable(tableName);
//关于这里的同步:如果在采用的设计方案中,不存在多线程共用同一个Table实例
//的可能的话,就无须同步了。这里需要注意的一点,就是Table实例是非线程安全的
synchronized(table) {
try {
table.put(dataList);
table.notifyAll();
} catch (IOException e) {
// 在捕获到IOE时,需要将缓存的实例重构。
try {
// 关闭之前的Connection.
table.close();
// 重新创建这个实例.
table = initNewTable(tableName);
} catch (IOException e1) {
// TODO
}
}
}
}
/**
* 错误示例:
*/
public void putDataIncorrect(List < Put > dataList, String tableName) {
Table table = null;
try {
//每次写数据,都创建一个HTable实例
table = initNewTable(tableName);
table.put(dataList);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} finally {
table.close();
}
}
Table实例写数据的异常处理
尽管在前一条规则中提到了提倡Table实例的重构,但是,并非提倡一个线程自始至终要沿用同一个Table实例,当捕获到IOException时,依然需要重构Table实例。示例代码可参考上一个规则的示例。
另外,勿轻易调用如下两个方法:
Configuration#clear:
这个方法,会清理掉所有的已经加载的属性,那么,对于已经在使用这个Configuration的类或线程而言,可能会带来潜在的问题(例如,假如Table还在使用这个Configuration,那么,调用这个方法后,Table中的这个Configuration的所有的参数,都被清理掉了),也就是说:只要还有对象或者线程在使用这个Configuration,就不应该调用这个clear方法,除非所有的类或线程,都已经确定不用这个Configuration了。那么,这个操作,可以在所有的线程要退出的时候来做,而不是每一次。
因此,这个方法,应该要放在进程要退出的地方去做。而不是每一次Table要重构的时候做。
HConnectionManager#deleteAllConnections:
这个可能会导致现有的正在使用的连接被从连接集合中清理掉,同时,因为在HTable中保存了原有连接的引用,可能会导致这个连接无法关闭,进而可能会造成泄漏。因此,这个方法不建议使用。
写入失败的数据要做相应的处理
在写数据的过程中,如果进程异常或一些其它的短暂的异常,可能会导致一些写入操作失败。因此,对于操作的数据,需要将其记录下来。在集群恢复正常后,重新将其写入到HBase数据表中。
另外,有一点需要注意:HBase Client返回写入失败的数据,是不会自动重试的,仅仅会告诉接口调用者哪些数据写入失败了。对于写入失败的数据,一定要做一些安全的处理,例如可以考虑将这些失败的数据,暂时写在文件中,或者,直接缓存在内存中。
正确示例:
private List<Row> errorList = new ArrayList<Row>();
/**
<采用PutList的模式插入数据>
<如果不是多线程调用该方法,可不采用同步>
*
@param put 一条数据记录
@throws IOException
@see [类、类#方法、类#成员]
*/
public synchronized void putData(Put put)
{
// 暂时将数据缓存在该List中
dataList.add(put);
// 当dataList的大小达到PUT_LIST_SIZE之后,就执行一次Put操作
if (dataList.size() >= PUT_LIST_SIZE)
{
try
{
demoTable.put(dataList);
} catch (IOException e)
{
// 如果是RetriesExhaustedWithDetailsException类型的异常,
// 说明这些数据中有部分是写入失败的这通常都是因为
// HBase集群的进程异常引起,当然有时也会因为有大量
// 的Region正在被转移,导致尝试一定的次数后失败
if (e instanceof RetriesExhaustedWithDetailsException)
{
RetriesExhaustedWithDetailsException ree =
(RetriesExhaustedWithDetailsException) e;
int failures = ree.getNumExceptions();
for (int i = 0; i < failures; i++)
{
errorList.add(ree.getRow(i));
}
}
}
dataList.clear();
}
}
资源释放
关于ResultScanner和Table实例,在用完之后,需要调用它们的close方法,将资源释放掉。Close方法,要放在finally块中,来确保一定会被调用到。
正确示例:
ResultScanner scanner = null;
try
{
scanner = demoTable.getScanner(s);
//Do Something here.
}
finally
{
scanner.close();
}
错误示例:
在代码中未调用scanner.close()方法释放相关资源。
scanner.close()方法未放置在finally块中。
ResultScanner scanner = null;
scanner = demoTable.getScanner(s);
//Do Something here.
scanner.close();
Scan时的容错处理
Scan时不排除会遇到异常,例如,租约过期。在遇到异常时,建议Scan应该有重试的操作。事实上,重试在各类异常的容错处理中,都是一种优秀的实践,这一点,可以应用在各类与HBase操作相关的接口方法的容错处理过程中。
不用Admin时,要及时关闭,Admin实例不应常驻内存。
Admin的实例应尽量遵循 “用时创建,用完关闭”的原则。不应该长时间缓存同一个Admin实例。
hbase shell操作
DDL操作
#开启hbase shell
hbase shell
#查看hbase状态
status
#查看hbase版本
version
#创建命名空间
create_namespace '命名空间名'
#显示所有命名空间
list_namespace
#删除命名空间, 在删除一个命名空间时,该命名空间不能包含任何的表,否则会报错
drop_namespace '命名空间名'
#创建默认命名空间的表
create '表名称', '列族名称1','列族名称2','列族名称N'
#创建带有命名空间的表
create '命名空间:表名称', '列族名称1','列族名称2','列族名称N'
#列出所有表
list
#获得表的描述
describe '表名'
#删除table 表的 列族名称1 列族
alter 'table',{NAME=>'列族名称1',METHOD=>'delete'}
#删除多个列族
alter 'table', {NAME => '列族名称1', METHOD => 'delete'},{NAME => '列族名称2', METHOD => 'delete'}
#先把表下线
disable '表名'
#再drop表
drop '表名'。
DML操作
#添加数据
# 语法:put <table>,<rowkey>,<family:column>,<value>,[<timestamp>]
#如果不写timestamp,则系统默认
put 'table','id01', 'c_f1:name','111'
#获取数据
#get: 获取表中一行数据,不能扫描全表
# 语法:get <table>,<rowkey>,[<family:column>,....]
get 'table','id01'
#更新数据
#语法:重新put,put时会覆盖原来的数据
put 'table','id01', 'c_f1:name','222'
#scan扫描
# 语法:scan <table> ,{COLUMNS => [ <family:column>,.... ], LIMIT => num}
#扫描全表,大表操作不可取
scan 'table'
#获取表中前两行
scan 'table', {LIMIT => 2}
#扫描表中指定列族数据
scan 'table', {COLUMNS => 'c_f1'}
#扫描表中执行列族中列的数据
scan 'table', {COLUMNS => 'c_f2:cert_no'}
#扫描表中值=222 的数据
scan 'table', FILTER=>"ValueFilter(=,'name:222')"
# 筛选行,按照rowkey的范围[STARTROW,STOPROW)
scan 'table', {STARTROW =>'id01' , STOPROW => 'id03'}
#删除行中某列数据
# 语法:delete <table>, <rowkey>, <family:column>
# 必须指定列名
# 会删除执行列的所有版本数据
delete 'table', 'id04', 'c_f2:name'
#删除整行
# 语法:deleteall <table>, <rowkey>
deleteall 'table', 'id05'
#清空表数据
# 语法: truncate <table>
truncate 'table'
#查询表中有多少行
# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
# INTERVAL设置多少行显示一次及对应的rowkey,默认1000;
# CACHE每次去取的缓存区大小,默认是10,调整该参数可提高查询速度
#查询表中数据行数
count 'table'
#按照2行显示一次,查询
count 'table', {INTERVAL => 2}。
hbase BulkLoad写海量数据
HBase写数据方式通常使用 HBase 提供的 API 方法,实现了接口调用,但对于海量的数据,接口的调用引起 CPU、内存占用过高,影响正常业务使用。
使用 Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。
通过MapReduce 生成 HFile具体的代码实现:
- 作业的配置。
/* Job Configure */
Job job = Job.getInstance(conf,"HFileProducerETL");
job.setJarByClass(HFileProducerETL2.class);
TableMapReduceUtil.addDependencyJars(job);
job.setMapperClass(BulkLoadMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.setInputPaths(job,inputPath);
FileOutputFormat.setOutputPath(job,outputPath);
HTable htable =new HTable(conf, tablename);
HFileOutputFormat2.configureIncrementalLoad(job,htable,htable);
return job.waitForCompletion(false) ? 0 : 1;
- Map类实现。
key 类:ImmutableBytesWritable
value 类:KeyValue
public void (LongWritable key, Text value, Context context) {
try {
String line = value.toString();
String[] all_column_values = line.split(context.getConfiguration().get(FILE_DELIMITER_STR),-1);
String rowKeyString = all_column_values[rowKeyIndex];
byte[] rowKey=Bytes.toBytes(rowKeyString);
// rowKey 赋值
ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
if (all_column_name.length!=all_column_values.length){
return;
}
for (int i = 0; i < all_column_name.length; i++) {
if (export_column_index[i] == EXPORT_FLAG
&& !all_column_values[i].trim().equals(HBaseConstant.HiveNULL)
&& !all_column_values[i].trim().equals(HBaseConstant.HBaseNull)
&& !all_column_values[i].trim().equals(HBaseConstant.HEmpty)
){
KeyValue kv ;
// 如果是 HFILE_CLEAR_FLAG 则 清空
if (all_column_values[i].equals(HBaseConstant.HFILE_CLEAR_FLAG)){
all_column_values[i]=HBaseConstant.HEmpty;
}
if (timestampIndex == TIMESTAMP_COLUMN_DEFAULT_INDEX){
kv = new KeyValue(rowKey, CF, Bytes.toBytes(all_column_name[i]),Bytes.toBytes(all_column_values[i]));
}
else{
Long timestamp_value = Long.parseLong(all_column_values[timestampIndex]);
kv = new KeyValue(rowKey, CF, Bytes.toBytes(all_column_name[i]),timestamp_value,Bytes.toBytes(all_column_values[i]));
}
context.write(rowKeyWritable,kv);
}
}
}catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 生成 HFile 之后,与HBase映射实现,代码如下:
Configuration conf = DefConfiguration.GetConfiguration(hbaseEnv);
conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily",5000)
HTable htable =new HTable(conf, tableName);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(hFilePath, htable);
性能优化
HBase表设计建议及优化
业务表设计建议
(1)、预分region,使region分布均匀,提高并发。
(2)、避免过多的热点region。根据应用场景,可考虑将时间因素引入Rowkey。
(3)、同时访问的数据,应设计Rowkey使其在物理上相邻存储,以便通过一次范围扫描高效获取,避免多次单点查询带来的性能开销。
(4)、数据存放在同一行;同时读取的数据存放在同一cell。
(5)、查询频繁属性放在Rowkey前面部分。Rowkey的设计在排序上必须与主要的查询条件契合。
(6)、离散度较好的属性作为RowKey组成部分。分析数据离散度特点以及查询场景,综合各种场景进行设计。
(7)、存储冗余信息,提高检索性能。使用二级索引,适应更多查询场景。
(8)、利用过期时间、版本个数设置等操作,让表能自动清除过期数据。
说明:在HBase中,一直在繁忙写数据的region被称为热点region。
Pre-Creating Regions
默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。有关预分区,详情参见:Table Creation: Pre-Creating Regions,下面是一个例子:
public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)throws IOException {
try {
admin.createTable(table, splits);
return true;
} catch (TableExistsException e) {
logger.info("table " + table.getNameAsString() + " already exists");
// the table already exists...
return false;
}
}
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
//start:001,endkey:100,10region [001,010][011,020]
byte[][] splits = new byte[numRegions-1][];
BigInteger lowestKey = new BigInteger(startKey, 16);
BigInteger highestKey = new BigInteger(endKey, 16);
BigInteger range = highestKey.subtract(lowestKey);
BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
lowestKey = lowestKey.add(regionIncrement);
for(int i=0; i < numRegions-1;i++) {
BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
byte[] b = String.format("%016x", key).getBytes();
splits[i] = b;
}
return splits;
}
Row Key
HBase中row key用来检索表中的记录,支持以下三种方式:
通过单个row key访问:即按照某个row key键值进行get操作;
通过row key的range进行scan:即通过设置startRowKey和endRowKey,在这个范围内进行扫描;
全表扫描:即直接扫描整张表中所有行记录。
在HBase中,row key可以是任意字符串,最大长度64KB,实际应用中一般为10~100bytes,存为byte[]字节数组,一般设计成定长的。
row key是按照字典序存储,因此,设计row key时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作为row key,这样能保证新写入的数据在读取时可以被快速命中。
Column Family
不要在一张表里定义太多的column family。目前Hbase并不能很好的处理超过2~3个column family的表。因为某个column family在flush的时候,它邻近的column family也会因关联效应被触发flush,最终导致系统产生更多的I/O。感兴趣的同学可以对自己的HBase集群进行实际测试,从得到的测试结果数据验证一下。
In Memory
创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到RegionServer的缓存中,保证在读取的时候被cache命中(小表使用,大表不建议使用)。
Max Version
创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。
Time To Live
创建表的时候,可以通过HColumnDescriptor.setTimeToLive(int timeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2 * 24 * 60 * 60)。
Compact & Split
在HBase中,数据在更新时首先写入WAL 日志(HLog)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。与此同时, 系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了(minor compact)。
StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行分割(split),等分为两个StoreFile。
由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的StoreFile和MemStore,将它们按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,通常合并过程还是比较快的。
实际应用中,可以考虑必要时手动进行major compaction,将同一个row key的修改进行合并形成一个大的StoreFile。同时,可以将StoreFile设置大些,减少split的发生。
HBase为了防止小文件(被刷到磁盘的MemStore)过多,以保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction。在hbase中,主要存在两种类型的compaction:minor compaction和major compaction。
minor compaction:是较小、很少文件的合并。
major compaction的功能是将所有的store file合并成一个,触发major compaction的可能条件有:major_compact 命令、majorCompact() API、region server自动运行(相关参数:hbase.hregion.majorcompaction 默认为24 小时、hbase.hregion.majorcompaction.jitter 默认值为0.2 防止region server 在同一时间进行major compaction)。hbase.hregion.majorcompaction.jjitter数的作用是:对参数hbase.hregion.majorcompaction 规定的值起到浮动的作用,假如两个参数都为默认值24和0,2,那么major compact最终使用的数值为:19.2~28.8 这个范围。
1、关闭自动major compaction。
2、手动编程major compaction。
Timer类,minor compaction的运行机制要复杂一些,它由以下几个参数共同决定:
hbase.hstore.compaction.min :默认值为 3,表示至少需要三个满足条件的store file时,minor compaction才会启动。
hbase.hstore.compaction.max 默认值为10,表示一次minor compaction中最多选取10个store file。
hbase.hstore.compaction.min.size 表示文件大小小于该值的store file 一定会加入到minor compaction的store file中
hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除
hbase.hstore.compaction.ratio 将store file 按照文件年龄排序(older to younger),minor compaction总是从older store file开始选择。
容量限制
官方建议每台HBase RegionServer的region数量不超过200个,实际使用经验建议不超过500个。
写表操作
多HTable并发写
创建多个HTable客户端用于写操作,提高写数据的吞吐量,一个例子:
static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
wTableLog[i] = new HTable(conf, table_log_name);
wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
wTableLog[i].setAutoFlush(false);
}
HTable参数设置
Auto Flush
通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下autoFlush是开启的。
Write Buffer
通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。
WAL Flag
在HBase中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。
值得注意的是:谨慎选择关闭WAL日志,因为这样的话,一旦RegionServer宕机,Put/Delete的数据将会无法根据WAL日志进行恢复。
批量写
通过调用HTable.put(Put)方法可以将一个指定的row key记录写入HBase,同样HBase提供了另一个方法:通过调用HTable.put(List)方法可以将指定的row key列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下可能带来明显的性能提升。
多线程并发写
在客户端开启多个HTable写线程,每个写线程负责一个HTable对象的flush操作,这样结合定时flush和写buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被flush(如1秒内),同时又保证在数据量大的时候,写buffer一满就及时进行flush。下面给个具体的例子:
for (int i = 0; i < threadN; i++) {
Thread th = new Thread() {
public void run() {
while (true) {
try {
sleep(1000); //1 second
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (wTableLog[i]) {
try {
wTableLog[i].flushCommits();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
th.setDaemon(true);
th.start();
}。
HBase BulkLoad写数据
HBase底层存储是以HFile文件存储在磁盘上。
1、根据HDFS的数据或者外部的数据生成HBase底层的数据格式HFile文件。
2、根据生成目标HFile,利用HBase提供的Bulkload工具将HFile移动(或加载)到HBase目录下,bulkLoad主要是将数据编写成HFile的形式,批量加载到HBase中,具有优于其他数据提取机制的优点,此操作完全绕过写入路径。
优缺点:
① 数据可以立即被HBase使用,但是会对集群造成额外的负载和延迟。
② BulkLoad操作不会预写日志(WALs),因此不会引起过量的flush和split。
③ BulkLoad操作不会引起过多的垃圾回收(GC)
BulkLoad使用流程
(1)、从现有的数据源中提取数据,可以从关系型数据库中导出数据,如果是TSV/CSV形式的数据参见http://hbase.apache.org/book.html#importtsv.
(2)、将已导出的数据,处理成为HFile数据格式(HFile格式介绍:http://hbase.apache.org/book.html#_hfile_format_2),需要使用MR任务进行转换,并且数据是唯一的,通常需要我们自己编写Mapper。
作业必须输出行键Key,以及一个KeyValue(一个put,delete作为value),Reducer由HBase进行处理,如果需要我们自己对数据进行处理,可以在MR任务中,以此流程Mapper->Reducer->Mapper编写,最终Reducer交由hbase处理:
1、检查表的配置信息,以及总的分区器排序。
2、将分区文件上传到集群中,并添加到 DistributedCache中。
3、设置一定数量的reduce任务数,匹配当前region数量。
4、设置输出的key/value类,匹配HFileOutputFormat。
5、设置Reducer适当的排序( KeyValueSortReducer或者 PutSortReducer)
(3)、在输出的文件夹中每个region创建一个HFile,输入的数据会被完全覆盖,一般情况下需要准备的磁盘大小至少是原始数据集大小的两倍,比如mysqldump出100G的数据,在以HFile上传到HDFS后,需要准备至少200G的存储空间。
(4)、将文件加载至HBase中。使用LoadIncrementalHFiles命令,并向HDFS传入定位文件的URL,每个文件都加载到该区域的RegionServer上相关的region当中,可以通过传递限制输入的版本数量–version = N选项,在N需要包含最大版本数量从旧到新(最大时间戳到最小时间戳),如果在创建文件后,分割了一个region,则该工具会自动根据新边界进行切分HFile,效率相对较低。
读表操作
多HTable并发读
创建多个HTable客户端用于读操作,提高读数据的吞吐量,一个例子:
static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
rTableLog[i] = new HTable(conf, table_log_name);
rTableLog[i].setScannerCaching(50);
HTable参数设置
Scanner Caching
hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。
Scan Attribute Selection
scan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。
Close ResultScanner
通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。
批量读
通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List)方法可以根据一个指定的row key列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显的性能提升。
多线程并发读
在客户端开启多个HTable读线程,每个读线程负责通过HTable对象进行get操作。下面是一个多线程并发读取HBase,获取店铺一天内各分钟PV值的例子:
public class DataReaderServer {
//获取店铺一天内各分钟PV值的入口函数
public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
long min = startStamp;
int count = (int)((endStamp - startStamp) / (60*1000));
List<String> lst = new ArrayList<String>();
for (int i = 0; i <= count; i++) {
min = startStamp + i * 60 * 1000;
lst.add(uid + "_" + min);
}
return parallelBatchMinutePV(lst);
}
//多线程并发查询,获取分钟PV值
private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
int parallel = 3;
List<List<String>> lstBatchKeys = null;
if (lstKeys.size() < parallel ){
lstBatchKeys = new ArrayList<List<String>>(1);
lstBatchKeys.add(lstKeys);
}
else{
lstBatchKeys = new ArrayList<List<String>>(parallel);
for(int i = 0; i < parallel; i++ ){
List<String> lst = new ArrayList<String>();
lstBatchKeys.add(lst);
}
for(int i = 0 ; i < lstKeys.size() ; i ++ ){
lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
}
}
List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("ParallelBatchQuery");
ThreadFactory factory = builder.build();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
for(List<String> keys : lstBatchKeys){
Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
futures.add(future);
}
executor.shutdown();
// Wait for all the tasks to finish
try {
boolean stillRunning = !executor.awaitTermination(
5000000, TimeUnit.MILLISECONDS);
if (stillRunning) {
try {
executor.shutdownNow();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
// Look for any exception
for (Future f : futures) {
try {
if(f.get() != null)
{
hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return hashRet;
}
//一个线程批量查询,获取分钟PV值
protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = null;
List<Get> lstGet = new ArrayList<Get>();
String[] splitValue = null;
for (String s : lstKeys) {
splitValue = s.split("_");
long uid = Long.parseLong(splitValue[0]);
long min = Long.parseLong(splitValue[1]);
byte[] key = new byte[16];
Bytes.putLong(key, 0, uid);
Bytes.putLong(key, 8, min);
Get g = new Get(key);
g.addFamily(fp);
lstGet.add(g);
}
Result[] res = null;
try {
res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
} catch (IOException e1) {
logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
}
if (res != null && res.length > 0) {
hashRet = new ConcurrentHashMap<String, String>(res.length);
for (Result re : res) {
if (re != null && !re.isEmpty()) {
try {
byte[] key = re.getRow();
byte[] value = re.getValue(fp, cp);
if (key != null && value != null) {
hashRet.put(String.valueOf(Bytes.toLong(key,
Bytes.SIZEOF_LONG)), String.valueOf(Bytes
.toLong(value)));
}
} catch (Exception e2) {
logger.error(e2.getStackTrace());
}
}
}
}
return hashRet;
}
}
//调用接口类,实现Callable接口
class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
private List<String> keys;
public BatchMinutePVCallable(List<String> lstKeys ) {
this.keys = lstKeys;
}
public ConcurrentHashMap<String, String> call() throws Exception {
return DataReadServer.getBatchMinutePV(keys);
}
}
缓存查询结果
对于频繁查询HBase的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询HBase;否则对HBase发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑LRU等常用的策略。
Blockcache
HBase上RegionServer的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。
写请求会先写入MemStore,RegionServer会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。
读请求先到MemStore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。
一个RegionServer上有一个BlockCache和N个MemStore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能启动。默认BlockCache为0.3,而Memstore为0.4。对于注重读响应时间的系统,可以将 BlockCache设大些,比如设置BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。
有关BlockCache机制,请参考这里:HBase的BlockCache,HBase的BlockCache机制,HBase中的缓存的计算与使用。
HTable和HTablePool使用注意事项
HTable和HTablePool都是HBase客户端API的一部分,可以使用它们对HBase表进行CRUD操作。下面结合在项目中的应用情况,对二者使用过程中的注意事项做一下概括总结。
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(TableName.valueOf(tablename)) {
// use table as needed, the table returned is lightweight
}
}