一文搞定 Flink 消费消息的全流程 |
您所在的位置:网站首页 › flink数据传输底层代码 › 一文搞定 Flink 消费消息的全流程 |
我们以下面代码为例: FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("canal_monitor_order_astable", new SimpleStringSchema(), properties); consumer.setStartFromEarliest(); env.addSource(consumer).flatMap(...).print()当 Flink 程序启动,leader、blobServer 等都创建完毕,当 ExecutionGraph 构建完成,提交成功之后。就到了,task 正式执行的阶段了。这个时候,一条消息是如何流转的呢? 首先,进入了 Task 的 run 方法 ...... /* 这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的 */ // run the invokable invokable.invoke(); ......然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方 ...... run(); .....最为关键的就是 run 方法。 进入 SourceStreamTask run 方法 @Override // source task 获取数据的入口方法 protected void run() throws Exception { headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); }继续追踪就到了 StreamSource 的 run 方法 ...... // 生成上下文之后,接下来就是把上下文交给 SourceFunction 去执行,用户自定义的 run 方法开始正式运行 userFunction.run(ctx); ......此处的 userFunction 实际上就是 FlinkKafkaConsumer 具体是如何消费消息的可以参考 写给大忙人看的Flink 消费 Kafka 彻底搞懂 Flink Kafka OffsetState 存储 继续追踪到 RecordWriter private void emit(T record, int targetChannel) throws IOException, InterruptedException { // 最底层的抽象是 MemorySegment,用于数据传输的是 Buffer,将 java 对象转化为 buffer 是这个 // Flink 把对象调用该对象所属的序列化器序列化为字节数组 serializer.serializeRecord(record); if (copyFromSerializerToTargetChannel(targetChannel)) { serializer.prune(); } }RecordWriter 还是比较有意思的,RecordWriter 主要就是把 java 对象转化为 byte 数组( 也就是 flink 自己管理内存,不借助与 JVM )。而后面的传输也是基于 byte 数组的。 copyFromSerializerToTargetChannel 会将 byte 数据 flush 到 相应的 targetChannel ( targetChannel 对于下游来说就是 InputChannel 具体可以参考一下 Flink反压机制 ) 底层通过 netty 进行数据的传送,传送至 PartitionRequestQueue ...... if (cause != null) { ErrorResponse msg = new ErrorResponse( new ProducerFailedException(cause), reader.getReceiverId()); // 真正往 netty 的 nio 通道里写入. // 在这里,写入的是一个 RemoteInputChannel,对应的就是下游节点的 InputGate 的 channels。 ctx.writeAndFlush(msg); } ......这个时候,这条数据就进入了下游的 InputChannel 。 有写得需要有读,进入到 CreditBasedPartitionRequestClientHandler // nio 通道的另一端( 下游 )需要读入 buffer // 上游的算子写入,下游的算子读取,这也是反压的原理 // 为什么叫 decodeMsg,主要上游传过来的是 byte 数组,这个将 byte 数组 转化为 record private void decodeMsg(Object msg) throws Throwable { final Class msgClazz = msg.getClass(); // ---- Buffer -------------------------------------------------------- if (msgClazz == NettyMessage.BufferResponse.class) { NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg; RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId); if (inputChannel == null) { bufferOrEvent.releaseBuffer(); cancelRequestFor(bufferOrEvent.receiverId); return; } decodeBufferOrEvent(inputChannel, bufferOrEvent); } else if (msgClazz == NettyMessage.ErrorResponse.class) { // ---- Error --------------------------------------------------------- NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg; SocketAddress remoteAddr = ctx.channel().remoteAddress(); if (error.isFatalError()) { notifyAllChannelsOfErrorAndClose(new RemoteTransportException( "Fatal error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause)); } else { RemoteInputChannel inputChannel = inputChannels.get(error.receiverId); if (inputChannel != null) { if (error.cause.getClass() == PartitionNotFoundException.class) { inputChannel.onFailedPartitionRequest(); } else { inputChannel.onError(new RemoteTransportException( "Error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause)); } } } } else { throw new IllegalStateException("Received unknown message from producer: " + msg.getClass()); } }至此呢,就该下游算子 flapMap 运行处理了。(当然啦,实际上应该是先 print 对应的 task 运行,然后 flatMap 对应的 task 运行,最后才是 source 对应的 task 运行 )。 我们得回到 Task 的 run 方法 ...... /* 这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的 */ // run the invokable invokable.invoke(); ......然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方 ...... run(); .....最为关键的就是 run 方法。 这次调用的是 flatMap 对应 task 的 run 方法,所以进入 OneInputStreamTask @Override protected void run() throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor inputProcessor = this.inputProcessor; //处理输入的消息 while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }进入 processInput 方法 // 程序首先获取下一个 buffer // 主要是尝试获取 buffer,然后赋值给当前的反序列化器 // 处理 barrier 的逻辑,被包含在了getNextNonBlocked 中 final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked(); if (bufferOrEvent != null) { if (bufferOrEvent.isBuffer()) { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); if (event.getClass() != EndOfPartitionEvent.class) { throw new IOException("Unexpected event: " + event); } } }获取到 buffer 之后 // 这里就是真正的,用户的代码即将被执行的地方 // now we can do the actual processing StreamRecord record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); //set KeyContext setCurrentKey streamOperator.setKeyContextElement1(record); streamOperator.processElement(record); } return true;交给 flatMap 去处理。处理完了之后就又把数据发往 RecordWriter 的 emit 然后就这样反复执行,直到最后一个 operator ,这个消息也就消费完毕了。当然了,这仅仅是跨 taskManager 的消息流程,同一个 taskMananger 的消息流程就很简单了,就是简单的消息传递,不需要序列化成 byte 数组 总结 整体流程
|
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |