0
点赞
收藏
分享

微信扫一扫

【Flume中间件】(13)自定义Source


自定义Source

有时候,flume中的source不符合我们的需求,这时就可以进行自己定义Source。

自定义Source的流程就是首先继承并实现官方类,然后实现相应的方法,重点是读取数据的方法,在该内部可以定义jdbc或者是IO流进行读取数据。

然后将数据封装成事件,交给channel处理器。

处理器的内部流程是先将该事件交给拦截器进行处理(封装头部信息等),然后判断是否为空,不为空,将其将给选择器,将该事件交给自己对应的channel。

自定义的Source类型为自己编写的代码的全类名。

注意要将写好的代码打成jar包丢到flume的lib目录下。

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = com.atguigu.Source.MySource
a1.sources.r1.prefix=log
a1.sources.r1.suffix=mod

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

package com.atguigu.Source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

// 定义键值对
String prefix;
String suffix;

// 根据上下文获取flume中配置的值
@Override
public void configure(Context context) {
prefix = context.getString("prefix");
suffix = context.getString("suffix", ".www");
}

@Override
public Status process() throws EventDeliveryException {
Status status = null;

// 读取数据,可以自定义jdbc或IO流
try {
for (int i = 0; i < 10; i++) {
SimpleEvent event = new SimpleEvent();
event.setBody((prefix + "-->" + i + suffix).getBytes());

// 将事件交给处理器
getChannelProcessor().processEvent(event);

// 设置提交状态
status = Status.READY;
}
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
}

// 定义延时,也可以设置成配置参数
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}

@Override
public long getBackOffSleepIncrement() {
return 0;
}

@Override
public long getMaxBackOffSleepInterval() {
return 0;
}

}


举报

相关推荐

0 条评论