How Flink's Async Operator Works and How to Use It (Part 3)


    @Override
    public void processElement(StreamRecord<IN> record) throws Exception {
        StreamRecord<IN> element;
        // copy the element to avoid it being reused
        if (isObjectReuseEnabled) {
            //noinspection unchecked
            element = (StreamRecord<IN>) inStreamElementSerializer.copy(record);
        } else {
            element = record;
        }

        // add element first to the queue
        final ResultFuture<OUT> entry = addToWorkQueue(element);

        final ResultHandler resultHandler = new ResultHandler(element, entry);

        // register a timeout for the entry if timeout is configured
        if (timeout > 0L) {
            resultHandler.registerTimeout(getProcessingTimeService(), timeout);
        }

        userFunction.asyncInvoke(element.getValue(), resultHandler);
    }

As the method name suggests, this is where the actual data gets processed. Setting a breakpoint and stepping through a few times shows that every record enters this method exactly once.
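Because processElement registers a timeout and then passes the same handler to asyncInvoke, either the timer or the user's callback may report first. The ResultHandler source is not shown in this section, so the following is only a minimal sketch of one plausible way to let the first outcome win, using an atomic flag. The class name FirstOutcomeWinsSketch and its methods are hypothetical and are not Flink's API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical handler: whichever of result or timeout arrives first is kept. */
final class FirstOutcomeWinsSketch {
    // Flips to true exactly once, on the first outcome (assumed guard, not taken from the source).
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<String> outcome = new AtomicReference<>();

    /** Called by the user's asynchronous callback. */
    void complete(String result) {
        if (completed.compareAndSet(false, true)) {
            outcome.set("result:" + result);
        }
    }

    /** Called by the timer when the configured timeout fires. */
    void timeout() {
        if (completed.compareAndSet(false, true)) {
            outcome.set("timeout");
        }
    }

    /** Returns the recorded outcome, or null if nothing has been reported yet. */
    String outcome() {
        return outcome.get();
    }

    public static void main(String[] args) {
        FirstOutcomeWinsSketch handler = new FirstOutcomeWinsSketch();
        handler.complete("42"); // the callback reports first
        handler.timeout();      // the late timer is ignored
        System.out.println(handler.outcome());
    }
}
```

The timer is invoked by hand here to keep the example deterministic; a real operator would schedule it through its processing-time service.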
The parameter of processElement is a single element from the source stream. Next, let's look at the addToWorkQueue method.
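    /**
     * Adds the given stream element to the operator's stream element queue. This operation
     * blocks until the element has been added.
     */
    private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {

        Optional<ResultFuture<OUT>> queueEntry;
        while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
            mailboxExecutor.yield();
        }

        return queueEntry.get();
    }

This method puts the element from the source into the queue that was created earlier (in this example an ordered queue, OrderedStreamElementQueue) and returns a ResultFuture object. Let's see what that object is.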
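The retry loop in addToWorkQueue is the operator's backpressure mechanism: when the queue is full, tryPut returns an empty Optional and the task thread yields so that pending mailbox work (such as emitting a finished result) can free a slot. The sketch below models that loop on a single thread using only the JDK. YieldLoopSketch, its mailbox, and emitHead are hypothetical stand-ins, and the sketch throws instead of blocking when no mail is pending.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical single-threaded model of the retry loop; none of these names are Flink's. */
final class YieldLoopSketch {
    private final int capacity;
    private final Queue<String> queue = new ArrayDeque<>();
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int yields = 0;

    YieldLoopSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Accepts the element if there is room, otherwise returns an empty Optional. */
    Optional<String> tryPut(String element) {
        if (queue.size() < capacity) {
            queue.add(element);
            return Optional.of(element);
        }
        return Optional.empty();
    }

    /** Simulates emitting the head of the queue, which frees one slot. */
    void emitHead() {
        queue.poll();
    }

    /** Schedules work to be run the next time the caller yields. */
    void enqueueMail(Runnable mail) {
        mailbox.add(mail);
    }

    /** Runs one pending mail; the sketch fails fast where a real mailbox would wait. */
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("queue is full and no mail is pending");
        }
        yields++;
        mail.run();
    }

    /** Same shape as the loop in the listing: retry tryPut, yielding between attempts. */
    String addToWorkQueue(String element) {
        Optional<String> entry;
        while (!(entry = tryPut(element)).isPresent()) {
            yieldOnce();
        }
        return entry.get();
    }

    int yields() {
        return yields;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        YieldLoopSketch op = new YieldLoopSketch(1);
        op.addToWorkQueue("a");       // fits immediately
        op.enqueueMail(op::emitHead); // pending work that will free the slot
        op.addToWorkQueue("b");       // queue is full, so the loop yields first
        System.out.println("yields = " + op.yields());
    }
}
```

Yielding rather than sleeping keeps the task thread useful while it waits: the work that empties the queue runs on that same thread.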
4.3 ResultFuture

    @PublicEvolving
    public interface ResultFuture<OUT> {
        /**
         * Puts all results into a Collection, which is then emitted.
         */
        void complete(Collection<OUT> result);

        /**
         * Emits the given exception.
         */
        void completeExceptionally(Throwable error);
    }

Next, let's see how tryPut wraps an element into a ResultFuture object.
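To make the contract concrete, here is a minimal sketch of how user code might drive such a callback: start an asynchronous call, return immediately, and report the outcome through complete or completeExceptionally. The nested ResultFuture interface is a local stand-in that mirrors the two methods above so the example runs without Flink, and lookup and invokeAndWait are hypothetical helpers.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

final class ResultFutureSketch {
    /** Local stand-in mirroring the two methods of the interface shown above. */
    interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    /** Hypothetical lookup standing in for a real asynchronous client call. */
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> {
            if (key.isEmpty()) {
                throw new IllegalArgumentException("empty key");
            }
            return key.toUpperCase();
        });
    }

    /** Shaped like a user's asyncInvoke: fire the call and report through the callback. */
    static void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        lookup(input).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    /** Test helper: bridges the callback to a future and waits for the outcome. */
    static Collection<String> invokeAndWait(String input) {
        CompletableFuture<Collection<String>> out = new CompletableFuture<>();
        asyncInvoke(input, new ResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                out.complete(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                out.completeExceptionally(error);
            }
        });
        return out.join();
    }

    public static void main(String[] args) {
        System.out.println(invokeAndWait("flink"));
    }
}
```

The result is passed as a Collection because one input may produce zero, one, or many outputs.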
4.4 OrderedStreamElementQueue

    @Internal
    public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

        private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

        /** Capacity of this queue. */
        private final int capacity;

        /** Queue for the inserted StreamElementQueueEntries. */
        private final Queue<StreamElementQueueEntry<OUT>> queue;

        public OrderedStreamElementQueue(int capacity) {
            Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

            this.capacity = capacity;
            this.queue = new ArrayDeque<>(capacity);
        }

        @Override
        public boolean hasCompletedElements() {
            return !queue.isEmpty() && queue.peek().isDone();
        }

        @Override
        public void emitCompletedElement(TimestampedCollector<OUT> output) {
            if (hasCompletedElements()) {
                final StreamElementQueueEntry<OUT> head = queue.poll();
                head.emitResult(output);
            }
        }

        @Override
        public List<StreamElement> values() {
            List<StreamElement> list = new ArrayList<>(this.queue.size());
            for (StreamElementQueueEntry e : queue) {
                list.add(e.getInputElement());
            }
            return list;
        }

        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }

        @Override
        public int size() {
            return queue.size();
        }

        @Override
        public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
            if (queue.size() < capacity) {
                StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

                queue.add(queueEntry);

                LOG.debug(
                        "Put element into ordered stream element queue. New filling degree "
                                + "({}/{}).",
                        queue.size(),
                        capacity);

                return Optional.of(queueEntry);
            } else {
                LOG.debug(
                        "Failed to put element into ordered stream element queue because it "
                                + "was full ({}/{}).",
                        queue.size(),
                        capacity);

                return Optional.empty();
            }
        }

        private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
            if (streamElement.isRecord()) {
                return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
            }
            if (streamElement.isWatermark()) {
                return new WatermarkQueueEntry<>((Watermark) streamElement);
            }
            throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
        }
    }

The part to focus on is tryPut and createEntry at the end of the class. When there is room, tryPut creates a StreamElementQueueEntry (a StreamRecordQueueEntry for a record, a WatermarkQueueEntry for a watermark), adds it to the queue, and returns that same entry as the ResultFuture. So the next class to look at is StreamRecordQueueEntry.
4.5 StreamRecordQueueEntry

    @Internal
    class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
        @Nonnull private final StreamRecord<?> inputRecord;

        private Collection<OUT> completedElements;

        StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
            this.inputRecord = Preconditions.checkNotNull(inputRecord);
        }

        @Override
        public boolean isDone() {
            return completedElements != null;
        }

        @Nonnull
        @Override
        public StreamRecord<?> getInputElement() {
            return inputRecord;
        }

        @Override
        public void emitResult(TimestampedCollector<OUT> output) {
            output.setTimestamp(inputRecord);
            for (OUT r : completedElements) {
                output.collect(r);
            }
        }

        @Override
        public void complete(Collection<OUT> result) {
            this.completedElements = Preconditions.checkNotNull(result);
        }
    }
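Taken together, the queue and its entries explain how ordering is preserved: an entry counts as done once complete has stored a result, and the queue only ever emits from the head. A later element that finishes early therefore waits behind an earlier one that is still pending. The sketch below reproduces that behaviour with plain JDK types. OrderedEmitSketch and Entry are hypothetical, and drainCompleted loops for brevity, whereas emitCompletedElement in the listing emits at most one head per call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of head-of-queue emission; not Flink's classes. */
final class OrderedEmitSketch {
    /** Holds an input and, once completed, its result. */
    static final class Entry {
        final String input;
        private String result; // stays null until complete() is called

        Entry(String input) {
            this.input = input;
        }

        void complete(String result) {
            this.result = result;
        }

        boolean isDone() {
            return result != null;
        }
    }

    private final Queue<Entry> queue = new ArrayDeque<>();

    /** Enqueues an input and returns its entry so a callback can complete it later. */
    Entry put(String input) {
        Entry entry = new Entry(input);
        queue.add(entry);
        return entry;
    }

    /** Emits finished entries from the head and stops at the first unfinished one. */
    List<String> drainCompleted() {
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().isDone()) {
            out.add(queue.poll().result);
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedEmitSketch q = new OrderedEmitSketch();
        Entry a = q.put("a");
        Entry b = q.put("b");
        b.complete("B");                        // the later element finishes first
        System.out.println(q.drainCompleted()); // head "a" is still pending
        a.complete("A");
        System.out.println(q.drainCompleted()); // both are released in input order
    }
}
```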
