伪分布式Kafka环境搭建与SpringBoot集成

您所在的位置:网站首页 kafka集群搭建 伪分布式Kafka环境搭建与SpringBoot集成

伪分布式Kafka环境搭建与SpringBoot集成

2022-12-21 03:18| 来源: 网络整理| 查看: 265

安装包下载 下载安装包 wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz # 解压 tar -zxf kafka_2.12-2.2.0.tgz # 复制kafka cp -rp kafka_2.12-2.2.0 kafka-broker-1 新建数据和日志路径

新建kafka目录

cd /usr/local/kafka mkdir -p kafka/001/log mkdir -p kafka/002/log mkdir -p kafka/003/log

新建zookeeper目录

cd /usr/local/kafka mkdir -p zookeeper/001/log mkdir -p zookeeper/002/log mkdir -p zookeeper/003/log mkdir -p zookeeper/001/data mkdir -p zookeeper/002/data mkdir -p zookeeper/003/data # 新建myid cat 1 > zookeeper/001/data/myid cat 2 > zookeeper/002/data/myid cat 3 > zookeeper/003/data/myid 修改配置文件

第一个broker(路径:/usr/local/kafka/kafka-broker-1/config)

新增jaas.conf

KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin"; };

新增zk_jaas.conf

Server { org.apache.zookeeper.server.auth.DigestLoginModule required username="admin" password="admin" user_admin="admin"; }; Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="admin" password="admin"; };

修改zookeeper配置文件

zookeeper.properties

# 数据文件与日志文件 dataDir=/usr/local/kafka/zookeeper/001/data dataLogDir=/usr/local/kafka/zookeeper/001/log # 端口 clientPort=2181 # 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader server.1=127.0.0.1:2887:3887 server.2=127.0.0.1:2888:3888 server.3=127.0.0.1:2889:3889 # SASL认证 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true

修改zookeeper启动命令

zookeeper-server-start.sh

# 加入export这一行 export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-1/config/zk_jaas.conf ${KAFKA_OPTS}" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

修改kafka配置文件

server.properties

broker.id=0 listeners=SASL_PLAINTEXT://:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN log.dirs=/usr/local/kafka/kafka/001/log zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

修改kafka启动命令

kafka-run-class.sh

# Generic jvm settings you want to add if [ -z "$KAFKA_OPTS" ]; then #KAFKA_OPTS="" # 加入下面这一行 KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/config/jaas.conf" fi

配置第二个broker(路径:/usr/local/kafka/kafka-broker-2)

cd /usr/local/kafka cp -rp kafka-broker-1 kafka-broker-2

修改zookeeper配置文件

zookeeper.properties

# 修改 # 数据文件与日志文件 dataDir=/usr/local/kafka/zookeeper/002/data dataLogDir=/usr/local/kafka/zookeeper/002/log # 修改 # 端口 clientPort=2182 # 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader server.1=127.0.0.1:2887:3887 server.2=127.0.0.1:2888:3888 server.3=127.0.0.1:2889:3889 # SASL认证 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true

修改zookeeper启动命令

zookeeper-server-start.sh

# 修改 export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-2/config/zk_jaas.conf ${KAFKA_OPTS}" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

修改kafka配置文件

server.properties

# 修改 broker.id=1 # 修改 listeners=SASL_PLAINTEXT://:9093 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN # 修改 log.dirs=/usr/local/kafka/kafka/002/log zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

配置第三个broker(路径:/usr/local/kafka/kafka-broker-3)

cd /usr/local/kafka cp -rp kafka-broker-1 kafka-broker-3

修改zookeeper配置文件

zookeeper.properties

# 修改 # 数据文件与日志文件 dataDir=/usr/local/kafka/zookeeper/003/data dataLogDir=/usr/local/kafka/zookeeper/003/log # 修改 # 端口 clientPort=2183 # 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader server.1=127.0.0.1:2887:3887 server.2=127.0.0.1:2888:3888 server.3=127.0.0.1:2889:3889 # SASL认证 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true

修改zookeeper启动命令

zookeeper-server-start.sh

# 修改 export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-3/config/zk_jaas.conf ${KAFKA_OPTS}" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

修改kafka配置文件

server.properties

# 修改 broker.id=2 # 修改 listeners=SASL_PLAINTEXT://:9094 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN # 修改 log.dirs=/usr/local/kafka/kafka/003/log zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 启动

zookeeper启动

# 启动命令 nohup /usr/local/kafka/kafka-broker-1/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-1/config/zookeeper.properties & nohup /usr/local/kafka/kafka-broker-2/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-2/config/zookeeper.properties & nohup /usr/local/kafka/kafka-broker-3/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-3/config/zookeeper.properties & # 查看端口(正常应该有2181、3887、3888、3889和2887|2888|2889中的一个) ss -lnput | egrep '2181|2887|3887|2888|3888|2889|3889'

kafka启动

# 启动命令 /usr/local/kafka/kafka-broker-1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-1/config/server.properties /usr/local/kafka/kafka-broker-2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-2/config/server.properties /usr/local/kafka/kafka-broker-3/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-3/config/server.properties # 查看端口 ss -lnput | egrep '9092|9093|9094'

安装kafka-manager

# 安装yum-utils软件包 yum install -y yum-utils # 设置docker仓库 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # 安装docker yum install docker-ce docker-ce-cli containerd.io # 启动docker systemctl start docker # 安装kafka-manager docker run -itd --rm -p 9000:9000 -e ZK_HOSTS="172.30.129.14:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager

访问http://xxx:9000/即可

Springboot项目集成

新建springboot项目,添加依赖

pom.xml

org.springframework.boot spring-boot-starter-parent 2.7.2 org.springframework.kafka spring-kafka

添加配置

kafka-jaas.config

KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };

application.properties

# 端口 server.port=8899 # 应用名称 spring.application.name=spring-boot-test # kafka配置 # 生产者配置 spring.kafka.bootstrap-servers=xxx:9092,xxx:9093,xxx:9094 #发送失败后,重试次数,0表示不重试 spring.kafka.producer.retries=3 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 开启sasl认证 spring.kafka.producer.properties.sasl.mechanism=PLAIN spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT # 消费者配置 # 默认的消费组ID spring.kafka.consumer.group-id=test-topic # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.auto-offset-reset=earliest # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 开启sasl认证 spring.kafka.consumer.properties.sasl.mechanism=PLAIN spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT

添加kafka消费类

KafkaConsumer

@Component public class KafkaConsumer { @KafkaListener(id="test-topic-consumer", topics = "test-topic") public void recieveMsg(ConsumerRecord consumerRecord) { System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",消息偏移量:" + consumerRecord.offset()); } }

添加kafka生产者

KafkaController

@RestController @RequestMapping(value = "kafka") public class KafkaController { @Autowired private KafkaTemplate kafkaTemplate; @PostMapping("sendMsg") public String sendMsg(String msg) { kafkaTemplate.send("test-topic", msg); return "success"; } }

SpringBoot启动类

SpringBootTestApplication

@SpringBootApplication public class SpringBootTestApplication { public static void main(String[] args) throws FileNotFoundException { // 启动时配置sasl认证 final File file = ResourceUtils.getFile("classpath:kafka-jaas.conf"); System.setProperty("java.security.auth.login.config", file.getAbsolutePath()); SpringApplication.run(SpringBootTestApplication.class, args); } }

使用postman发送请求测试即可。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3