Flume: a custom HBase sink class
Published: 2019-06-27

References (with thanks to the original authors)
  • http://ydt619.blog.51cto.com/316163/1230586
  • https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

Sample configuration file for Flume 1.5

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/scut/Downloads/testFlume

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
# HBase table name
a1.sinks.k1.table = Router
# HBase column family
a1.sinks.k1.columnFamily = log
# HBase column names
a1.sinks.k1.serializer.payloadColumn = serviceTime,browerOS,clientTime,screenHeight,screenWidth,url,userAgent,mobileDevice,gwId,mac
# Serializer class
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.BaimiAsyncHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Comments are placed on their own lines because a properties file treats a trailing "# ..." as part of the value.
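A few properties deserve a closer look
  • a1.sinks.k1.serializer.payloadColumn lists all of the column names.
  • a1.sinks.k1.serializer sets the class that Flume uses as the serializer. BaimiAsyncHbaseEventSerializer reads the value of payloadColumn and splits it on commas to obtain the full list of column names.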
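
The comma splitting described above can be tried in isolation. This is a minimal sketch with no Flume dependency; the class name PayloadColumnParser and the method parseColumns are made up for illustration, and the logic follows the serializer's configure method (remove spaces, split on commas, lowercase each name).

```java
import java.util.ArrayList;
import java.util.List;

public class PayloadColumnParser {

    // Hypothetical helper: remove spaces, split on commas,
    // and lowercase every column name, as configure() does.
    static List<String> parseColumns(String payloadColumn) {
        List<String> columns = new ArrayList<>();
        for (String name : payloadColumn.replace(" ", "").split(",")) {
            columns.add(name.toLowerCase());
        }
        return columns;
    }

    public static void main(String[] args) {
        // The value of a1.sinks.k1.serializer.payloadColumn from the sample config.
        String configured = "serviceTime,browerOS,clientTime,screenHeight,screenWidth,"
                + "url,userAgent,mobileDevice,gwId,mac";
        List<String> columns = parseColumns(configured);
        System.out.println(columns.size() + " columns: " + columns);
    }
}
```

Because the names are lowercased, the columns stored in HBase are servicetime, broweros, and so on, not the mixed-case names written in the configuration file.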

The BaimiAsyncHbaseEventSerializer class

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import com.google.common.base.Charsets;

public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  // Field values of the current event, one per column.
  private byte[][] payload;
  // Column names read from the configuration file.
  private byte[][] payloadColumn;
  // Regex used to split the event body; as written it matches the two literal characters "^A".
  private final String payloadColumnSplit = "\\^A";
  private byte[] incrementColumn;
  // Value placed in front of the generated part of the row key.
  private String rowSuffix;
  // Name of the column whose value is used as rowSuffix.
  private String rowSuffixCol;
  private byte[] incrementRow;
  private KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        switch (keyType) {
          case TS:
            rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
            break;
          case TSNANO:
            rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
            break;
          case RANDOM:
            rowKey = SimpleRowKeyGenerator.getRandomKey(rowSuffix);
            break;
          default:
            rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
            break;
        }
        // Issue one put request per column with its corresponding value.
        for (int i = 0; i < this.payload.length; i++) {
          PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc =
          new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // Nothing to clean up.
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowSuffixCol = context.getString("rowPrefixCol", "mac");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = KeyType.TSNANO;
      } else {
        keyType = KeyType.UUID;
      }
      // Read the column names from the configuration file.
      String[] pCols = pCol.replace(" ", "").split(",");
      payloadColumn = new byte[pCols.length][];
      for (int i = 0; i < pCols.length; i++) {
        // Column names are converted to lowercase.
        payloadColumn[i] = pCols[i].toLowerCase().getBytes(Charsets.UTF_8);
      }
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    // Decode with an explicit charset to match the getBytes(Charsets.UTF_8) calls.
    String strBody = new String(event.getBody(), Charsets.UTF_8);
    String[] subBody = strBody.split(this.payloadColumnSplit);
    // The event is only accepted when it has exactly one field per configured column.
    if (subBody.length == this.payloadColumn.length) {
      this.payload = new byte[subBody.length][];
      for (int i = 0; i < subBody.length; i++) {
        this.payload[i] = subBody[i].getBytes(Charsets.UTF_8);
        if (new String(this.payloadColumn[i], Charsets.UTF_8).equals(this.rowSuffixCol)) {
          // The row key prefix is the value of one column, by default the MAC address.
          this.rowSuffix = subBody[i];
        }
      }
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // Not used.
  }
}
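The functions to focus on are setEvent, configure, and getActions.

  • configure: reads the Flume configuration file, including the column names and the row key suffix type.
  • setEvent: takes the content of the Flume event and stores it in the payload array.
  • getActions: creates the PutRequest instances, writing the row key, column family, column, and value into each one.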
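
The flow through setEvent and getActions can be sketched without Flume or asynchbase on the classpath. The class SerializerFlowSketch and its methods toCells and rowKey are hypothetical names for illustration, and the sample body is invented. The row key layout (prefix column value followed by a generated part) is an assumption about how SimpleRowKeyGenerator combines its argument with the generated value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class SerializerFlowSketch {

    // Same regex as the serializer: matches the two literal characters "^A".
    static final String DELIMITER_REGEX = "\\^A";

    // setEvent step: split the body and pair each field with its column name.
    // Returns an empty map when the field count differs from the column count.
    static Map<String, String> toCells(String body, String[] columns) {
        Map<String, String> cells = new LinkedHashMap<>();
        String[] fields = body.split(DELIMITER_REGEX);
        if (fields.length != columns.length) {
            return cells;
        }
        for (int i = 0; i < fields.length; i++) {
            cells.put(columns[i], fields[i]);
        }
        return cells;
    }

    // getActions step (assumed layout): value of the prefix column plus a generated part.
    static String rowKey(Map<String, String> cells, String prefixColumn, String generated) {
        return cells.get(prefixColumn) + generated;
    }

    public static void main(String[] args) {
        String[] columns = {"url", "gwid", "mac"};
        Map<String, String> cells =
                toCells("http://example.com/^Agw-01^A00:11:22:33:44:55", columns);
        String key = rowKey(cells, "mac", UUID.randomUUID().toString());
        // One put per column, all sharing the same row key and column family.
        for (Map.Entry<String, String> cell : cells.entrySet()) {
            System.out.println("put row=" + key + " column=log:" + cell.getKey()
                    + " value=" + cell.getValue());
        }
    }
}
```

An event whose field count does not match the configured columns yields no cells in this sketch. In the serializer itself the payload array is left unchanged in that case, so it is worth checking that every input line carries exactly one field per column.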

Compiling the source and running it

     Once the custom BaimiAsyncHbaseEventSerializer class is written, compile the source to build the flume-ng-hbase-sink.*.jar package and use it to replace the original flume-ng-hbase-sink.*.jar in Flume.
  • Download the Flume 1.5 source, unpack it, and go to the directory flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/
  • Copy the BaimiAsyncHbaseEventSerializer class above into that directory.
  • Go to flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/ and run the Maven build: mvn install -Dmaven.test.skip=true
  • The build produces flume-ng-hbase-sink-1.5.0.jar in flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/target. Use this jar to replace the one under $FLUME_HOME/lib.
  • Start Flume with the command: flume-ng agent -c . -f conf/spoolDir.conf -n a1 -Dflume.root.logger=INFO,console
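
To check the running agent, a file with one well-formed record can be dropped into the spooling directory. This is a rough sketch: the class SampleRecordWriter, its methods, the file name, and all field values are made up for illustration. It joins the fields with the literal characters "^A", which is what the serializer's split regex matches as written, and it writes the file elsewhere before moving it in, since the spooling directory source expects files not to change once they are in place.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

public class SampleRecordWriter {

    // The literal two-character separator matched by the regex "\\^A".
    static final String SEPARATOR = "^A";

    // Join the field values in the same order as payloadColumn.
    static String buildRecord(String... fields) {
        return String.join(SEPARATOR, fields);
    }

    // Write to a temporary file first, then move it into the spool directory.
    // When both paths are on the same filesystem the move is a rename.
    static Path dropIntoSpoolDir(Path spoolDir, String fileName, String record)
            throws IOException {
        Path staged = Files.createTempFile("flume-sample", ".tmp");
        Files.write(staged, Collections.singletonList(record), StandardCharsets.UTF_8);
        return Files.move(staged, spoolDir.resolve(fileName));
    }

    public static void main(String[] args) throws IOException {
        // Pass the configured spoolDir as the first argument; a temp directory is used otherwise.
        Path spoolDir = args.length > 0
                ? Paths.get(args[0])
                : Files.createTempDirectory("testFlume");
        // Ten invented values, one per column in the sample configuration.
        String record = buildRecord("1403836800000", "Linux", "1403836799000", "1080", "1920",
                "http://example.com/", "Mozilla/5.0", "false", "gw-01", "00:11:22:33:44:55");
        System.out.println("wrote " + dropIntoSpoolDir(spoolDir, "sample.log", record));
    }
}
```

With the sample configuration, the directory to pass would be /home/scut/Downloads/testFlume.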

Reposted from: http://iencl.baihongyu.com/
