springboot

您所在的位置:网站首页 kafkaappender springboot

springboot

2023-04-15 17:00| 来源: 网络整理| 查看: 265

今天遇到一个问题,自己在logback上增加一个把日志发送到springboot的appender,如果Kafka链接失败或metadata更新失败,会阻塞主应用启动,如下图

Kafka producer 在发送消息之前,会更新metadata, 关于metadata的更新机制,我觉得在这篇博客里讲的比较详细。如果更新metadata失败,kafka producer会阻塞 max.block.ms 后再继续尝试获取metadata, 就是在这阻塞的过程中,springboot主应用也处于被阻塞状态,Kafka max.block.ms 的默认值是600000.

解决方法可以减少max.block.ms的值,不过在这里有一个更好的解决思路,引入`logback-kafka-appender

com.github.danielwegener logback-kafka-appender 0.2.0 runtime ch.qos.logback logback-classic 1.2.3 runtime

定义发送日志到Kafka的appender如下:

使用 ch.qos.logback.classic.AsyncAppender 包裹一下发送Kafka的appender

记录日志时直接使用异步的appender

看了下AsyncAppender这个类,它继承一个AsyncAppenderBase类,这个类定义一个阻塞队列和单独的线程worker

@Override public void start() { if (isStarted()) return; if (appenderCount == 0) { addError("No attached appenders found."); return; } if (queueSize < 1) { addError("Invalid queue size [" + queueSize + "]"); return; } // AsyncAppender启动的时候会初始化一个阻塞队列,日志会临时放到队列里 blockingQueue = new ArrayBlockingQueue(queueSize); if (discardingThreshold == UNDEFINED) discardingThreshold = queueSize / 5; addInfo("Setting discardingThreshold to " + discardingThreshold); // worker是单独一个线程,用来把日志发送到各个子appender worker.setDaemon(true); worker.setName("AsyncAppender-Worker-" + getName()); // AsyncAppenderBase类继承UnsynchronizedAppenderBase,super.start()就是把这个appender实例标记为已经开启 super.start(); // worker线程开启,向各个子appender传消息 worker.start(); }

appender接受消息放到阻塞队列中:

// appender有消息就会调用 append方法 @Override protected void append(E eventObject) { if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { return; } preprocess(eventObject); put(eventObject); } private void put(E eventObject) { if (neverBlock) { blockingQueue.offer(eventObject); } else { // 相当于blockingQueue.put(eventObject) putUninterruptibly(eventObject); } }

worker是一个线程内部类,负责把消息发送给各个子appender(上例中就是发给kafkaAppender,再由kafkaAppender去负责链接kafka),这样就把更新Kafka metadata的线程交给worker来操作,与主线程隔离开,主服务也就不会阻塞。

class Worker extends Thread { public void run() { AsyncAppenderBase parent = AsyncAppenderBase.this; AppenderAttachableImpl aai = parent.aai; // loop while the parent is started while (parent.isStarted()) { try { E e = parent.blockingQueue.take(); // 遍历所有子appender,把消息传给所有子appender aai.appendLoopOnAppenders(e); } catch (InterruptedException ie) { break; } } addInfo("Worker thread will flush remaining events before exiting. "); // 如果appender被关闭,发送完剩余的消息后,再关闭子appender for (E e : parent.blockingQueue) { aai.appendLoopOnAppenders(e); parent.blockingQueue.remove(e); } aai.detachAndStopAllAppenders(); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3