1.kafka安装
1.1下载安装包解压
tar -zxvf kafka_2.11-0.10.1.1.tgz cd kafka_2.11-0.10.1.1/
1.2启动服务
首先启动zookeeper:这里使用zookeeper默认配置。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
启动kafka服务:这里使用单节点多代理配置。
修改server.properties
broker.id=0 #节点IDlisteners=PLAINTEXT://192.168.1.112:9092 #节点配置log.dirs=/opt/kafka_2.11-0.10.1.1/log #数据日志保存目录num.partitions=4 #默认分区数zookeeper.connect=192.168.1.112:2181 #zookeeper服务
其他配置项保持默认。
复制配置文件
cp config/server.properties config/server-1.propertiescp config/server.properties config/server-2.properties
修改配置节点id 、端口、日志路径
server-1.properties
broker.id=1 #节点IDlisteners=PLAINTEXT://192.168.1.112:9093 #节点配置log.dirs=/opt/kafka_2.11-0.10.1.1/log-1 #数据日志保存目录num.partitions=4 #默认分区数zookeeper.connect=192.168.1.112:2181 #zookeeper服务
server-2.properties
broker.id=2 #节点IDlisteners=PLAINTEXT://192.168.1.112:9094 #节点配置log.dirs=/opt/kafka_2.11-0.10.1.1/log-2 #数据日志保存目录num.partitions=4 #默认分区数zookeeper.connect=192.168.1.112:2181 #zookeeper服务
启动kafka节点:broker0,broker1,broker2
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &nohup bin/kafka-server-start.sh config/server-1.properties >/dev/null 2>&1 &nohup bin/kafka-server-start.sh config/server-2.properties >/dev/null 2>&1 &
1.3创建topic
创建topic:设置备份2,分区3
./bin/kafka-topics.sh --create --zookeeper 192.168.1.112:2181 --replication-factor 2 --partitions 3 --topic my-topic
运行命令 “describe topics”查看topic信息
bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topic
输出:
Topic:my-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: my-topic Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
第一行是所有分区的摘要,其次,每一行提供一个分区信息。这里三行显示每个分区的信息。
“Partition”:分区信息
“Leader”:该节点负责该分区的所有的读和写,每个节点的leader
都是随机选择的。
“Replicas”:备份节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
“Isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
在topic中发布消息:
bin/kafka-console-producer.sh --broker-list 192.168.1.112:9092 --topic my-topicthis is message 1this is message 2
在topic中获取消息
bin/kafka-console-consumer.sh --zookeeper 192.168.1.112:2181 --topic my-topic --from-beginningthis is message 1this is message 2
测试kill掉其中一个节点,分区是否正常工作,这里我们停掉broker0
ps | grep server.propertieskill -9 3789
然后查看分区信息
bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topicTopic:my-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2 Topic: my-topic Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1
跟前面执行"describe topic"命令相比,分区2的leader变为broker1,“同步备份”的 列表中也没有broker0。执行从最初获取消息的命令也会发现,消息并没有丢失。
2.kafka接口API
Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,但是为了兼容,将会共存一段时间。为了减少依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。
在项目中引入kafka服务端jar包,包含scala客户端。
org.apache.kafka kafka_2.11 0.10.1.1
我们鼓励所有新开发的程序使用新的JAVA客户端,新的java客户端比以前的Scala的客户端更快、功能更全面。
org.apache.kafka kafka-clients 0.10.1.1
2.1生产者API
生产者是线程安全的,在多个线程之间共享单个生产者实例。
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.112:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 10); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i < 10; i++) { Futurefuture = producer .send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); } producer.close(); }
生产者缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。
默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms
大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
send():方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。
producer提供两种send方法实现,第一种不带回调方法
public Futuresend(ProducerRecord record)
异步发送一条消息到topic,并且消息一旦被保存到“等待发送的消息缓存”中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。发送消息返回的结果是RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。
由于send调用是异步的,它将为发送此消息的响应RecordMetadata返回一个。调用future的get()方法则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。
阻塞方式调用:
Futurefuture = producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.println("partition="+recordMetadata.partition()+",offset="+recordMetadata.offset()+"value="+i);
另一种send方法实现是带回调方法:
public Futuresend(ProducerRecord record, Callback callback)
异步发送消息,当发送成功或异常时会调用回调方法,返回发送结果RecordMetadata信息或异常。这样可以不阻塞的方式确保每条消息发送成功。
非阻塞方式调用:
Futurefuture = producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i)),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null){ //发送成功 System.out.println("partition=" + metadata.partition() + ",offset="+ metadata.offset()); }else{ //发送异常 exception.printStackTrace(); } } });
发送到同一个分区的消息回调保证按一定的顺序执行。
注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的来并行处理。
2.2消费者API
随着0.9.0版本,我们已经增加了一个新的Java消费者替换我们现有的基于zookeeper的高级和低级消费者。这个客户端还是测试版的质量。为了确保用户平滑升级,我们仍然维护旧的0.8版本的消费者客户端继续在0.9集群上工作,两个老的0.8 API的消费者( 和 )。
这个新的消费API,清除了0.8版本的高版本和低版本消费者之间的区别。
kafka客户端消费者类
public class KafkaConsumerimplements Consumer
偏移量和消费者位置
kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。
消费者的位置指向下一条记录的偏移量。它比消费者在该分区中读取过的最大偏移量要大一个。 它在每次消费者在调用poll(long)中拉取消息时自动增长。
“已提交”的位置是已保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制(如:commitSync 和 commitAsync)。
消费者组和订阅主题
消费者组:具有相同group.id的消费者进程,可以在一台机器上运行也可以分布在多台机器上,拉取kafka消息并处理消息。
订阅主题:分组中的每个消费者都通过subscribe API
动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。
消费者组的成员是动态维护的:如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个分区给它。这被称为重新平衡分组。
此外,当分组重新分配自动发生时,可以通过ConsumerRebalanceListener通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。允许消费者通过使用assign(Collection)
手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。
消费者故障
订阅一组topic后,当调用poll(long)时,消费者将自动加入到组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,并且其分区将被重新分配。
还有一种可能,消费可能遇到“活锁”的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用max.poll.interval.ms活跃检测机制。 在此基础上,如果你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。所以要留在组中,你必须持续调用poll。
两个配置设置来控制poll循环:
max.poll.interval.ms
:增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
max.poll.records
:此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔,减少重新平衡分组的。
对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用poll。
另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量。 还要注意,你需要pause暂停
分区,不会从poll接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。
自动提交偏移量实例:
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(100);//拉取消息超时时间100ms for (ConsumerRecord consumerRecord : records) { System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } } }
enable.auto.commit:true 自动提交偏移量
auto.commit.interval.ms:1000 自动提交偏移量间隔时间session.timeout.ms:30000 broker自动检测客户端进程心跳的最大超时时间。如果停止心跳的时间超过该值,则认为该客户端出现故障,它的分区将被重新分配。
集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。
手动控制偏移量
不需要定时的提交offset,可以自己控制offset,当消息被认为已消费过了,这个时候再去提交它们的偏移量,这样可以保证消息被完整的处理。
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); final int minBatchSize = 20; List > buffer = new ArrayList<>(); while(true){ ConsumerRecords records = consumer.poll(100); for (ConsumerRecord consumerRecord : records) { buffer.add(consumerRecord);//拉取kafka数据到内存中 } if(buffer.size() >= minBatchSize){ //处理buffer中消息 for (ConsumerRecord cr : buffer) { System.out.println("处理消息:"+cr.value()); } consumer.commitAsync();//处理完内存中缓存的消息之后手动提交 System.out.println("提交offset"); buffer.clear(); } } }
在这个例子中,我们将消费一批消息并将它们存储在内存中。当我们积累足够多的消息后,我们再将它们批量处理。等确保消息被成功完全处理则手动提交offset。
这种情况我们可以保证“至少一次”,上面例子当处理完所有消息之后,在提交offset时出现异常(虽然概率很小)。这种情况下进程重启之后就会重复消费这部分消息。“至少一次”保证,在故障情况下,可以重复。
使用自动提交也可以“至少一次”。但是要求你必须下次调用poll(long)之前或关闭消费者之前,处理完所有返回的数据。如果操作失败,这将会导致已提交的offset超过消费的位置,从而导致丢失消息。使用手动控制offset的优点是,你可以直接控制消息何时提交。
处理完每个分区中的消息后,提交偏移量实例:
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(100); for (TopicPartition partition : records.partitions()) { //获取分区中的消息 List > partitionRecords = records.records(partition); for (ConsumerRecord consumerRecord : partitionRecords) { System.out.println("partion:"+consumerRecord.partition()+",value:"+consumerRecord.value()+",offset:"+consumerRecord.offset()); } long lastOffset = partitionRecords.get(partitionRecords.size()-1).offset(); System.out.println(partition.partition()+"分区最后offset:"+lastOffset); //提交最后一个消息的下一个offset consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset+1))); } } }
注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。因此,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。
订阅指定分区
分区的设置只能通过调用assign
修改,因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); TopicPartition p = new TopicPartition("my-topic",0);//订阅主题的指定分区 consumer.assign(Arrays.asList(p)); while (true) { ConsumerRecords records = consumer.poll(100);//拉取消息超时时间100ms for (ConsumerRecord consumerRecord : records) { System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } } }
assign方法只指定消费者拉取指定分区的消息,消费者分组仍需要提交offset。
自定义存储offset
消费者可以不使用kafka内置的offset仓库,可以选择自己来存储offset。将消费的offset和消息结果存储在同一个的系统中,用原子的方式存储结果和offset。提供的“正好一次”的消费保证比kafka默认的“至少一次”的语义要更高。
自己管理偏移量要注意一下几点:
1.关闭自动提交offset,enable.auto.commit=false。
2.根据ConsumerRecord 保存分区offset的位置。
3.启动时恢复分区消费到的位置,通过方法seek(TopicPartition, long)。
手动控制分区,启动时指定分区偏移量消费实例:
public class ConsumerTest5 { //用于跟踪偏移量的map,手动提交偏移量情况下,在分区再均衡时提交每个分区消费的位置,以便均衡之后新的消费者开始从该位置进行消费 static MapcurrentOffsets = new ConcurrentHashMap(); //用于保存偏移量位置的map,以便下次启动时从该位置开始消费。 static Map map = new ConcurrentHashMap<>(); private static KafkaConsumer consumer; public static void main(String[] args) { map.put("my-topic0", 1000l); map.put("my-topic1", 1000l); map.put("my-topic2", 1000l); Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); // 手动指定位置消费,需要手动注册 TopicPartition p = new TopicPartition("my-topic", 0); TopicPartition p1 = new TopicPartition("my-topic", 1); TopicPartition p2 = new TopicPartition("my-topic", 2); //手动注册分区 consumer.subscribe(Arrays.asList("my-topic"),new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection partitions) { //再均衡开始之前和消费者停止读取消息之后被调用 //在这里提交偏移量,下一个接管该分区的消费者就知道从什么位置开始消费。 //提交所有分区的偏移量 System.out.println("提交所有分区偏移量"); consumer.commitSync(currentOffsets); } @Override public void onPartitionsAssigned(Collection partitions) { //重新分配partition之后和消费者开始读取消息之前被调用 } }); //这里要先调用一次poll方法,因为本次拉取数据不是我们需要的位置所以不做处理 consumer.poll(100); //指定分区消费位置 consumer.seek(p, map.get("my-topic0")); consumer.seek(p1, map.get("my-topic1")); consumer.seek(p2, map.get("my-topic2")); long offset = 0; String partitionKey; while (true) { ConsumerRecords records = consumer.poll(100); Set partitions = records.partitions(); for (TopicPartition topicPartition : partitions) { // 标识分区 partitionKey = topicPartition.topic() + topicPartition.partition(); System.out.println(partitionKey + "从" + map.get(partitionKey) + "开始消费"); List > list = records.records(topicPartition); offset = 0; for (ConsumerRecord consumerRecord : list) { System.out.println("partition=" + topicPartition.partition() + "offset=" + consumerRecord.offset() + ",value=" + consumerRecord.value()); offset = consumerRecord.offset(); } // offset保存到map中 map.put(partitionKey, offset + 1); //手动提交偏移量 consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1))); //缓存跟踪分区的偏移量 currentOffsets.put(topicPartition, new OffsetAndMetadata(offset+1)); } } }}
再均衡监听器(ConsumerRebalanceListener)
在为消费者分配新的partition或者移除旧的partition时,可以通过消费者API执行一些应用程序代码,在使用subscribe()方法时传入一个ConsumerRebalanceListener实例。
ConsumerRebalanceListener需要实现的两个方法
1:public void onPartitionRevoked(Collection<TopicPartition> partitions)方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。
2:public void onPartitionAssigned(Collection<TopicPartition> partitions)方法会在重新分配partition之后和消费者开始读取消息之前被调用。
消费者流量控制
如果消费者分配了多个分区,并同时消费所有的分区,这些分区具有相同的优先级。在一些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。
kafka支持动态控制消费流量,分别在consumer的poll(long)中使用pause
和 resume
来暂停消费指定分配的分区,重新开始消费指定暂停的分区。
public void pause(Collection<TopicPartition> partitions) //暂停消费指定分区。
public void resume(Collection<TopicPartition> partitions)//重新开始消费,已经暂停消费的分区。
2.3Stream API
2.4Connect API