Kafka

您所在的位置:网站首页 推特转发的内容哪去了 Kafka

Kafka

2024-07-13 09:44| 来源: 网络整理| 查看: 265

如题:我发的消息都发到哪儿去了?你是否有此疑问?

1、首先,我们先来看看kafka的消息格式是什么样的。

public class ProducerRecord { /** * 消息主题 */ private final String topic; /** * 指定的分区号 */ private final Integer partition; /** * 消息头信息 */ private final Headers headers; /** * 消息的key */ private final K key; /** * 消息值 */ private final V value; /** * 时间戳 */ private final Long timestamp; }

 

2、了解完Kafka消息的格式,我们就来看看kafka是如何发送的吧(暂不考虑事务消息)。

/** * See {@link KafkaProducer#send(ProducerRecord)} */ Future send(ProducerRecord record); /** * See {@link KafkaProducer#send(ProducerRecord, Callback)} */ Future send(ProducerRecord record, Callback callback);

KafkaProducer中做了重载,send(producerRecord)内部调用的是send(producerRecord,callback),如下图:

/** * Asynchronously send a record to a topic. Equivalent to send(record, null) . * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Future send(ProducerRecord record) { return send(record, null); }

3、知道如何发送之后我们就来看看真正发送的地方吧;

发送主要做了几件事情:

检查生产者是否已关闭通过发送消息中的主题和分区找到集群信息对key和value进行序列化计算消息应该发到哪个分区上找到发送到哪个分区之后就准备发送发送失败捕获异常并进行发送异常回调

4、那么这个分区到底是如何计算的呢?如下为生产者中的代码@Producer#832行(kafka 2.0版本)

byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition);

通过代码可以知道计算分区号时需要发送记录、序列化后的key、序列化后的value、集群cluster信息。进入patition方法,

/** * computes partition for given record. * if the record has partition returns the value otherwise * calls configured partitioner class to compute the partition. */ private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }

 

通过代码可知:如果我们在发送记录中指定了分区号,那么会直接使用指定的分区,否则:我们去看看Patitioner接口吧。

/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

通过接口我们可以大致了解到每个参数含义,下面我们来看看实现。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //根据主题拿到集群中主题对应的分区信息 List partitions = cluster.partitionsForTopic(topic); //计算分区总数 int numPartitions = partitions.size(); //如果指定的分区key为空 if (keyBytes == null) { //计算这个主题在这个实例上发送了多少条消息 int nextValue = nextValue(topic); //拿到可用的分区信息 List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { // 发送记录总数与可用分区数相除取余(散列) int part = Utils.toPositive(nextValue) % availablePartitions.size(); //找到对应的分区 return availablePartitions.get(part).partition(); } else { //此处英文注释明了 // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // 存在指定的分区key,则按照murmur2算法路由到对应的分区 // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { //每个主题一个发送消息记录的counter,次数通过主题拿到对应的counter(注意是每个实例会有一个counter) AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } //拿到本实例发送的消息记录数 return counter.getAndIncrement(); }

综上,我们可以比较清楚的知道我们发送的一个kafka记录是如何路由到对应的分区上了。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3