2、java调用kafka api

您所在的位置:网站首页 kafka创建主题可以使用kafka提供的命令工具 2、java调用kafka api

2、java调用kafka api

2023-06-17 00:59| 来源: 网络整理| 查看: 265

Apache Kafka系列文章

1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试 2、java调用kafka api 3、kafka重要概念介紹及示例 4、kafka分区、副本介绍及示例 5、kafka监控工具Kafka-Eagle介绍及使用

文章目录 Apache Kafka系列文章一、生产消息到Kafka中1、POM依赖2、开发步骤3、代码4、验证二、从Kafka的topic中消费消息1、开发步骤2、代码三、异步使用带有回调函数方法生产消息1、需求2、开发步骤3、代码四、kafka中发送和消费复杂类型1、需求2、代码1)、创建topic2)、创建kafka序列化和反序列化方法1、将复杂类型和字节数字相互转换2、实现kafka的序列化接口3、实现kafka的反序列化接口 3)、生产者发送数据4)、消费者消费数据 五、消费kafka topic中的历史数据1、代码

本分介绍java调用kafka api。 本文前置条件是kafka环境搭建好。 本分五部分,即简单的写数据到kafka、从topic中消费数据、异步回调、读写kafka中复杂数据类型和读取历史数据。

一、生产消息到Kafka中

将1-100的数字消息写入到Kafka中。

1、POM依赖

导入Maven Kafka POM依赖

org.apache.kafka kafka-clients 2.4.1 org.apache.commons commons-io 1.3.2 2、开发步骤 创建用于连接Kafka的Properties配置创建一个生产者对象KafkaProducer调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值再调用一个Future.get()方法等待响应关闭生产者 3、代码 import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 // 2. 创建一个生产者对象KafkaProducer // 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值 // 4. 再调用一个Future.get()方法等待响应 // 5. 关闭生产者 // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "server1:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer producer = new KafkaProducer(props); // 3. 调用send发送1-100消息到指定Topic test for (int i = 0; i // 获取返回值Future,该对象封装了返回值 Future future = producer.send(new ProducerRecord("test", null, i + "")); // 调用一个Future.get()方法等待响应 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 5. 关闭生产者 producer.close(); } } 4、验证

通过客户端查看写入的内容 在这里插入图片描述

二、从Kafka的topic中消费消息

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

1、开发步骤 创建Kafka消费者配置 创建Kafka消费者订阅要消费的主题使用一个while循环,不断从Kafka的topic中拉取消息将将记录(record)的offset、key、value都打印出来 2、代码 import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerTest { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 1. 创建Kafka消费者配置 // 2. 创建Kafka消费者 // 3. 订阅要消费的主题 // 4. 使用一个while循环,不断从Kafka的topic中拉取消息 // 5. 将将记录(record)的offset、key、value都打印出来 // 1.创建Kafka消费者配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092"); // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 props.setProperty("group.id", "test"); // 自动提交offset props.setProperty("enable.auto.commit", "true"); // 自动提交offset的时间间隔 props.setProperty("auto.commit.interval.ms", "1000"); // 拉取的key、value数据的 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2.创建Kafka消费者 KafkaConsumer kafkaConsumer = new KafkaConsumer(props); // 3. 订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Arrays.asList("test")); // 4.使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } Thread.sleep(1000); } } }

运行结果 在这里插入图片描述

三、异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

1、需求 在发送消息出现异常时,能够及时打印出异常信息 在发送消息成功时,打印Kafka的topic名字、分区id、offset 2、开发步骤 创建用于连接Kafka的Properties配置创建一个生产者对象KafkaProducer调用send发送1-100消息到指定Topic test,并在回调方法中打印异常信息或者打印topic、分区和offset关闭生产者 3、代码 import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerTest2 { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "server1:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer producer = new KafkaProducer(props); // 3. 调用send发送1-100消息到指定Topic test for (int i = 0; i @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.out.println("发送消息出现异常"); } else { String topic = metadata.topic(); int partition = metadata.partition(); long offset = metadata.offset(); System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!"); } } }); } // 5. 关闭生产者 producer.close(); } } 四、kafka中发送和消费复杂类型 1、需求

1、往kafka中写入user数据 2、消费kafka中user的数据 3、kafka的topic是userinfo

2、代码 1)、创建topic kafka-topics.sh --create --bootstrap-server server1:9092 --topic userinfo --partitions 1 --replication-factor 1 2)、创建kafka序列化和反序列化方法

kafka本身已经实现了基本类型的序列化与反序列化,复杂类型则需要自己实现。 https://kafka.apache.org/31/javadoc/org/apache/kafka/common/serialization/package-summary.html 实现序列化与反序列化,主要是以字节流的形式读取和写入数据,然后实现kafka的序列化与反序列化的方法,最后在生产者或消费者中设置key、value的序列化与反序列化的类。

示例:

