Flink数据传输源码及原理

您所在的位置:网站首页 flink数据交换 Flink数据传输源码及原理

Flink数据传输源码及原理

2023-03-15 04:02| 来源: 网络整理| 查看: 265

Flink数据交换源码及其原理 前言

对于里面内容我们需要一定的netty只是,在涉及网络交互的时候是基于netty的,所以不了解netty可能让我们难以理解其中逻辑

对于一些相关组件我们提前介绍一下,后面会常提到

ResultPartition: 算子的数据会写入ResultPartition,包含多个ResultSubPartition

ResultSubPartition : 每个task消费一个ResultSubPartition,task不会只消费一个ResultSubPartition

InputGate : 对数据输入的封装,Gate(翻译 : 门)就很容易理解,其实InputGate主要还是inputChannel的封装,InputGate中包含多个InputChannel

InputChannel : 实际工作的内容,分为本地和远程,每一个InputChannel接收一个ResultSubPartition输出

Task构建简单流程

我们从构建出来Task之后开始读取数据发送数据的流程,今天聊一聊Task是如何传递数据,这里是讨论数据交互前所需要的组件构建的过程以及交互过程

在chain中直接调用下游算子的processElement方法即可,如果是taskManger和跨网络中,会对数据进行序列化以及反序列写入到buffer(buffer包含一个MemorySegment)中,会通过bufferBuilder来讲数据写入到MemorySegment中,与BufferBuilder想对应的时候BufferConsumer位于下游task,负责读取MemorySegment的数据,一个bufferBuilder对应一个BufferConsumer

// 这是flink调度任务的一个具体执行类,继承runnable,表示一个task由一个线程执行 public class Task implements Runnable,TaskSlotPayload,TaskActions, PartitionProducerStateProvider, CheckpointListener,BackPressureSampleableTask{ } // Task.run,可以看到直接调用doRun方法,doRun方法也是主要逻辑存在的地方 public void run() { try { doRun(); } finally { terminationFuture.complete(executionState); } } // 这里主要保留了 最主要的几行代码,其他的都删掉了,这里通过invokable调用invoke方法来真正的启动任务 // AbstractInvokable 一些实现类 StreamTask DataSrouceTask等. // 我们通过api的方式source生成SourceStreamTask,其他算子根据不同的输入生成不同的StreamTask private void doRun() { AbstractInvokable invokable = null; // 构建AbstractInvokable,实际上就是StreamTask,当然我说的是我这里的栗子,不同的模式可能会创建不同的AbstractInvokable,里面通过具体nameOfInvokableClass全限定类名来反射构建具体的AbstractInvokable对象,比如这里构建的是OneInputStreamTask invokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env); this.invokable = invokable; // 调用invoke开始执行 invokable.invoke(); } // 在进入invoke的方法前,我们看看AbstractInvokable是如何构建出来的,以OneInputStreamTask为栗子 // 这里通过反射的方式构建了传入的Environment public OneInputStreamTask(Environment env) throws Exception { // 调用父类构造 super(env); } // 父类构造 protected StreamTask(Environment env) throws Exception { this(env, null); } // 层层调用在这里注意一点 protected StreamTask( Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor) throws Exception { this( environment, timerService, uncaughtExceptionHandler, actionExecutor, // 创建了一个Mailbox对象,后面task的数据接收和发送都依赖于该对象,并将构建当前线程作为参数传入 // 表示当前线程才可以读取mail --- 毕竟这是属于你的邮件 new TaskMailboxImpl(Thread.currentThread())); } // 最终执行到该构造方法中 // 只标注重点内容 protected StreamTask( Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception { // 调用父类构造即AbstractInvokable super(environment); this.configuration = new StreamConfig(getTaskConfiguration()); // TODO 重点 后面需要 // private final RecordWriterDelegate recordWriter; // 构建recordWriter,实际构建的是RecordWriterDelegate,通过getRecordWriter方法获取具体的recordWriter // 下面介绍怎么构建出来recordWriter的 this.recordWriter = createRecordWriterDelegate(configuration, environment); // 处理基于mailbox之外的一些动作,比如发送事件等动作 this.actionExecutor = Preconditions.checkNotNull(actionExecutor); // TODO 重点 // 构建mailbox处理器,用于处理mailbox收到的mail信息 // 参数1:MailboxDefaultAction实现类,由lambda表达式方式编写,即调用当前类的processInput方法(即OneInputStreamTask) // 参数2:mailbox --- 处理邮箱,你首先得有一个邮箱对吧 // 参数3:处理邮箱的线程 this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor); this.mailboxProcessor.initMetric(environment.getMetricGroup()); // 用于执行mail的executor,类似于java的线程池,但不医院 this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment); // 异步处理snapshot的线程,嘿嘿嘿 this.asyncOperationsThreadPool = Executors.newCachedThreadPool( new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler)); this.stateBackend = createStateBackend(); // task处理checkpoint的conordinator this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl( stateBackend.createCheckpointStorage(getEnvironment().getJobID()), getName(), actionExecutor, getCancelables(), getAsyncOperationsThreadPool(), getEnvironment(), this, configuration.isUnalignedCheckpointsEnabled(), this::prepareInputSnapshot); if (timerService == null) { ThreadFactory timerThreadFactory = new DispatcherThreadFactory( TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()); this.timerService = new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory); } else { this.timerService = timerService; } // 用于处理io的线程,比如恢复state this.channelIOExecutor = Executors.newSingleThreadExecutor( new ExecutorThreadFactory("channel-state-unspilling")); injectChannelStateWriterIntoChannels(); } // 在上面我们已经了解了streamTask的构造方法,现在我们把recordWirter构建的过程介绍一下 // StreamTask.createRecordWriterDelegate public static RecordWriterDelegate createRecordWriterDelegate( StreamConfig configuration, Environment environment) { List recordWrites = // 构建task对应的writers,通常是1个 createRecordWriters(configuration, environment); // 判断writer的输出数量,一般情况下就是1 也就是构建的都是SingleRecordWriter if (recordWrites.size() == 1) { // 直接获取对其包装一下即可 return new SingleRecordWriter(recordWrites.get(0)); } else if (recordWrites.size() == 0) { return new NonRecordWriter(); } else { // 比如 // 在下面中,map生成的时候就会生成MultipleRecordWriters // DataStreamSource source =....' // SingleOutputStreamOperator map =source.map; // source.connect(map); return new MultipleRecordWriters(recordWrites); } } // 构建task对应的recordWriter // StreamTask.createRecordWriters private static List createRecordWriters( StreamConfig configuration, Environment environment) { List recordWriters = new ArrayList(); // 找到operator的所有出边 List outEdgesInOrder = configuration.getOutEdgesInOrder( environment.getUserCodeClassLoader().asClassLoader()); // 遍历出边集合,为每一个出边构建一个recordWriter for (int i = 0; i < outEdgesInOrder.size(); i++) { StreamEdge edge = outEdgesInOrder.get(i); recordWriters.add( // 构建recordWriter // 具体不看了,简单介绍一下 // 获取task对应的partitioner,获取对应的ResultPartitionWriter,然后通过构造者模式构建RecordWriter // 在内部会判断是不是广播的,如果是广播的则构建BroadcastRecordWriter,否则是ChannelSelectorRecordWriter(基本都是他) // ChannelSelectorRecordWriter的构造过程中最终要的两个参数ResultPartitionWriter, ChannelSelector createRecordWriter( edge, i, environment, environment.getTaskInfo().getTaskName(), edge.getBufferTimeout())); } return recordWriters; }

