Kafka 分区好处

(1)便于合理使用存储资源,每个 Partition 在一个 Broker 上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费。

生产者发送消息的分区策略

默认的分区器 DefaultPartitioner

The default partitioning strategy:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose the sticky partition that changes when the batch is full. See KIP-480 for details about sticky partitioning.

解释

图中前面四个

1
2
3
4
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}

在指定了 partition 的情况下,直接将指明的值作为 partition 值;例如 partition=0,所有数据写入分区 0

图中第五个

1
public ProducerRecord(String topic, K key, V value) {}

在没有指明 partition 值 但有 key 的情况下,将 key的hash值topic的partition数 进行取余得到 partition 值。

实际应用:通过指定 key 值来保证数据发送到同一个分区。

例如:key1 的 hash 值=5, key2 的 hash 值=6 ,topic 的 partition 数=2

那么 key1 对应的 value1 写入 1 号分区(5 mod 2 = 1),key2 对应的 value2 写入 0 号分区(6 mod 2 = 0)

图中最后一个

1
public ProducerRecord(String topic, V value) {}

在既没有 partition值 又没有 key值 的情况下,Kafka 采用 Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择 0 号分区,等 0 号分区当前 batch.size 满了(默认 16k)或者 linger.ms 设置的时间到, Kafka 再随机一个分区进行使用(如果还是 0 会继续随机)。

自定义分区器

上面是默认的分区器策略,根据实际生产需要,可能需要自定义分区策略。

比如说实现一个分区器,发送过来的数据中如果包含 order,就发往 0 号分区,不包含 order,就发往 1 号分区。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* @author: ShiGuang
* @create: 2022-02-26 14:40
* @description: 自定义分区器
*
* 步骤:
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
/**
* 计算给定记录的分区并返回
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据,可以查看分区信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msg = value.toString();

// 初始化分区号
int partition;

// 判断消息是否包含 order
if (msg.contains("order")) {
partition = 0;
} else {
partition = 1;
}

// 返回分区号
return partition;
}

/**
* 关闭资源
*/
@Override
public void close() {

}

/**
* 使用给定的键值对配置此类
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

}
}

使用

在生产者的配置中添加分区器参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {

// 创建kafka生产者的配置对象
Properties properties = new Properties();

// 给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
// key,value 必须序列化,key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 使用自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

// 创建kafka生产者对象,并填充属性信息
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 调用send方法,发送消息,带回调函数
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("firstTest", "order" + i), (recordMetadata, e) -> {
// 为 null,消息发送成功
if (e == null) {
System.out.println("消息发送成功!" + "主题:" + recordMetadata.topic() + "\n分区:" + recordMetadata.partition());
} else {
System.out.println("消息发送失败" + e);
}
});
}

// 消息发送结束,关闭资源
kafkaProducer.close();
}
}