生产者 // key的序列化,其类型是Integer props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); // value的序列化,其类型是Object props.put("value.serializer", "org.kafka.objectexample.util.EncodeingKafka"); 消费者 // key的反序列化,其类型是Integer props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); // value的反序列化,其类型是Object props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka"); 1、将复杂类型和字节数字相互转换 import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class BeanConversion { /** * 对象转字节数组 * * @param obj * @return */ public static byte[] ObjectToBytes(Object obj) { byte[] bytes = null; ByteArrayOutputStream bo = null; ObjectOutputStream oo = null; try { bo = new ByteArrayOutputStream(); oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (bo != null) { bo.close(); } if (oo != null) { oo.close(); } } catch (IOException e) { e.printStackTrace(); } } return bytes; } /** * 字节数组转对象 * * @param bytes * @return */ public static Object BytesToObject(byte[] bytes) { Object obj = null; ByteArrayInputStream bi = null; ObjectInputStream oi = null; try { bi = new ByteArrayInputStream(bytes); oi = new ObjectInputStream(bi); obj = oi.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (bi != null) { bi.close(); } if (oi != null) { oi.close(); } } catch (IOException e) { e.printStackTrace(); } } return obj; } } 2、实现kafka的序列化接口 import org.apache.kafka.common.serialization.Serializer; public class EncodeingKafka implements Serializer { @Override public byte[] serialize(String topic, Object data) { return BeanConversion.ObjectToBytes(data); } } 3、实现kafka的反序列化接口 import org.apache.kafka.common.serialization.Deserializer; public class DecodeingKafka implements Deserializer { @Override public Object deserialize(String topic, byte[] data) { return BeanConversion.BytesToObject(data); } } 3)、生产者发送数据 import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "server1:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.kafka.objectexample.util.EncodeingKafka"); KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i u.setAge(i + 20); } else { u.setAge(i); } // public ProducerRecord(String topic, K key, V value) ProducerRecord producerRecord = new ProducerRecord(TOPIC_NAME, u.getId(), u); // send() 方法会返回一个包含 RecordMetadata 的 // Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。 Future future = producer.send(producerRecord); // send() 方住先返回一个 Future对象,然后调用 Future对象的 get() 方法等待 Kafka 响应。如果服务器返回错误, // get()方怯会抛出异常。如果没有发生错误,我们会得到一个 // RecordMetadata对象,可以用它获取消息的偏移量。如果在发送数据之前或者在发送过程中发生了任何错误 ,比如 broker返回 // 了一个不允许重发消息的异常或者已经超过了重发的次数 ,那么就会抛出异常。 RecordMetadata rm = future.get(); System.out.println(rm.topic() + " 分区:" + rm.partition() + " 偏移量:" + rm.offset()); } producer.close(); } } 4)、消费者消费数据 import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class kafkaConsumerUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092"); props.setProperty("group.id", TOPIC_NAME); // 自动提交offset props.setProperty("enable.auto.commit", "true"); // 自动提交offset的时间间隔 props.setProperty("auto.commit.interval.ms", "1000"); // 拉取的key、value数据的 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka"); // 2.创建Kafka消费者 KafkaConsumer kafkaConsumer = new KafkaConsumer(props); // 3. 订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME)); // 4.使用一个while循环,不断从Kafka的topic中拉取消息 while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value Integer key = consumerRecord.key(); User value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } Thread.sleep(1000); } } } 五、消费kafka topic中的历史数据

消费topic已经存在的数据。类似命令中–from-beginning 参数。

1、代码

在该服务启动前,如果topic中存在数据,是可以全部读出来,但如果topic数据部分已经被消费了,也会被读出来。

import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumerHistoryOfUser { public final static String TOPIC_NAME = "userinfo"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.setProperty("bootstrap.servers", "server1:9092,server2:9092,server3:9092"); props.setProperty("group.id", TOPIC_NAME); // 自动提交offset props.setProperty("enable.auto.commit", "true"); // 自动提交offset的时间间隔 props.setProperty("auto.commit.interval.ms", "1000"); // 拉取的key、value数据的 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); props.setProperty("value.deserializer", "org.kafka.objectexample.util.DecodeingKafka"); KafkaConsumer kafkaConsumer = new KafkaConsumer(props); // 基于再均衡监听器,在给消费者分配分区的时候将消息偏移量跳转到起始位置 kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection collection) { } @Override public void onPartitionsAssigned(Collection collection) { Map beginningOffset = kafkaConsumer.beginningOffsets(collection); // 读取历史数据 --from-beginning for (Map.Entry entry : beginningOffset.entrySet()) { // 基于seek方法 // TopicPartition tp = entry.getKey(); // long offset = entry.getValue(); // consumer.seek(tp,offset); // 基于seekToBeginning方法 kafkaConsumer.seekToBeginning(collection); } } }); while (true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value Integer key = consumerRecord.key(); User value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } Thread.sleep(1000); } } }

以上示例简单的介绍了几种读写kafka数据的情况。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3