一、maven 引入
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-node</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-embedded-agent</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source</artifactId>
<version>1.9.0</version>
</dependency>
二、flume 配置
flume.conf
sinks 为自定义的类
app1.sources = r1
app1.channels = c1
app1.sinks = k1
app1.sources.r1.type = avro
app1.sources.r1.bind = 0.0.0.0
app1.sources.r1.port = 44444
app1.channels.c1.type = file
app1.channels.c1.checkpointDir = /tmp/mychannel/checkpoint
app1.channels.c1.dataDirs = /tmp/mychannel/data
app1.sinks.k1.type = com.example.flume.MySink
app1.sinks.k1.username = zxm
app1.sinks.k1.password = 123456
app1.sources.r1.channels = c1
app1.sinks.k1.channel = c1
三、自定义 sinks
public class MySink extends AbstractSink implements Configurable {
private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);
private String username;
private String password;
@Override
public void configure(Context context) {
username = context.getString("username");
password = context.getString("password");
LOGGER.info("username: " + username);
LOGGER.info("password: " + password);
}
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
try {
transaction.begin();
event = channel.take();
if (event != null) {
byte[] body = event.getBody();
String s = new String(body);
LOGGER.info("event body: " + s);
} else {
result = Status.BACKOFF;
}
transaction.commit();
} catch (Exception e) {
transaction.rollback();
result = Status.BACKOFF;
} finally {
transaction.close();
}
return result;
}
}
四、java 使用 flume
@Test
public void testClient() throws EventDeliveryException {
RpcClient rpcClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 44444);
Map<String, String> header = new HashMap<>();
header.put("object_type", "shop");
Event event = new SimpleEvent();
event.setHeaders(header);
event.setBody("Hello Body".getBytes());
rpcClient.append(event);
}
@Test
public void testServer() throws InterruptedException {
String flumeConf = "src/main/resources/flume.conf";
Application.main(new String[]{"agent", "-f", flumeConf, "-n", "app1"});
TimeUnit.MINUTES.sleep(60);
}