(1). 需求
自定义拦截器
(2). 添加依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
<!-- 仅仅在编译时需要 -->
<scope>provided</scope>
</dependency>
(3). 创建Interceptor
package help.lixin.flume.interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义拦截器
*
* @author lixin
*/
public class CustomerInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory.getLogger(CustomerInterceptor.class);
private static final String LEVEL = "LEVEL";
private static final String DEBUG = "DEBUG";
private static final String ERROR = "ERROR";
private static final String INFO = "INFO";
private static final String WARN = "WARN";
private List<Event> tmpEvents;
public void initialize() {
tmpEvents = new ArrayList<Event>();
}
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String body = new String(event.getBody());
// 在Header中添加信息.
if (body.contains("DEBUG")) {
headers.put(LEVEL, DEBUG);
} else if (body.contains("ERROR")) {
headers.put(LEVEL, ERROR);
} else if (body.contains("INFO")) {
headers.put(LEVEL, INFO);
} else if (body.contains("WARN")) {
headers.put(LEVEL, WARN);
}
return event;
}
public List<Event> intercept(List<Event> events) {
tmpEvents.clear();
for (Event event : events) {
tmpEvents.add(intercept(event));
}
return tmpEvents;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
public void configure(Context context) {
}
public Interceptor build() {
return new CustomerInterceptor();
}
}
}
(4). 打包
将Maven项目打包成jar,并添加到${FLUME}/lib/目录下
(5). Flume1配置(taildir-flume-customer-interceptor.conf)
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# 定义source的selector类型
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=LEVEL
a1.sources.r1.selector.mapping.DEBUG = c1
a1.sources.r1.selector.mapping.ERROR = c1
a1.sources.r1.selector.mapping.INFO = c2
a1.sources.r1.selector.mapping.WARN = c2
a1.sources.r1.type=TAILDIR
a1.sources.r1.positionFile=/tmp/log/flume/taildir_position.json
# define groups
a1.sources.r1.filegroups=f2
#group f2
a1.sources.r1.filegroups.f2=/Users/lixin/Workspace/spring-cloud-sample-provider/logs/level-logs/.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
a1.sources.r1.interceptors = customerInterceptor
a1.sources.r1.interceptors.customerInterceptor.type=help.lixin.flume.interceptor.CustomerInterceptor$Builder
a1.sources.r1.channels=c1 c2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# k1输出到4545端口
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=4545
a1.sinks.k1.channel=c1
# k2输出到4546端口
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=localhost
a1.sinks.k2.port=4546
a1.sinks.k2.channel=c2
(6). Flume2配置(avro-flume-log.conf)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4545
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sinks.k1.channel = c1
(7). Flume3配置(avro-flume-file.conf)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4546
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.rollInterval=0
a1.sinks.k1.sink.directory = /tmp/log/flume
# Bind the source and sink to the channel
a1.sinks.k1.channel = c1
(8). Flume1启动
bin/flume-ng agent --name a1 --conf conf --conf-file ./works/taildir-flume-customer-interceptor.conf -Dflume.root.logger=INFO,console
(9). Flume2启动
bin/flume-ng agent --name a1 --conf conf --conf-file ./works/avro-flume-log.conf -Dflume.root.logger=INFO,console
(10). Flume3启动
bin/flume-ng agent --name a1 --conf conf --conf-file ./works/avro-flume-file.conf -Dflume.root.logger=INFO,console
(11). 总结
ERROR/DEBUG级别日志会在Flume(4545端口)的控制台输出
INFO/WARN级别日志会在Flume(4546端口)的文件目录(/tmp/log/flume)里