【kafka】kafka集群的搭建、用户的设置和权限的控制

您所在的位置:网站首页 打开文件指令怎么设置权限 【kafka】kafka集群的搭建、用户的设置和权限的控制

【kafka】kafka集群的搭建、用户的设置和权限的控制

2024-04-09 08:54| 来源: 网络整理| 查看: 265

准备

我们准备起码三个实例,你可以在一台设备上操作,但是起码要有三个实例。我这边准备了三台设备,分别为192.168.5.1,192.168.5.2,192.168.5.3。然后是kafka的包和zookeeper的包,kafka和zookeeper的版本要适配。我这里准备的是kafka_2.12-2.4.0.tgz和apache-zookeeper-3.5.8-bin.tar.gz。

zookeeper集群的搭建

kafka的运行依赖zookeeper,考虑到高可用的能力,所以需要搭建一个zookeeper的集群。 分别在三台设备上传到zookeeper的安装包,解压后,复制zookeeper目录下conf下的zoo_sample.cfg为zoo.cfg,编辑zoo.cfg。

# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. # 此处一定要修改,修改为合适的路径,如果是同一台设备的不同实例,也要分开 dataDir=/home/wjx/zkData # the port at which the clients will connect # 如果需要修改zookeeper的默认2181端口,修改这个 clientPort=2181 # 增加这个配置,其中的2888是kafka集群之间的通讯和数据同步端口,3888是集群节点leader的选举通信端口。 server.1=192.168.5.1:2888:3888 server.2=192.168.5.2:2888:3888 server.3=192.168.5.3:2888:3888 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1

保存退出后,启动zookeeper,启动脚本在zookeeper的bin目录下:

bin/zkServer.sh start conf/zoo.cfg &

执行命令后,zookeeper会在后台进行运行,同理修改剩下的两台设备中zookeeper的配置文件并启动,zookeeper集群搭建完毕。

kafka集群的搭建

分别在三台设备上传到kafka的安装包,解压后,修改kafka目录下config下的server.properties。

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. # kafka的通讯器ID,集群中的每一台设备都要不一样 broker.id=0 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files # 一定要修改这个配置,指定kafka的日志存放,同一台设备的不同实例也要不一样,否则启动会报错,提示日志目录不为空 log.dirs=/home/wjx/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. # 配置zookeeper的集群,格式为 IP:PORT ,有多台的用,分隔 zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 # 如果要修改kafka的通讯端口,可以加一个配置,这里只是示意,本案例还是用的默认端口9092。 port=19092

保存退出后,启动kakfa,启动脚本在kafka的bin目录下,启动命令:

bin/kafka-server-start.sh -daemon config/server.properties

同理,剩下的两台设备的kafka都修改配置文件启动完成后,kafka集群就搭建完成。如果不需要用户认证和topic的读写权限设置,kafka已经可以使用。

如果要修改kafka的默认堆内存,防止数据量大的情况下频繁GC,需要修改bin/kafka-server-start.sh这个启动脚本

#!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi # 就是这个地方的1G,根据实际情况改成需要的堆内存大小 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 用户的设置和权限的认证

还是用刚才搭建好的kafka集群,现在我们加上用户和读写权限的设置。我们现在要创建三个用户,分别是管理用户admin,生产者writer和消费者reader。我们随便登录一台搭建了kafka设备,进入kafka的目录,开始创建三个用户。

创建管理用户admin,密码是admin

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

提示完成用户的创建。

创建生产者writer,密码是writerpwd

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writerpwd],SCRAM-SHA-512=[password=writerpwd]' --entity-type users --entity-name writer

提示完成用户的创建。

创建消费者reader,密码是readerpwd

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=readerpwd],SCRAM-SHA-512=[password=readerpwd]' --entity-type users --entity-name reader

提示完成用户的创建。

我们可以用以下命令查看用户信息确定是否创建成功,比如查看writer用户信息:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --describe --entity-type users --entity-name writer

接下来我们要配置kafka通讯器,在kafka的config目录下创建一个kafka-broker-jaas.conf文件,文件内容如下:

KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; };

其中的用户名密码就是我们配置的管理员,这个用户接着还要用到。注意:每一行的最后不要有空格,否则会有错误!(本人的血泪教训)

接下来继续修改config目录下的server.properties,在文件的最后加上:

