文章插图
@Overridepublic void processElement(StreamRecord<IN> record) throws Exception {StreamRecord<IN> element;// copy the element avoid the element is reusedif (isObjectReuseEnabled) {//noinspection uncheckedelement = (StreamRecord<IN>) inStreamElementSerializer.copy(record);} else {element = record;}// add element first to the queuefinal ResultFuture<OUT> entry = addToWorkQueue(element);final ResultHandler resultHandler = new ResultHandler(element, entry);// register a timeout for the entry if timeout is configuredif (timeout > 0L) {resultHandler.registerTimeout(getProcessingTimeService(), timeout);}userFunction.asyncInvoke(element.getValue(), resultHandler);}
很明显根据方法名称可以知道这里就是在处理真正的数据了 , 反复断点几次,可以发现,每一条数据都会进来这个方法一次文章插图
这个方法的参数就是source流里的一个元素 , 下面我们再看一下addToWorkQueue方法吧
/** 将给定的流元素添加到操作符的流元素队列中 。该操作会阻塞 , 直到元素被添加 。*/private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)throws InterruptedException {Optional<ResultFuture<OUT>> queueEntry;while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {mailboxExecutor.yield();}return queueEntry.get();}
这个方法就是将前面source里的元素放入前面new出来的队列 , 本例这里是一个有序的队列OrderedStreamElementQueue,并返回了一个ResultFuture对象,我们需要看一下这个对象是个啥4.3、ResultFuture【Flink的异步算子的原理及使用】
@PublicEvolvingpublic interface ResultFuture<OUT> {/*** 将所有结果放在Collection中,然后输出 。*/void complete(Collection<OUT> result);/*** 将异常输出*/void completeExceptionally(Throwable error);}
我们再来看下tryPut是如何包装出了一个ResultFuture对象的4.4、OrderedStreamElementQueue
@Internalpublic final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);/** Capacity of this queue. */private final int capacity;/** Queue for the inserted StreamElementQueueEntries. */private final Queue<StreamElementQueueEntry<OUT>> queue;public OrderedStreamElementQueue(int capacity) {Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");this.capacity = capacity;this.queue = new ArrayDeque<>(capacity);}@Overridepublic boolean hasCompletedElements() {return !queue.isEmpty() && queue.peek().isDone();}@Overridepublic void emitCompletedElement(TimestampedCollector<OUT> output) {if (hasCompletedElements()) {final StreamElementQueueEntry<OUT> head = queue.poll();head.emitResult(output);}}@Overridepublic List<StreamElement> values() {List<StreamElement> list = new ArrayList<>(this.queue.size());for (StreamElementQueueEntry e : queue) {list.add(e.getInputElement());}return list;}@Overridepublic boolean isEmpty() {return queue.isEmpty();}@Overridepublic int size() {return queue.size();}@Overridepublic Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {if (queue.size() < capacity) {StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);queue.add(queueEntry);LOG.debug("Put element into ordered stream element queue. New filling degree "+ "({}/{}).",queue.size(),capacity);return Optional.of(queueEntry);} else {LOG.debug("Failed to put element into ordered stream element queue because it "+ "was full ({}/{}).",queue.size(),capacity);return Optional.empty();}}private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {if (streamElement.isRecord()) {return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);}if (streamElement.isWatermark()) {return new WatermarkQueueEntry<>((Watermark) streamElement);}throw new UnsupportedOperationException("Cannot enqueue " + streamElement);}}
我们重点关注一下52行以下的部分,可以看到new了一个StreamElementQueueEntry对象放入了queue队列中 , 那就需要看一下StreamRecordQueueEntry类了4.5、StreamRecordQueueEntry
@Internalclass StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {@Nonnull private final StreamRecord<?> inputRecord;private Collection<OUT> completedElements;StreamRecordQueueEntry(StreamRecord<?> inputRecord) {this.inputRecord = Preconditions.checkNotNull(inputRecord);}@Overridepublic boolean isDone() {return completedElements != null;}@Nonnull@Overridepublic StreamRecord<?> getInputElement() {return inputRecord;}@Overridepublic void emitResult(TimestampedCollector<OUT> output) {output.setTimestamp(inputRecord);for (OUT r : completedElements) {output.collect(r);}}@Overridepublic void complete(Collection<OUT> result) {this.completedElements = Preconditions.checkNotNull(result);}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- VLQ & Base64 VLQ 编码方式的原理及代码实现
- Spring Boot 配置 jar 包外面的 Properties 配置文件
- 【C++】从零开始的CS:GO逆向分析3——写出一个透视
- ddr5比ddr4强多少_ddr5和ddr4的差距
- ba拼音读法 拔草的拼音
- 去土耳其别再买洛神诗rosense玫瑰水了 很多国人都只知道土耳其粉色瓶的rosense玫瑰水好用
- 3ce口红畅销的颜色排名 复古魅力,你值得拥有
- 投诉淘宝商家最管用的方法(淘宝商家最怕什么)
- iphone x怎么截屏(苹果x截屏的三种方法)
- 苹果的nfc功能怎么用(苹果9有nfc功能吗)