Flink 异步算子的原理及使用

1、简介

Flink 的特点是高吞吐、低延迟。但是，如果 Flink 中某个环节的数据处理逻辑需要与外部系统交互，而外部调用的耗时又不可控，就会显著降低集群性能。这时就可以使用异步算子：发起耗时请求后不必等待结果返回，即可继续处理后续的数据。
2、本文可以了解到什么

  • 异步算子的源码分析
  • 异步算子为什么能够保证有序性
  • Flink SQL 中如何自定义异步 lookup join
3、异步算子的测试代码

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simulates a slow asynchronous call to an external system (adapted from an example found online).
 *
 * <p>The worker pool is static so that creating many instances does not create (and leak) a new
 * pool each time, and so that the non-serializable {@link ExecutorService} is never part of the
 * serialized state of this class. Its threads are daemons, so they do not keep the demo JVM alive
 * after the Flink job has finished.
 */
public class AsyncIODemo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Number of worker threads that run the simulated calls. */
    private static final int POOL_SIZE = 4;

    /** Default simulated latency of one call: 5 seconds. */
    private static final long DEFAULT_DELAY_MILLIS = 5000L;

    private static final ExecutorService EXECUTOR =
            Executors.newFixedThreadPool(
                    POOL_SIZE,
                    runnable -> {
                        Thread thread = Executors.defaultThreadFactory().newThread(runnable);
                        thread.setDaemon(true);
                        return thread;
                    });

    /** Simulated latency of each call, in milliseconds. */
    private final long delayMillis;

    /** Creates a demo client whose calls take 5 seconds each. */
    public AsyncIODemo() {
        this(DEFAULT_DELAY_MILLIS);
    }

    /**
     * Creates a demo client with a custom simulated latency.
     *
     * @param delayMillis how long each call waits before completing, in milliseconds; must be >= 0
     * @throws IllegalArgumentException if {@code delayMillis} is negative
     */
    public AsyncIODemo(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must be >= 0, got " + delayMillis);
        }
        this.delayMillis = delayMillis;
    }

    /**
     * Starts a simulated lookup for {@code source} and returns immediately.
     *
     * @param source the key to look up
     * @return a future that, after the simulated latency, completes with
     *     {@code "Output value: " + source}; it completes exceptionally if the worker thread is
     *     interrupted while waiting
     */
    public CompletableFuture<String> pullData(final String source) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and fail the future instead of faking success.
                        Thread.currentThread().interrupt();
                        throw new CompletionException(e);
                    }
                    // After the simulated delay, build the result string.
                    return "Output value: " + source;
                },
                EXECUTOR);
    }
}

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Demo job for the async I/O operator (adapted from an example found online).
 */
public class AsyncTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment; parallelism 1 makes the output easy to observe.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A small bounded source of four strings.
        DataStreamSource<String> stream = env.fromElements("11", "22", "33", "44");

        // orderedWait builds an ordered async stream: as the name says, the calls are
        // asynchronous, yet results keep the input order. The source analysis below shows why.
        SingleOutputStreamOperator<String> asyncStream =
                AsyncDataStream.orderedWait(
                        stream,
                        new AsyncFunction<String, String>() {
                            // Create the client once per function instance rather than once per record.
                            private final AsyncIODemo client = new AsyncIODemo();

                            @Override
                            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                                // Issue the simulated request; this returns a CompletableFuture immediately.
                                CompletableFuture<String> future = client.pullData(input);
                                // Once the future completes, call resultFuture to actually emit the data.
                                // Failures are forwarded instead of being emitted as a null element.
                                future.whenComplete(
                                        (value, error) -> {
                                            if (error != null) {
                                                resultFuture.completeExceptionally(error);
                                            } else {
                                                resultFuture.complete(Collections.singleton(value));
                                            }
                                        });
                            }
                        },
                        // Maximum time to wait for one async call: 10 seconds.
                        10,
                        TimeUnit.SECONDS);

        asyncStream.print();
        env.execute();
    }
}

4、异步算子源码分析

4.1、AsyncDataStream

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

/**
 * A helper class for applying an {@link AsyncFunction} to a data stream.
 *
 * <pre>{@code
 * DataStream<String> input = ...
 * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
 *
 * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
 * }</pre>
 */
@PublicEvolving
public class AsyncDataStream {

    /** Output mode of the asynchronous operation: ordered or unordered. */
    public enum OutputMode {
        ORDERED,
        UNORDERED
    }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * flag_2: adds an AsyncWaitOperator.
     *
     * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
     * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
     * @param timeout for the asynchronous operation to complete
     * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
     * @param mode Processing mode for {@link AsyncWaitOperator}.
     * @param <IN> Input type.
     * @param <OUT> Output type.
     * @return A new {@link SingleOutputStreamOperator}
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        /** An AsyncWaitOperatorFactory is created here. */
        AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(
                        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

        return in.transform("async wait operator", outTypeInfo, operatorFactory);
    }

    /**
     * Adds an AsyncWaitOperator. The output stream is unordered.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. The output stream is unordered.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED);
    }

    /**
     * flag_1: adds an AsyncWaitOperator. The order of the output records is guaranteed to match
     * the order of the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. The order of the output records is guaranteed to match the order
     * of the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
    }
}

推荐阅读