Flink的异步算子的原理及使用( 二 )

如上从测试代码开始调用链为:AsyncDataStream.orderedWait -> addOperator,然后addOperator中new了一个AsyncWaitOperatorFactory 。然后到这里其实可以告一段落了,因为没有必要往下看了,这个时候就需要猜了,一般我们类名叫XXFactory基本都是工厂模式 , 然后工厂生产的就是XX了,这里就是生成AsyncWaitOperator对象的工厂了,然后我们就可以直接在AsyncWaitOperator类的构造方法第一行打个断点,看看啥时候会进去了 。为啥要这样做,因为我们看到的Flink源码其实并不是一个线性的执行过程,架构图如下

Flink的异步算子的原理及使用

文章插图
他的代码实际上并不是都在一个节点执行,虽然我们在本地调试,但是也是在模拟的一个本地集群中执行,怎么模拟出不同的节点呢,很明显要通过线程,也就是说不同的节点用不同的线程来代表并执行 。所以我们无脑断点是没法看到全貌的 。看代码的一个技巧,根据各方面的经验猜测,比如这里就是根据类名的特点进行猜测 。
4.2、AsyncWaitOperator我们在AsyncWaitOperator类的所有公共方法和构造方法里打个断点,debug运行程序进入调试
Flink的异步算子的原理及使用

文章插图
Flink的异步算子的原理及使用

文章插图
很明显这个构造方法,在一个独立的sink线程中运行,如果还按照上面的方式断点,估计找一辈子都找不到了
public AsyncWaitOperator(@Nonnull AsyncFunction<IN, OUT> asyncFunction,long timeout,int capacity,@Nonnull AsyncDataStream.OutputMode outputMode,@Nonnull ProcessingTimeService processingTimeService,@Nonnull MailboxExecutor mailboxExecutor) {super(asyncFunction);setChainingStrategy(ChainingStrategy.ALWAYS);Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");this.capacity = capacity;this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");this.timeout = timeout;this.processingTimeService = Preconditions.checkNotNull(processingTimeService);this.mailboxExecutor = mailboxExecutor;}我们看一下构造方法的内容,发现都是一些初始化操作 , 看着没啥营养,看代码的另外一个技巧:抓大放小,路边的野花不要理睬,忽略一些不重要的初始化和参数校验等代码 , 重点关注大的流程的东西 。
我们继续直接放开往下运行,直到下一个断点
Flink的异步算子的原理及使用

文章插图
@Overridepublic void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);this.inStreamElementSerializer =new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));switch (outputMode) {case ORDERED:queue = new OrderedStreamElementQueue<>(capacity);break;case UNORDERED:queue = new UnorderedStreamElementQueue<>(capacity);break;default:throw new IllegalStateException("Unknown async mode: " + outputMode + '.');}this.timestampedCollector = new TimestampedCollector<>(super.output);}一眼望去就发现下面switch case那里比较有用 , 根据名字可以知道,这里根据outputMode判断分别实例化有序的队列和无需的队列,联想到AsyncDataStream类里的几个orderedWait和unorderedWait方法,很快就能想到是否有序这个队列就是关键了 。好了没什么可以留恋了,继续执行到下一个断点吧!
Flink的异步算子的原理及使用

文章插图
初始化状态,没啥可留恋的 , 先跳过继续到下一个断点
Flink的异步算子的原理及使用

文章插图
@Overridepublic void open() throws Exception {super.open();this.isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();if (recoveredStreamElements != null) {for (StreamElement element : recoveredStreamElements.get()) {if (element.isRecord()) {processElement(element.<IN>asRecord());} else if (element.isWatermark()) {processWatermark(element.asWatermark());} else if (element.isLatencyMarker()) {processLatencyMarker(element.asLatencyMarker());} else {throw new IllegalStateException("Unknown record type "+ element.getClass()+ " encountered while opening the operator.");}}recoveredStreamElements = null;}}如上从7行开始貌似是开始处理数据了,但是根据recoveredStreamElements这个名称一看,很显然recovered是恢复的意思,这里判断是否为空 , 不为空再做,很明显是做修复数据相关的逻辑,我们处理数据的正主都没找到这里很明显没啥意义 , 属于路边的野花,直接忽略到下一个断点去 。

推荐阅读