Flume + HBase + Kafka Integration and Development

倚然君 · 2022-06-20

First, download the Flume 1.7 source package:

http://archive.apache.org/dist/flume/1.7.0/

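For reference, fetching and unpacking the source from the command line might look like this (the tarball name matches the 1.7.0 release listing; the working directory is your choice):

wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-src.tar.gz
tar -zxf apache-flume-1.7.0-src.tar.gz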

After downloading and extracting:


Open the project in IntelliJ IDEA.


After clicking OK, choose to open the project in a new window.

Note that the default import takes a very long time to load, so importing the project as a Maven project is recommended.

Once the import finishes, take a look at the serializer class we are going to extend.


Our data source is the Sogou Labs dataset we downloaded earlier, which has already been uploaded to node 1.


This is the Flume data-flow model we are going to configure: an Avro source on node 1 feeds two memory channels, one draining to HBase and the other to Kafka (see the agent configuration at the end of this post).


Next, configure Flume on node 1.


Configure the absolute path of the JDK.
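In Flume this normally means setting JAVA_HOME in conf/flume-env.sh; a minimal sketch, with the JDK install path as a placeholder:

export JAVA_HOME=/opt/modules/jdk1.8.0_60   # placeholder path; point this at your actual JDK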


Leave the following configuration as it is for now; it may be revised later.


Next, preprocess the downloaded data, since the raw format is fairly messy.


First, replace the tab separators with commas, line by line, producing weblog2.log.


Next, replace the spaces with commas, producing weblog3.log.
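A minimal sketch of these two substitutions with standard Unix tools (assuming the raw file is named weblog.log):

cat weblog.log | tr "\t" "," > weblog2.log     # tabs to commas
cat weblog2.log | tr " " "," > weblog3.log     # spaces to commas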


Now all fields are uniformly comma-separated.


Delete the files that are no longer needed.


Rename the remaining file to weblog.log.


Distribute the preprocessed weblog.log file to node 2 and node 3.
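For example, with scp (host names follow the bigdata-pro0X.kfk.com convention used in the Flume configuration below; the target directory is an assumption):

scp weblog.log bigdata-pro02.kfk.com:/opt/datas/   # node 2
scp weblog.log bigdata-pro03.kfk.com:/opt/datas/   # node 3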


Now we can start the secondary development on the imported Flume source.

Rather than modifying the original class, create a new one.


Then copy the contents of the original class into it and change the file name and class name.


package org.apache.flume.sink.hbase;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * A simple serializer to be used with the AsyncHBaseSink
 * that returns puts from an event, by writing the event
 * body into it. The headers are discarded. It also updates a row in hbase
 * which acts as an event counter.
 *
 * Takes optional parameters:<p>
 * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
 * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
 * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
 *
 * Mandatory parameters: <p>
 * <tt>cf:</tt>Column family.<p>
 * Components that have no defaults and will not be used if absent:
 * <tt>payloadColumn:</tt> Which column to put payload in. If it is not present,
 * event data will not be written.<p>
 * <tt>incrementColumn:</tt> Which column to increment. If this is absent, it
 * means no column is incremented.
 */
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        switch (keyType) {
          case TS:
            rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
            break;
          case TSNANO:
            rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
            break;
          case RANDOM:
            rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
            break;
          default:
            rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
            break;
        }
        PutRequest putRequest = new PutRequest(table, rowKey, cf,
            payloadColumn, payload);
        actions.add(putRequest);
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }

}

 

We only need to make slight modifications on top of the original.


Hold Ctrl and click the SimpleRowKeyGenerator reference to jump into it.


Add the following content; the getKfkRowKey method at the end is the new addition:


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.UUID;

/**
 * Utility class for users to generate their own keys. Any key can be used,
 * this is just a utility that provides a set of simple keys.
 */
public class SimpleRowKeyGenerator {

  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
  }

  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
  }

  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
  }

  // Newly added: build the row key from userid + datetime plus the current
  // time in milliseconds, so keys stay unique.
  public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

}

 

Continue with the changes; the fully modified code is shown below.

KfkAsyncHbaseEventSerializer.java
package org.apache.flume.sink.hbase;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * A simple serializer to be used with the AsyncHBaseSink
 * that returns puts from an event, by writing the event
 * body into it. The headers are discarded. It also updates a row in hbase
 * which acts as an event counter.
 *
 * Takes optional parameters:<p>
 * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
 * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
 * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
 *
 * Mandatory parameters: <p>
 * <tt>cf:</tt>Column family.<p>
 * Components that have no defaults and will not be used if absent:
 * <tt>payloadColumn:</tt> Which column to put payload in. If it is not present,
 * event data will not be written.<p>
 * <tt>incrementColumn:</tt> Which column to increment. If this is absent, it
 * means no column is incremented.
 */
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        // Decode the configured column list and the event body as UTF-8 text
        // before splitting (String.valueOf(byte[]) would only yield the array
        // reference, not its contents).
        String[] columns = new String(payloadColumn, Charsets.UTF_8).split(",");
        String[] values = new String(this.payload, Charsets.UTF_8).split(",");
        for (int i = 0; i < columns.length; i++) {
          byte[] colColumn = columns[i].getBytes(Charsets.UTF_8);
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          // Stop if the column-name and value byte lengths differ,
          // a crude filter for malformed rows.
          if (colColumn.length != colValue.length) break; // continue;
          // if (colValue.length < 3) continue;
          String datetime = values[0];
          String userid = values[1];
          rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
          // Write each of the six configured columns into HBase.
          PutRequest putRequest = new PutRequest(table, rowKey, cf,
              colColumn, colValue);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }

}
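With these changes, each comma-separated event body is written as one HBase row with one cell per configured column, and the row key comes from getKfkRowKey(userid, datetime) rather than from the stock suffix-based generators.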

 

Now package the code into a jar.


You will see many related dependency jars listed; remove the ones that are not needed.

 


Then just click Build.


The built jar ends up in the project's local output directory.
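If you imported the project as a Maven project, an equivalent command-line build of the HBase sink module might look like this (a sketch: the module path follows the Flume 1.7 source layout, and skipping tests is just a convenience):

cd flume-ng-sinks/flume-ng-hbase-sink
mvn clean package -DskipTests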


Now upload the jar to the lib directory under the Flume installation.

The file's timestamp confirms it was uploaded today.


Next, configure Flume + Kafka on node 1. Note that the payloadColumn list below must match the comma-separated field order produced during preprocessing, because KfkAsyncHbaseEventSerializer splits both the column list and the event body on commas:


agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks=kafkaSink hbaseSink

#***********flume + hbase************
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC
agent1.sources.r1.bind = bigdata-pro01.kfk.com
agent1.sources.r1.port=5555
agent1.sources.r1.threads=5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive=20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table=weblogs
agent1.sinks.hbaseSink.columnFamily=info
agent1.sinks.hbaseSink.serializer= org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn=datatime,userid,searchname,retorder,cliorder,cliurl

#**************flume + kafka***************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive=20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type= org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.kafka.brokerList=bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
agent1.sinks.kafkaSink.topic=test
agent1.sinks.kafkaSink.zookeeperConnect=bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
agent1.sinks.kafkaSink.requiredAcks=1
agent1.sinks.kafkaSink.batchSize=1
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
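Assuming the configuration above is saved as conf/flume-conf.properties under the Flume home on node 1 (the file name is an assumption; use whatever you named it), the agent can then be started along these lines:

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 -Dflume.root.logger=INFO,console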

 