# 启用ACL authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 设置admin为超级用户 super.users=User:admin # 启用SCRAM机制,采用SCRAM-SHA-512算法 sasl.enabled.mechanisms=SCRAM-SHA-512 # 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 # broker间通讯使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT # 配置listeners使用SASL_PLAINTEXT,ip改成实际的本机ip listeners=SASL_PLAINTEXT://192.168.5.1:9092 # 配置advertised.listeners,ip改成实际的本机ip advertised.listeners=SASL_PLAINTEXT://192.168.5.1:9092 # 禁止自动创建topic auto.create.topics.enable=false

然后我们重启kafka,这次启动要带上新加的kafka-broker-jaas.conf文件

KAFKA_OPTS=-Djava.security.auth.login.config=xxx/config/kafka-broker-jaas.conf bin/kafka-server-start.sh config/server.properties &

剩下两台也是重复以上除了创建用户的操作,最后重启。这样我们的kafka就有了用户和权限的限制。接着我们创建一个kafka的单分区单副本topic用于测试:

bin/kafka-topics.sh --create --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --topic test --partitions 1 --replication-factor 1

接下来我们用脚本来演示数据的生产和消费,同时配置用户的读写权限。

首先我们尝试向test这个topic发送数据

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test

输入任意字符发现会提示报错,ErrorLoggingCallback表示登录失败。

接着我们创建一个生产者的配置文件,然后用writer用户发送数据。配置文件producer.conf内容如下:

security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writerpwd";

执行生产者的脚本:

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --producer.config xx/producer.conf

输入任意字符会发现新的报错,此时就是因为权限的缘故了。因为writer用户是没有对test这个topic的写权限的,我们给它加上:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:writer --operation Write --topic test

执行后会打印操作的结果,以及当前这个topic的用户权限情况。

我们再次执行生产者的脚本:

bin/kafka-console-producer.sh --broker-list 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --producer.config xx/producer.conf

输入任意字符后会发现没有报错,说明发送成功!

同理的操作,我们需要对reader用户进行读topic的权限配置,先创建一个consumer.conf的文件:

security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="readerpwd";

添加test的读权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --topic test

我们执行消费者脚本尝试读取test里面的数据:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --from-beginning --consumer.config xxx/consumer.conf --group test-group

发现报错,提示我们没有访问用户组的权限,所以我们加一下权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --group test-group

再次执行消费者脚本读取test里面的数据:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092 --topic test --from-beginning --consumer.config xxx/consumer.conf --group test-group

可以看到数据读取成功。

至此,我们生产和消费数据的kafka权限就配置完成了。

zookeeper的ACL登录认证 基本信息

zookeeper的bin目录下有zkCli.sh可以直接连接zookeeper,可以对zk的节点进行任何操作,这在生产环境中是及其危险的,所以需要对zk进行目录的权限限定

通过zkCli.sh可以登录指定的zk

./bin/zkCli.sh 192.168.5.1:2181

通过命令查看节点的权限

[zk: localhost:2181(CONNECTED) 0] getAcl /brokers 'world,'anyone : cdrwa

可以看到/brokers这个节点是world:anyone的用户范围,这个也就是所有人可以操作,后面的cdrwa就是可以的操作,c-create(创建),d-delete(删除),r-read(读取),w-write(写入),a-auth(设置权限)

如果是获取一个无权限的节点信息则是如下:

[zk: localhost:2181(CONNECTED) 4] getAcl / Authentication is not valid : / 设置权限

如果要设置权限,操作步骤如下:

登录用户,账号usera,密码userpwd addauth digest usera:userpwd

这个登录准确也不叫登录,而是一种用户信息的创建,这个用户不需要存在,可以调用多次来同时登录多个用户

设置权限 setAcl / auth:usera:userpwd:cdrwa

这个命令就是将根目录/设置用户账号和密码,以及权限

查看权限 getAcl /

这个命令就是查看目录的权限

清除目录权限 setAcl / world:anyone:crdwa

清除目录权限的命令和设置权限的命令类似,但是没有用户前面的auth

忘记用户密码

以上都是设置权限的方法,但是有一个问题,就是设置节点的权限后忘了用户的账号密码,导致别的用户都无法操作这个节点。所以需要一个方法重置权限。这个方法就是开启一个超级账号去操作,超级账号没有权限限制,所以操作完记得关闭。

创建超级用户 useradmin 密码 userpwd 设备上输入以下命令 echo -n useradmin:userpwd | openssl dgst -binary -sha1 | openssl base64

