超详细的Kafka教程

您所在的位置:网站首页 kafka复制原理 超详细的Kafka教程

超详细的Kafka教程

2023-12-06 04:12| 来源: 网络整理| 查看: 265

在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。

为什么选择Kafka

消息中间件有很多。比如ActiveMQ,RabbitMQ,RocketMQ,Kafka。那你在选型的时候一般考虑哪些因素呢?我们来比较下这几个中间件的特点。

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

万级,吞吐量同ActiveMQ

10万级,可以支撑高吞吐量

10万级别,高吞吐量。适合日志采集,实时计算等场景

topic数量对吞吐量的影响

topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic

topic从几十个到几百个的时候,吞吐量会「大幅度下降」所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源

时效性

ms级

微秒级,这是rabbitmq的一大特点,延迟是最低的

ms级

延迟在ms级以内

可用性

高,基于主从架构实现高可用性

同ActiveMQ

非常高,分布式架构

非常高,同样也是分布式式

消息可靠性

有较低的概率丢失数据

经过参数优化配置,可以做到0丢失

同RocketMQ一样也可以做到消息零丢失

功能支持

MQ领域的功能极其完备

基于erlang开发,所以并发能力很强,性能极其好,延时很低

MQ功能较为完善,还是分布式的,扩展性好

功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

优劣总结

「ActiveMQ」

优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。

「RabbitMQ」

优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

「RocketMQ」

优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。

「Kafka」

优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。

从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。

什么是Kafka

Kafka本质还是一个存储容器,最初由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

Kafka组成架构

从上面的架构图我们获得几个词:

Producer :消息生产者,就是向kafka broker发消息的客户端;Consumer :消息消费者,向kafka broker取消息的客户端;Topic :可以理解为一个队列;Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。Kafka在Linux上的操作

你可能想,我把Kafka安装在Windows下不就完事了吗,为什么还要特意在Linux下面操作呢。但实际生产Kafka等中间件肯定是部署在Linux上面的,作为开发的我们可能也很少接触怎么部署,但是学习一下总归是有好处的。

下载

下载地址:http://kafka.apache.org/downloads

我们选取这个下载

单机部署

把下载的压缩包拷贝到Linux上,解压:

修改config/server.properties

修改Zookeeper的配置。

启动Kafka

注意:如果配置的是单独的Zookeeper,在启动Kafka之前需要启动Zookeeper。如果你有使用docker的经验,你可以使用docker-compose快速搭建一个zk集群。

发现有了kafka进程

端口为9092

集群部署

如果需要部署Kafka集群,我们需要设置多个Broker。

> cp server.properties config/server.properties config/server-1.properties > cp server.properties config/server.properties config/server-2.properties

编辑配置文件

config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。接下来我们只需要启动两个新的节点:

> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...

现在创建一个副本为3的新topic:my-lvshen-topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-lvshen-topic

运行命令describe topics查看集群中的topic信息

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-lvshen-topic Topic:my-lvshen-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-lvshen-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释:第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。Leader是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

Replicas是复制分区日志的节点列表,不管这些节点是Leader还是仅仅活着。

isr是一组「同步」Replicas,是Replicas列表的子集,它活着并被指到Leader。

命令操作创建Topic

在安装目录下输入命令

bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --create --topic LVSHEN-TOPIC --partitions 1 --replication-factor 1

创建了一个topic:「LVSHEN_TOPIC」。

查看Topic信息 bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --describe --topic LVSHEN-TOPIC 生产者生产数据 bin/kafka-console-producer.sh --broker-list 192.168.42.128:9092 --topic LVSHEN-TOPIC 消费者接收数据 bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.128:9092 --topic 'LVSHEN-TOPIC' Kafka Tool

如果觉得用命令查看太过麻烦,我们可以用工具查看(前提是你的生产环境和你的本地已经打通)。这里推荐一个工具「Kafka Tool」。

如图,左边会显示Brokers,Topics,Consumers,右边会显示相关的具体信息。

SpringBoot 默认方式开发Kafka Demo

这里我是采用SpringBoot开发,接下来写一个Java的demo。

Maven导入 org.springframework.kafka spring-kafka 2.4.3.RELEASE 配置文件 ## kafka ## spring.kafka.bootstrap-servers=192.168.42.128:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.group-id=test spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #定义Topic spring.kafka.topic=lvshen_demo_test spring.kafka.listener.missing-topics-fatal=false 生产者类 @Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.topic}") private String topic; /** * 发送kafka消息 * * @param jsonString */ public void send(String jsonString) { ListenableFuture future = kafkaTemplate.send(topic, jsonString); future.addCallback(o -> log.info("kafka消息发送成功:" + jsonString), throwable -> log.error("kafka消息发送失败:" + jsonString)); } } 消费者类 @Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = "${spring.kafka.topic}") public void listen(ConsumerRecord record) { log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value()); } } 测试 @Test public void testDemo() throws InterruptedException { log.info("start send"); kafkaProducer.send("I am Lvshen"); log.info("end send"); // 休眠10秒,为了使监听器有足够的时间监听到topic的数据 Thread.sleep(10); }

