flume · 2022-03-02 0

java 使用 flume

一、maven 引入

 <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-node</artifactId>
    <version>1.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-embedded-agent</artifactId>
    <version>1.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flume.flume-ng-sources</groupId>
    <artifactId>flume-taildir-source</artifactId>
    <version>1.9.0</version>
</dependency>

二、flume 配置

flume.conf

sinks 为自定义的类

app1.sources = r1
app1.channels = c1
app1.sinks = k1

app1.sources.r1.type = avro
app1.sources.r1.bind = 0.0.0.0
app1.sources.r1.port = 44444

app1.channels.c1.type = file
app1.channels.c1.checkpointDir = /tmp/mychannel/checkpoint
app1.channels.c1.dataDirs = /tmp/mychannel/data

app1.sinks.k1.type = com.example.flume.MySink
app1.sinks.k1.username = zxm
app1.sinks.k1.password = 123456

app1.sources.r1.channels = c1
app1.sinks.k1.channel = c1

三、自定义 sinks

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);
    private String username;
    private String password;

    @Override
    public void configure(Context context) {
        username = context.getString("username");
        password = context.getString("password");
        LOGGER.info("username: " + username);
        LOGGER.info("password: " + password);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;

        try {
            transaction.begin();
            event = channel.take();

            if (event != null) {
                byte[] body = event.getBody();
                String s = new String(body);
                LOGGER.info("event body: " + s);
            } else {
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception e) {
            transaction.rollback();
            result = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return result;
    }

}

四、java 使用 flume

@Test
public void testClient() throws EventDeliveryException {
    RpcClient rpcClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 44444);

    Map<String, String> header = new HashMap<>();
    header.put("object_type", "shop");

    Event event = new SimpleEvent();
    event.setHeaders(header);
    event.setBody("Hello Body".getBytes());
    rpcClient.append(event);
}

@Test
public void testServer() throws InterruptedException {
    String flumeConf = "src/main/resources/flume.conf";
    Application.main(new String[]{"agent", "-f", flumeConf, "-n", "app1"});
    TimeUnit.MINUTES.sleep(60);
}