发送原理

在消息发送的过程中,涉及到了两个线程:main 线程Sender 线程

main 线程中创建了一个双端队列 RecordAccumulator

main 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地址清单,可以设置 1 个或者多个,中间用逗号隔开,例如 node01:9092,node02:9092
key.seralizer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memoryRecord Accumulator 缓冲区总大小, 默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size, sender 等待 linger time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据, Leader 收到数据后应答。
-1(all) :生产者发送过来的数据, Leader+和 is r 队列里面的所有节点收齐数据后应答。
默认值是-1,-1 和 all 是等价的。
max.in.flight requests.per.connection允许最多没有返回 ack 的次数, 默认为 5, 开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候, 系统会重发消息。retries 表示重试次数。默认是 int 最大值:2147483647。
如果设置了重试,还想保证消息的有序性,需要设置 MAX _INFLIGHT_REQUESTS_PER CONNECTION = 1
否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性, 默认 true, 开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是是 none, 也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd.

生产者发送消息 API

异步发送

1、创建工程,导入依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>

2、不带回调函数版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @author: ShiGuang
* @create: 2022-02-26 12:57
* @description: kafka没有回调函数的异步发送
*/
public class CustomProducer {
public static void main(String[] args) {

// 创建kafka生产者的配置对象
Properties properties = new Properties();

// 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
// key,value 必须序列化,key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建kafka生产者对象,并填充属性信息
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 调用send方法,发送消息
kafkaProducer.send(new ProducerRecord<>("firstTest", "hello kafka"));

// 消息发送结束,关闭资源
kafkaProducer.close();
}
}

3、测试

程序运行后,消费者端显示

4、带回调函数版本

回调函数会在 producer 收到 ack 时调用,为异步调用。

该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

在上面的基础上,发送消息时增加回调函数。

1
2
3
4
5
6
7
8
9
// 调用send方法,发送消息,带回调函数
kafkaProducer.send(new ProducerRecord<>("firstTest", "hello kafka"), (recordMetadata, e) -> {
// 为 null,消息发送成功
if (e == null) {
System.out.println("消息发送成功!" + "主题:" + recordMetadata.topic() + "\n分区:" + recordMetadata.partition());
} else {
System.out.println("消息发送失败" + e);
}
});

同步发送

只需在异步发送的基础上,在发送消息时调用一下 get()方法(异常抛出或者捕捉)

1
2
3
4
5
6
7
8
9
// 调用send方法,发送消息,带回调函数
kafkaProducer.send(new ProducerRecord<>("firstTest", "hello kafka"), (recordMetadata, e) -> {
// 为 null,消息发送成功
if (e == null) {
System.out.println("消息发送成功!" + "主题:" + recordMetadata.topic() + "\n分区:" + recordMetadata.partition());
} else {
System.out.println("消息发送失败" + e);
}
}).get();