如上图,控制台消费接收到了数据:

c.l.d.k.kafka.consumer.KafkaConsumer : topic=lvshen_demo_test, offset=1, message=I am Lvshen

Kafka Tool也显示接收到了消息:

自定义Kafka demo开发

假如你不想使用application.properties里面kafka的配置,我们可以采用第二种开发方法。

定义配置文件

config/kafka-config.properties

#consumer kafka.bootstrapServers=192.168.42.128:9092 kafka.groupId=bootKafka kafka.enableAutoCommit=true kafka.autoCommitIntervalMs=100 kafka.sessionTimeoutMs=15000 #producer kafka.retries=1 kafka.batchSize=16384 kafka.lingerMs=1 kafka.bufferMemory=1024000 配置类 @Component @ConfigurationProperties(prefix="kafka") @PropertySource(value = {"classpath:config/kafka-config.properties"}, encoding = "utf-8") @Getter @Setter @AllArgsConstructor @NoArgsConstructor public class KafkaConfigProperties { private String bootstrapServers; private String groupId; private String enableAutoCommit; private String autoCommitIntervalMs; private String sessionTimeoutMs; private String retries; private String batchSize; private String lingerMs; private String bufferMemory; } //文件配置类 @Component("kafkaConfigurations") @EnableKafka public class KafkaConfiguration { @Autowired private KafkaConfigProperties kafkaConfigProperties; /** * ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者 */ @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setPollTimeout(1500); factory.setMissingTopicsFatal(false); return factory; } /** * 根据consumerProps填写的参数创建消费者工厂 */ @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory(consumerProps()); } /** * 根据senderProps填写的参数创建生产者工厂 */ @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory(senderProps()); } /** * kafkaTemplate实现了Kafka发送接收等功能 */ @Bean("kafkaTemplates") public KafkaTemplate kafkaTemplate() { KafkaTemplate template = new KafkaTemplate(producerFactory()); return template; } /** * 消费者配置参数 */ private Map consumerProps() { Map props = new HashMap(); // 连接地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers()); // GroupID props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getGroupId()); // 是否自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfigProperties.getAutoCommitIntervalMs()); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getSessionTimeoutMs()); // 键的反序列化方式 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); // 值的反序列化方式 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 生产者配置 */ private Map senderProps() { Map props = new HashMap(); // 连接地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers()); // 重试,0为不启用重试机制 props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries()); // 控制批处理大小,单位为字节 props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaConfigProperties.getBatchSize()); // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量 props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaConfigProperties.getLingerMs()); // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaConfigProperties.getBufferMemory()); // 键的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); // 值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } } 获取监听 @Component @Slf4j public class DemoListener { /** * 声明consumerID为demo,监听topicName为topic.quick.demo的Topic */ @KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : "+msgData); } } 测试 @Test public void testDemoDepth() throws InterruptedException { log.info("start send"); kafkaTemplate.send("topic.quick.demo", "this is a test for depth kafka"); log.info("end send"); // 休眠10秒,为了使监听器有足够的时间监听到topic的数据 Thread.sleep(500000); }

Listener监听到kafka里面的数据。

Kafka存储机制

经过上面的描述,我们发现「Partition」很重要。其实「Partition」还可以细分为「Segment」。至于什么是「Segment」,下面会有详细说明。

Partition中文件的存储方式:

每个partion(目录)相当于一个巨型文件被平均分配到多个大小相segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。(默认情况下每个文件大小为1G)。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

好了以上就是关于Kafka的简短的介绍了,如果想要深入学习,可以去官网多多了解相关知识。

Kafka为什么性能如此优越读写快

kafka会将数据顺序写入磁盘,我们用的磁盘大部分用的是机械磁盘。机械结构的银盘,寻址是最耗时的。所以硬盘随机I/O是很耗性能的,如果是顺序I/O,那么性能会有很大的改善。

MMFile

Kafka的数据并不是实时的写入磁盘(「Memory Mapped Files」),它充分利用了现代操作系统「分页存储」来提高I/O效率。操作系统会选择适当的时机将数据写入硬盘。但这样也会不可靠,写到「mmap」中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。

Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到「mmap」之后就立即flush然后再返回Producer叫 同步 (sync);写入「mmap」之后立即返回Producer不调用flush叫 异步 (async)。

零拷贝

消费者向broker索要消息时,「kafka」使用 零拷贝(zero-copy) ,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”,直接复制到socket缓冲区。

一般的读写是这样的,会有用户态和内核态的切换,这个切换也是比较耗时的。

如果采用零拷贝,不会经过用户态。

关于零拷贝的详细描述,可以看看我的另一篇文章:【使用了零拷贝技术的Kafka,当然很快】。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3