Kafka 分区好处 (1)便于合理使用存储资源,每个 Partition 在一个 Broker 上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费。
生产者发送消息的分区策略 默认的分区器 DefaultPartitioner
The default partitioning strategy:
If a partition is specified in the record, use it If no partition is specified but a key is present choose a partition based on a hash of the key If no partition or key is present choose the sticky partition that changes when the batch is full. See KIP-480 for details about sticky partitioning. 解释
图中前面四个
1 2 3 4 public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value) {}public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers) {}public ProducerRecord (String topic, Integer partition, K key, V value) {}
在指定了 partition
的情况下,直接将指明的值作为 partition
值;例如 partition=0,所有数据写入分区 0
图中第五个
1 public ProducerRecord (String topic, K key, V value) {}
在没有指明 partition 值 但有 key 的情况下,将 key的hash值
与 topic的partition数
进行取余得到 partition 值。
实际应用:通过指定 key 值来保证数据发送到同一个分区。
例如:key1 的 hash 值=5, key2 的 hash 值=6 ,topic 的 partition 数=2
那么 key1 对应的 value1 写入 1 号分区(5 mod 2 = 1),key2 对应的 value2 写入 0 号分区(6 mod 2 = 0)
图中最后一个
1 public ProducerRecord (String topic, V value) {}
在既没有 partition值
又没有 key值
的情况下,Kafka 采用 Sticky Partition(黏性分区器)
,会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch
已满或者已完成,Kafka 再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择 0 号分区,等 0 号分区当前 batch.size
满了(默认 16k)或者 linger.ms
设置的时间到, Kafka 再随机一个分区进行使用(如果还是 0 会继续随机)。
自定义分区器 上面是默认的分区器策略,根据实际生产需要,可能需要自定义分区策略。
比如说实现一个分区器,发送过来的数据中如果包含 order,就发往 0 号分区,不包含 order,就发往 1 号分区。
实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { String msg = value.toString(); int partition; if (msg.contains("order" )) { partition = 0 ; } else { partition = 1 ; } return partition; } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
使用 在生产者的配置中添加分区器参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;import java.util.concurrent.ExecutionException;public class CustomProducer { public static void main (String[] args) throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); for (int i = 0 ; i < 10 ; i++) { kafkaProducer.send(new ProducerRecord<>("firstTest" , "order" + i), (recordMetadata, e) -> { if (e == null ) { System.out.println("消息发送成功!" + "主题:" + recordMetadata.topic() + "\n分区:" + recordMetadata.partition()); } else { System.out.println("消息发送失败" + e); } }); } kafkaProducer.close(); } }