1、生产者提高吞吐量

可以修改的参数

参数说明
batch.size批次大小,默认 16k
linger.ms等待时间,默认 0ms,修改为 5-100ms
compression.type压缩方式,默认 none,可配置值 gzip、snappy、lz4、zstd
RecordAccumulator缓冲区大小,默认 32m,修改为 64m

代码配置:同样是添加属性信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建kafka生产者的配置对象
Properties properties = new Properties();

// 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
// key,value 必须序列化,key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 使用自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

2、数据可靠性问题

根据 ACKS 的应答级别,数据可靠性不同

acks=0

生产者发送过来数据就不管了,可靠性差,效率高

acks=1

生产者发送过来数据 Leader 应答,可靠性中等,效率中等;

acks=-1

生产者发送过来数据 Leader 和 ISR 队列里面所有 Follwer 应答,可靠性高,效率低;

ISR 队列

Leader 维护了一个动态的 in-sync replica(ISR),意为和 Leader 保持正常同步的 Follower + Leader 集合

(leader:0,follower:1,2,isr:0,1,2),这里的 0,1,2 代表的都是 broker

如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。

该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。

例如 2 超时,(leader:0, isr:0,1)。这样就不用等长期联系不上或者已经故障的节点。

注意

如果分区副本设置为 1 个,或 者 ISR 里应答的最小副本数量( min.insync.replicas 默认为 1)设置为 1,和 ack=1 的效果是一样的,仍然有丢数的风险(leader:0,isr:0)

数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2

总结

在生产环境中,acks=0 很少使用;

acks=1,一般用于传输普通日志,允许丢个别数据;

acks=-1,一般用于传输交易数据等,对可靠性要求比较高的场景。

代码配置

关于 retries:retries

1
2
3
4
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

问题

如果原来的 Leader 节点接收到数据后,应答时挂掉了,从新选举了新的 Leader,新的 Leader 一共接受了两份数据,出现了数据重复问题

3、消息传递的三种语义

  • 至少一次(At Least Once):acks = -1 ,同时,分区副本>=2 + ISR 里应答的最小副本数量>=2,
    • 效果:消息至少消费一次,保证数据不丢失
  • 最多一次(At Most Once) :acks = 0
    • 效果:消息最多消费一次,数据不重复,单数数据可能丢失;
  • 精确一次(Exactly Once) :幂等性 + 至少一次(At Least Once)
    • 效果:数据不重复也不丢失,但是无法保证跨分区跨会话的 Exactly Once

总结

At Least Once 可以保证数据不丢失,但是不能保证数据不重复

At Most Once 可以保证数据不重复,但是不能保证数据不丢失

Exactly Once 可以保证数据不重复也不丢失,但是无法保证跨分区跨会话的 Exactly Once

4、幂等性

对于一些非常重要的信息,比如说交易数据,要求数据既不能重复也不丢失,所以 Kafka 在 0.11 版本以后,引入了一项重大特性:幂等性和事务

幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。

重复数据的判断标准:具有<PID, Partition, Seq-Number>相同主键的消息提交时,Broker 只会持久化一条。

其中 PID 是 Kafka 每次重启都会分配一个新的;

Partition 表示分区号;

Sequence Number 是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

开启参数 enable.idempotence 即可使用 幂等性 ,默认为 true

1
2
// 幂等性,默认开启
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

5、生产者事务

kafka 事务属性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。

开启事务,必须开启幂等性

Kafka 事务原理:

事务 API

Kafka 的事务一共有如下 5 个 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

/**
* 初始化事务
*/
void initTransactions();

/**
* 开启事务
*/
void beginTransaction() throws ProducerFencedException;

/**
* 在事务内提交已经消费的偏移量(主要用于消费者)
*/
@Deprecated
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;

/**
* 提交事务
*/
void commitTransaction() throws ProducerFencedException;

/**
* 放弃事务(类似于回滚事务的操作)
*/
void abortTransaction() throws ProducerFencedException;

代码配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @author: ShiGuang
* @create: 2022-02-26 23:51
* @description: 测试kafka事务
*/
public class CustomProducerTransactions {
public static void main(String[] args) {
// 创建kafka生产者的配置对象
Properties properties = new Properties();

// 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
// key,value 必须序列化,key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "001");

// 创建kafka生产者对象,并填充属性信息
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();

try {
// 发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("firstTest", "order" + i));
// 尝试出现错误后,是否会继续收到消息
i = 1 / 0;
}
// 提交事务
kafkaProducer.commitTransaction();
} catch (ProducerFencedException e) {
e.printStackTrace();
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 关闭资源
kafkaProducer.close();
}
}
}

这里遇到了一个问题,就是如果没有使用 try,catch (也就是异常才需要终止事务),直接放到提交事务后,会遇到一个状态转换的错误

Invalid transition attempted from state READY to state ABORTING_TRANSACTION

6、数据有序

kafka 在 1.x 版本之前保证数据单分区有序,条件如下

max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

kafka 在 1.x 及以后版本保证数据单分区有序,条件如下

(1)未开启幂等性 max.in.flight.requests.per.connection 需要设置为 1。

(2)开启幂等性 max.in.flight.requests.per.connection 需要设置<=5。

原因说明:因为在 kafka1.x 以后,启用幂等后,kafka 服务端会缓存 producer 发来的最近 5 个 request 的元数据,故无论如何,都可以保证最近 5 个 request 的数据都是有序