回显会得到密码的密文4pOzGFD5H++TMb0bB7hp2AD5+6U=,记录一下

修改zkServer.sh,即zk的启动脚本,找到脚本中的start方法部分,在指定参数的位置加上 "-Dzookeeper.DigestAuthenticationProvider.superDigest=useradmin:4pOzGFD5H++TMb0bB7hp2AD5+6U=" 重启zk再次用zkCli.sh连接zk,然后用登录命令登录: addauth digest useradmin:userpwd

这个时候就会发现可以任意操作任何节点,可以用重置节点权限的命令重置节点,然后重新设置权限

5.另外的方法

如果是单纯忘记了节点的用户是谁,账号密码对应的还是记得的,可以通过暂时关闭acl验证来查看权限用户是谁。为了安全,使用完后记得删除或者注释配置,然后重启zk。

先修改zoo.conf配置文件,在最底下加上:

skipACL=yes

然后重启zk,用zkCli.sh连接zk,就能发现任何节点的权限信息都可以查看了,但是不能修改权限。

kafka脚本使用 查看topic消费情况

注意:如果没有权限,那么该命令最后的–command-config admin.conf不需要配置

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.5.1:9092,192.168.5.2:9092:192.168.5.3:9092 --describe --group test-group --command-config admin.conf

admin.conf的内容为:

security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

PS:因为我们在kafka中配置了超级用户是admin,所以这里一定要配置admin,用其他的用户会因为权限的问题而失败

整理文档:

启动zookeeper

bin/zkServer.sh start conf/zoo.cfg &

启动kafka

KAFKA_OPTS=-Djava.security.auth.login.config=xxx/config/kafka-broker-jaas.conf bin/kafka-server-start.sh config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --topic test --partitions 1 --replication-factor 1

新增生产者:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writerpwd],SCRAM-SHA-512=[password=writerpwd]' --entity-type users --entity-name writer

设置写权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:writer --operation Write --topic test

新增消费者:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=readerpwd],SCRAM-SHA-512=[password=readerpwd]' --entity-type users --entity-name reader

设置读权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --topic test

设置消费组权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --add --allow-principal User:reader --operation Read --group test-group

删除用户:

bin/kafka-configs.sh --zookeeper 192.168.5.1:2181,192.168.5.2:2181,192.168.5.3:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer Java代码demo: 生产者 package com.wangjx.message.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @program: swallow * @description: * @author: wangjiaxing * @created: 2020/10/30 11:43 */ public class KafkaSender { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列化方式 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value //此处设置kafka的消费者配置文件 System.setProperty("java.security.auth.login.config", "./kafka/kafka-client-jaas.conf"); //如果system这个配置已经被占用,可以使用下面注释的方式进行设置 //properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"writer\" password=\"writerpwd\";"); //安全协议 properties.put("security.protocol", "SASL_PLAINTEXT"); //加密方式 properties.put("sasl.mechanism", "SCRAM-SHA-512"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "10"); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "50"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, 30); properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 20000); properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 20000); KafkaProducer producer = new KafkaProducer(properties); while (true) { String topic = "test"; String k = UUID.randomUUID().toString(); Long v = System.currentTimeMillis(); producer.send(new ProducerRecord(topic, k, Long.toString(v))); System.out.println("send to topic > "+topic+" : "+k+" | "+v+""); TimeUnit.SECONDS.sleep(1); } } } 消费者 package com.wangjx.message.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** * @program: swallow * @description: * @author: wangjiaxing * @created: 2020/10/30 11:43 */ public class KafkaReceiver { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.1:9092,192.168.5.2:9092,192.168.5.3:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//key 反序列化方式 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value //此处设置kafka的消费者配置文件 System.setProperty("java.security.auth.login.config", "./kafka/kafka-client-jaas.conf"); //如果system这个配置已经被占用,可以使用下面注释的方式进行设置 //properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"reader\" password=\"readerpwd\";"); properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "SCRAM-SHA-512"); properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 20000); properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 20000); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); KafkaConsumer consumer = new KafkaConsumer(properties); List topics = new ArrayList(); String topic = "test"; topics.add(topic); consumer.subscribe(topics); while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); // 消息处理 for (ConsumerRecord record : records) { System.out.println("receive message from topic "+topic+", key = "+record.key()+", offset = "+record.offset()+", message = "+record.value()); } } } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3