kafka与flume整合(kafka充当source,sink,channel)

kafka与flume整合(kafka充当source,sink,channel)

1. Kafka充当Flume的source数据源,此时也就意味着Flume需要采集Kafka的数据,Flume相当于是kafka的一个消费者

.conf文件(KafkaToConsole.conf)

#sources别名:r1

a1.sources = r1

#sink别名:k1

a1.sinks = k1

#channel别名:c1

a1.channels = c1

# 定义flume的source数据源 ---kafka

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

a1.sources.r1.kafka.topics = kafka

a1.sources.r1.kafka.consumer.group.id = uek

# 定义flume的channel----使用基于内存的channel

a1.channels.c1.type = memory

#默认该通道中最大的可以存储的event数量是1000

a1.channels.c1.capacity = 10000

#每次最大可以source中拿到或者送到sink中的event数量也是100

a1.channels.c1.transactionCapacity = 5000

# 定义sink----console---logger

a1.sinks.k1.type = logger

# 整合一个flume进程中channel source sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动

# 启动flume

[root@node1 data]# flume-ng agent -n a1 -f KafkaToConsole.conf -Dflume.root.logger=INFO,console

# 生产数据1

[root@node2 ~]# kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic kafka

# 生产数据2---通过java代码的方式生产

package new_pro;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// 新版本生产者代码--建议大家使用的版本

public class NewProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "192.168.200.111:9092, 192.168.200.112:9092, 192.168.200.113:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer producer = new KafkaProducer(props);

for (int i = 0; i < 100; i++) {

producer.send(new ProducerRecord("kafka", "hello--"+i));

}

// 必须有下面的语句才行,二选一

// producer.commitTransaction();

producer.close();

}

}

2. Kafka充当Flume的channel缓存管道,相当于是将Flume采集的数据源的数据先存放到kafka当中,此时Flume相当于是Kafka的生产者

说明:flume相当于是kafka的生产者,生产数据到指定的主题中 主题是flume(自定义) 如果flume主题不存在 会自动创建。但是如果是自动创建的,必须重新启动一下flume采集程序,因为自动创建的topic主题的分区有问题的。重启一下即可

如果flume充当kafka的生产者,要求topic主题最好提前存在

.config文件(netcatToLogger.conf)

#sources别名:r1

a1.sources = r1

#sink别名:k1

a1.sinks = k1

#channel别名:c1

a1.channels = c1

# 定义flume的source数据源 ---netcat

a1.sources.r1.type = netcat

#监听的主机ip:

a1.sources.r1.bind = node1

#监听的端口:

a1.sources.r1.port = 44444

#

# 定义flume的channel----使用基于kafka的channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

a1.channels.c1.kafka.topic = flume

a1.channels.c1.kafka.consumer.group.id = flume-consumer

# 定义sink----console---logger

a1.sinks.k1.type = logger

# 整合一个flume进程中channel source sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动

[root@node1 data]# flume-ng agent -n a1 -f netcatToLogger.conf -Dflume.root.logger=INFO,console

[root@node2 ~]# telnet node1 44444

3. 最常用的整合方式:Kafka充当Flume的sink下沉地,也就意味着需要将Flume采集的数据源的数据采集到Kafka中,此时Flume相当于是Kafka的生产者

.conf文件(netcatToKafka.conf)

#sources别名:r1

a1.sources = r1

#sink别名:k1

a1.sinks = k1

#channel别名:c1

a1.channels = c1

# 定义flume的source数据源 ---netcat

a1.sources.r1.type = netcat

#监听的主机ip:

a1.sources.r1.bind = node1

#监听的端口:

a1.sources.r1.port = 44444

# 定义flume的channel----使用基于kafka的channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

a1.channels.c1.kafka.topic = flume

a1.channels.c1.kafka.consumer.group.id = flume-consumer

# 定义sink----console---logger

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = student

a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

a1.sinks.k1.kafka.flumeBatchSize = 200

a1.sinks.k1.kafka.producer.acks = 1

# 整合一个flume进程中channel source sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动

[root@node1 data]# flume-ng agent -n a1 -f netcatToKafka.conf -Dflume.root.logger=INFO,console

[root@node2 ~]# telnet node1 44444

[root@node3 ~]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic student

本文来自博客园,作者:jsqup,转载请注明原文链接:https://www.cnblogs.com/jsqup/p/16667979.html

相关推荐

放逐游戏大逃杀车在哪里 怎么开车
一篇文章讲透:朋友圈广告怎么投放,投放步骤、投放技巧、收费标准
算分必备!世界杯小组赛出线规则一览,前两名晋级