在上面我们已经了解到了task的构建和recordWriter的构建了,现在我们要进入invoke方法,来看看怎么执行的了,当invoke调用的时候,Task会做一些执行前的准备工作,然后真正的开始调用userFunction读取数据发送数据的过程了,那么我们现在开始通过StreamTask来看内部如何实现的

// StreamTask是抽象类 // StreamTask.invoke方法 @Override public final void invoke() throws Exception { try { // 最主要的两个方法 // 在里面会初始化 input和output,来明确数据的输入和输出 // 通过不同的输入输出来确定数据的交互方式,比如线程内,taskManager的task之间或者基于网络传输 beforeInvoke(); // 运行mail,开始持续的读取数据发送下游,后面看 runMailboxLoop(); } // 不重要代码去掉了 protected void beforeInvoke() throws Exception { // 重点 // 构建operatorChain,我们需要深入去看, recordWriter在构造方法是构建的,前面已经讲解 operatorChain = new OperatorChain(this, recordWriter); // 即运行在task里的具体的operator mainOperator = operatorChain.getMainOperator(); // 做一些task的初始化工作 actionExecutor.runThrowing(() -> { // 读取state的reader SequentialChannelStateReader reader = getEnvironment().getTaskStateManager() .getSequentialChannelStateReader(); reader.readOutputData(getEnvironment().getAllWriters(), false); // operator初始化,会调用的算子的open方法 operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer()); // 读取state数据 channelIOExecutor.execute(() -> { try { reader.readInputData(getEnvironment().getAllInputGates()); } catch (Exception e) { asyncExceptionHandler.handleAsyncException( "Unable to read channel state", e); } }); }); isRunning = true; } // beforeInvoke的最终要一点就是构建的OperatorChain,我们一起深入看看operatorChain的构造方法 // 代码很多,由于我们只关注数据交换的内容,其他的地方不做讲解 public OperatorChain( StreamTask containingTask, RecordWriterDelegate recordWriterDelegate) { this.operatorEventDispatcher = new OperatorEventDispatcherImpl( containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(), containingTask.getEnvironment().getOperatorCoordinatorEventGateway()); final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); final StreamConfig configuration = containingTask.getConfiguration(); StreamOperatorFactory operatorFactory = configuration.getStreamOperatorFactory(userCodeClassloader); // we read the chained configs, and the order of record writer registrations by output name Map chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader); // create the final output stream writers // we iterate through all the out edges from this job vertex and create a stream output List outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader); Map[outEdgesInOrder.size()]; // from here on, we need to make sure that the output writers are shut down again on failure boolean success = false; try { createChainOutputs( outEdgesInOrder, recordWriterDelegate, chainedConfigs, containingTask, streamOutputMap); // we create the chain of operators and grab the collector that leads into the chain List


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3