Using Kafka from C and Installing the librdkafka Library


2023-03-04 10:46 | Source: compiled from the web

1. Installing librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
git checkout v1.7.0
./configure
make
sudo make install
sudo ldconfig

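The examples directory of librdkafka contains sample programs. The consumer, for example, expects the following arguments:

% Usage: ./consumer <broker> <group.id> <topic1> <topic2>..

That is, a broker, a group id, and one or more topics to subscribe to. Example: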

./consumer localhost:9092 0 test
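
The argument layout above (broker list, then group id, then one or more topics) can be unpacked in a few lines of C. The following is a minimal sketch rather than code taken from librdkafka; the struct `consumer_args` and the function `parse_consumer_args` are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical container for the consumer's command-line arguments. */
struct consumer_args {
        const char *brokers;  /* argv[1]: comma-separated broker list */
        const char *group_id; /* argv[2]: consumer group id */
        char **topics;        /* argv[3..]: topics to subscribe to */
        int topic_cnt;        /* number of topics */
};

/* Returns 0 on success, -1 if the broker, the group id, or at least
 * one topic is missing from the command line. */
int parse_consumer_args (int argc, char **argv, struct consumer_args *out) {
        assert(argv != NULL && out != NULL);

        if (argc < 4)
                return -1;

        out->brokers   = argv[1];
        out->group_id  = argv[2];
        out->topics    = &argv[3];   /* points into argv, nothing is copied */
        out->topic_cnt = argc - 3;
        return 0;
}
```

For the example command line, `brokers` would be localhost:9092, `group_id` would be 0, and `topics` would hold the single entry test.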

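Abbreviations used in the librdkafka source:

Abbreviation | Full name         | Example or description
rd           | Rapid Development | rd.h
rk           | RdKafka           |
toppar       | Topic Partition   | struct rd_kafka_toppar_t { };
rep          | Reply             | struct rd_kafka_t { rd_kafka_q_t *rk_rep };
msgq         | Message Queue     | struct rd_kafka_msgq_t { };
rkb          | RdKafka Broker    | Kafka broker
rko          | RdKafka Operation | Kafka operation
rkm          | RdKafka Message   | Kafka message
payload      |                   | Message stored in Kafka (also called a log)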

2. Starting the Kafka services

2.1 Starting ZooKeeper

ZooKeeper can be started with the script below. You could also set up a separate ZooKeeper cluster, but here we simply use the ZooKeeper that ships with Kafka.

cd bin/
# Run in the foreground:
sh zookeeper-server-start.sh ../config/zookeeper.properties
# Run in the background:
sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

Use lsof -i:2181 to check whether ZooKeeper started successfully.

$ lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    74930  fly  96u  IPv6 734467      0t0  TCP *:2181 (LISTEN)
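
The same check can be made from C by attempting a TCP connection to the port. This is a minimal sketch assuming a POSIX system and an IPv4 address; `tcp_port_is_open` is a hypothetical helper name, and a successful connect only shows that something is listening, not that it is ZooKeeper.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns 1 if a TCP connection to ip:port succeeds, 0 otherwise.
 * ip must be a dotted-quad IPv4 address such as "127.0.0.1". */
int tcp_port_is_open (const char *ip, unsigned short port) {
        struct sockaddr_in addr;
        int fd, ok;

        assert(ip != NULL);

        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port   = htons(port);
        if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1)
                return 0; /* not a valid IPv4 address */

        fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0)
                return 0;

        /* connect() blocks until the connection is accepted or refused */
        ok = connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0;
        close(fd);
        return ok;
}
```

Calling tcp_port_is_open("127.0.0.1", 2181) after starting ZooKeeper, or with port 9092 after starting Kafka, gives a quick liveness check before running the examples.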

2.2 Starting Kafka

Start Kafka by running the following from the bin directory of the Kafka installation. It listens on port 9092 by default.

sh kafka-server-start.sh -daemon ../config/server.properties

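2.3 Creating a topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Parameters:

--create: the action to perform, here creating a topic.
--zookeeper: the address of the ZooKeeper service that Kafka connects to. (Kafka 2.2 and later also accept --bootstrap-server localhost:9092 in its place, and the --zookeeper option was removed in Kafka 3.0.)
--replication-factor: the number of replicas, that is, how many brokers hold a copy of each partition of the topic. It is set to 1 here, so each partition is stored once, on a single broker.
--partitions: the number of partitions. Partitions are parallel channels, much like lanes on a road.
--topic: the name of the topic to create, for example test.

On success the command prints: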

Created topic "test".
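
To make the two numeric options above concrete, here is a small sketch of how partitions and their replicas might be spread over brokers. It uses a simplified round-robin placement written for illustration, not Kafka's actual assignment algorithm, and `assign_replicas` with its parameters is a hypothetical name. It does encode one rule that Kafka enforces: the replication factor cannot exceed the number of available brokers.

```c
#include <assert.h>
#include <stddef.h>

/* Fills out[p * replication_factor + r] with the index of the broker
 * that holds replica r of partition p, using consecutive brokers.
 * Returns 0 on success, -1 on invalid input, including a replication
 * factor larger than the number of brokers (each replica of a
 * partition needs its own broker). */
int assign_replicas (int num_brokers, int num_partitions,
                     int replication_factor, int *out) {
        int p, r;

        assert(out != NULL);

        if (num_brokers <= 0 || num_partitions <= 0 ||
            replication_factor <= 0 || replication_factor > num_brokers)
                return -1;

        for (p = 0; p < num_partitions; p++)
                for (r = 0; r < replication_factor; r++)
                        out[p * replication_factor + r] =
                                (p + r) % num_brokers;
        return 0;
}
```

With one broker, one partition and a replication factor of 1, as in the command above, the single replica lands on broker 0. Asking for a replication factor of 2 on that one-broker setup is rejected.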

3. Examples of using Kafka from C

3.1 Consumer

The librdkafka/examples directory contains consumer.c, a sample C program that consumes messages from Kafka:

/**
 * Simple high-level balanced Apache Kafka consumer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
        run = 0;
}

/**
 * @returns 1 if all bytes are printable, else 0.
 */
static int is_printable (const char *buf, size_t size) {
        size_t i;

        for (i = 0 ; i < size ; i++)
                if (!isprint((int)buf[i]))
                        return 0;

        return 1;
}

(The rest of consumer.c, which contains main(), is omitted here; see examples/consumer.c in the librdkafka source tree.)

3.2 Producer

The same directory contains producer.c, a sample C program that produces messages to Kafka:

/**
 * Simple Apache Kafka producer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <stdio.h>
#include <signal.h>
#include <string.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
        run = 0;
        fclose(stdin); /* abort fgets() */
}

/**
 * @brief Message delivery report callback, called once per message
 *        from rd_kafka_poll() or rd_kafka_flush().
 */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "%% Message delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        else
                fprintf(stderr,
                        "%% Message delivered (%zd bytes, "
                        "partition %"PRId32")\n",
                        rkmessage->len, rkmessage->partition);

        /* The rkmessage is destroyed automatically by librdkafka */
}

int main (int argc, char **argv) {
        rd_kafka_t *rk;         /* Producer instance handle */
        rd_kafka_conf_t *conf;  /* Temporary configuration object */
        char errstr[512];       /* librdkafka API error reporting buffer */
        char buf[512];          /* Message value temporary buffer */
        const char *brokers;    /* Argument: broker list */
        const char *topic;      /* Argument: topic to produce to */

        /*
         * Argument validation
         */
        if (argc != 3) {
                fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
                return 1;
        }

        brokers = argv[1];
        topic   = argv[2];

        /*
         * Create Kafka client configuration place-holder
         */
        conf = rd_kafka_conf_new();

        /* Set bootstrap broker(s) as a comma-separated list of
         * host or host:port (default port 9092).
         * librdkafka will use the bootstrap brokers to acquire the full
         * set of brokers from the cluster. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        /* Set the delivery report callback.
         * This callback will be called once per message to inform
         * the application if delivery succeeded or failed.
         * See dr_msg_cb() above.
         * The callback is only triggered from rd_kafka_poll() and
         * rd_kafka_flush(). */
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        /*
         * Create producer instance.
         *
         * NOTE: rd_kafka_new() takes ownership of the conf object
         *       and the application must not reference it again after
         *       this call.
         */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr,
                        "%% Failed to create new producer: %s\n", errstr);
                return 1;
        }

        /* Signal handler for clean shutdown */
        signal(SIGINT, stop);

        fprintf(stderr,
                "%% Type some text and hit enter to produce message\n"
                "%% Or just hit enter to only serve delivery reports\n"
                "%% Press Ctrl-C or Ctrl-D to exit\n");

        while (run && fgets(buf, sizeof(buf), stdin)) {
                size_t len = strlen(buf);
                rd_kafka_resp_err_t err;

                if (buf[len-1] == '\n') /* Remove newline */
                        buf[--len] = '\0';

                if (len == 0) {
                        /* Empty line: only serve delivery reports */
                        rd_kafka_poll(rk, 0/*non-blocking */);
                        continue;
                }

                /*
                 * Send/Produce message.
                 * This is an asynchronous call, on success it will only
                 * enqueue the message on the internal producer queue.
                 * The actual delivery attempts to the broker are handled
                 * by background threads.
                 * The previously registered delivery report callback
                 * (dr_msg_cb) is used to signal back to the application
                 * when the message has been delivered (or failed).
                 */
        retry:
                err = rd_kafka_producev(
                        /* Producer handle */
                        rk,
                        /* Topic name */
                        RD_KAFKA_V_TOPIC(topic),
                        /* Make a copy of the payload. */
                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                        /* Message value and length */
                        RD_KAFKA_V_VALUE(buf, len),
                        /* Per-Message opaque, provided in
                         * delivery report callback as
                         * msg_opaque. */
                        RD_KAFKA_V_OPAQUE(NULL),
                        /* End sentinel */
                        RD_KAFKA_V_END);

                if (err) {
                        /*
                         * Failed to *enqueue* message for producing.
                         */
                        fprintf(stderr,
                                "%% Failed to produce to topic %s: %s\n",
                                topic, rd_kafka_err2str(err));

                        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                /* If the internal queue is full, wait for
                                 * messages to be delivered and then retry.
                                 * The internal queue represents both
                                 * messages to be sent and messages that have
                                 * been sent or failed, awaiting their
                                 * delivery report callback to be called.
                                 *
                                 * The internal queue is limited by the
                                 * configuration property
                                 * queue.buffering.max.messages */
                                rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                                goto retry;
                        }
                } else {
                        fprintf(stderr, "%% Enqueued message (%zd bytes) "
                                "for topic %s\n",
                                len, topic);
                }

                /* A producer application should continually serve
                 * the delivery report queue by calling rd_kafka_poll()
                 * at frequent intervals.
                 * Either put the poll call in your main loop, or in a
                 * dedicated thread, or call it after every
                 * rd_kafka_produce() call.
                 * Just make sure that rd_kafka_poll() is still called
                 * during periods where you are not producing any messages
                 * to make sure previously produced messages have their
                 * delivery report callback served (and any other callbacks
                 * you register). */
                rd_kafka_poll(rk, 0/*non-blocking*/);
        }

        /* Wait for final messages to be delivered or fail.
         * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
         * waits for all messages to be delivered. */
        fprintf(stderr, "%% Flushing final messages..\n");
        rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);

        /* If the output queue is still not empty there is an issue
         * with producing messages to the clusters. */
        if (rd_kafka_outq_len(rk) > 0)
                fprintf(stderr, "%% %d message(s) were not delivered\n",
                        rd_kafka_outq_len(rk));

        /* Destroy the producer instance */
        rd_kafka_destroy(rk);

        return 0;
}

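The producer follows much the same flow as the consumer.

Function call                    | Meaning
rd_kafka_conf_new()              | Create a configuration object
rd_kafka_conf_set(...)           | Set a configuration property, here bootstrap.servers
rd_kafka_conf_set_dr_msg_cb(...) | Set the delivery report callback
rd_kafka_new(...)                | Create a Kafka producer (RD_KAFKA_PRODUCER)
rd_kafka_producev(...)           | Send a message (enqueue it for asynchronous delivery)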
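
The queue-full handling in producer.c (enqueue a message and, if the internal queue is full, poll to serve delivery reports and then retry) is easier to follow in isolation. The sketch below mimics that control flow with a tiny in-memory queue. It does not call librdkafka: `mock_queue`, `mock_produce`, `mock_poll` and `produce_with_retry` are hypothetical stand-ins, and the mock treats every pending message as already delivered, which a real broker round trip would not guarantee.

```c
#include <assert.h>
#include <stddef.h>

#define MOCK_QUEUE_MAX 2 /* stand-in for queue.buffering.max.messages */

struct mock_queue {
        int pending;   /* enqueued, delivery report not yet served */
        int delivered; /* delivery reports served so far */
};

/* Stand-in for rd_kafka_producev(): returns 0 on success,
 * -1 if the queue is full. */
int mock_produce (struct mock_queue *q) {
        if (q->pending >= MOCK_QUEUE_MAX)
                return -1;
        q->pending++;
        return 0;
}

/* Stand-in for rd_kafka_poll(): serves at most one delivery report
 * and returns the number served (0 or 1). */
int mock_poll (struct mock_queue *q) {
        if (q->pending == 0)
                return 0;
        q->pending--;
        q->delivered++;
        return 1;
}

/* Enqueues one message, polling and retrying while the queue is full.
 * Returns the number of retries that were needed. */
int produce_with_retry (struct mock_queue *q) {
        int retries = 0;

        assert(q != NULL);

        while (mock_produce(q) != 0) {
                mock_poll(q); /* serving a report frees one queue slot */
                retries++;
        }
        return retries;
}
```

With a queue limit of two, the first two calls to produce_with_retry() succeed immediately and the third needs one poll before it can enqueue, which is the same shape as the goto retry path in producer.c.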

3.3 Producer and consumer interaction

(1) Start the consumer.

./consumer localhost:9092 0 test

Output:

% Subscribed to 1 topic(s), waiting for rebalance and messages...

(2) Start the producer.

./producer localhost:9092 test

Output:

% Type some text and hit enter to produce message
% Or just hit enter to only serve delivery reports
% Press Ctrl-C or Ctrl-D to exit

(3) Exchange messages.

The producer sends "hello consumer":

$ ./producer localhost:9092 test
% Type some text and hit enter to produce message
% Or just hit enter to only serve delivery reports
% Press Ctrl-C or Ctrl-D to exit
hello consumer
% Enqueued message (14 bytes) for topic test

The consumer receives it:

$ ./consumer localhost:9092 0 test
% Subscribed to 1 topic(s), waiting for rebalance and messages...
Message on test [0] at offset 4:
 Value: hello consumer

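Summary

Within a consumer group, a partition is read by only one consumer. If a topic has a single partition and several consumers in the same group subscribe to it, only one of them receives data, so running multiple consumers in one group against a single partition gains nothing. Get familiar with the consumer concepts and with writing producers and consumers in C/C++.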
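
The point about a single partition can be illustrated with a toy assignment function. This is a sketch of a plain round-robin assignment inside one consumer group, not the assignor that Kafka or librdkafka actually runs; `assign_partitions` is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* Assigns each partition to exactly one consumer of a group, round-robin.
 * owner[p] receives the index of the consumer that reads partition p.
 * Returns the number of consumers left without any partition. */
int assign_partitions (int num_partitions, int num_consumers, int *owner) {
        int p;

        assert(num_partitions > 0 && num_consumers > 0 && owner != NULL);

        for (p = 0; p < num_partitions; p++)
                owner[p] = p % num_consumers;

        /* Consumers beyond the partition count receive nothing. */
        return num_consumers > num_partitions
                       ? num_consumers - num_partitions
                       : 0;
}
```

With one partition and three consumers the function returns 2: consumer 0 owns the partition and the other two stay idle. With four partitions and two consumers, every consumer has work.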


