下面通过一个单词统计的案例,快速上手应用 Flink,进行流处理(Streaming)和批处理(Batch)
单词统计(批处理)
- 引入依赖
<!--flink核心包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><!--flink流处理包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.7.2</version></dependency>
- 代码实现
public class WordCountBatch {public static void main(String[] args) throws Exception {String inputFile= "E:\\data\\word.txt";String outPutFile= "E:\\data\\wordResult.txt";ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();//1. 读取数据DataSource<String> dataSource = executionEnvironment.readTextFile(inputFile);//2. 对数据进行处理 , 转成word,1的格式FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = s.split(" ");for (String word : words) {collector.collect(new Tuple2<>(word, 1));}}});//3. 对数据分组 , 相同word的一个组UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = flatMapOperator.groupBy(0);//4. 对分组后的数据求和AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);//5. 写出数据sum.writeAsCsv(outPutFile).setParallelism(1);//执行executionEnvironment.execute("wordcount batch process");}}
执行 main 方法 , 得出结果 。我测试的 word.txt 内容如下:ni hao hiwang mei meiliu meini haowo hen haothis is a good ideaApache Flink
【Flink WordCount入门】输出的文件结果:a,1mei,3Apache,1Flink,1good,1hen,1hi,1idea,1ni,2is,1liu,1this,1wo,1hao,3wang,1
单词统计(流数据)需求:Socket 模拟实时发送单词,使用 Flink 实时接收数据,对指定时间窗口内(如 5s)的数据进行聚合统计 , 每隔 1s 汇总计算一次,并且把时间窗口内计算结果打印出来public class WordCountStream {public static void main(String[] args) throws Exception {int port = 7000;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> textStream = executionEnvironment.socketTextStream("192.168.56.103", port, "\n");SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split("\\s");for (String word : split) {collector.collect(Tuple2.of(word, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> word = tuple2SingleOutputStreamOperator.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(1)).sum(1);word.print();executionEnvironment.execute("wordcount stream process");}}
运行起来之后,我们就可以开始发送 socket 请求过去 。我们测试可以使用 netcat 工具 。在 linux 上安装好后,使用下面的命令:nc -lk 7000
然后发送数据即可 。文章插图
文章插图
推荐阅读
- 一篇文章带你了解网页框架——Vue简单入门
- 绝地求生新手如何快速入门(绝地求生新手入门教学怎么跳过)
- 【C++】spdlog光速入门,C++logger最简单最快的库
- 小白转行入门STM32----手机蓝牙控制STM32单片机点亮LED
- Taurus.MVC 微服务框架 入门开发教程:项目部署:7、微服务节点的监控与告警。
- Flink的异步算子的原理及使用
- 麻将怎么玩(麻将怎么玩新手入门)
- 1 Python全栈工程师之从网页搭建入门到Flask全栈项目实战 - ES6标准入门和Flex布局
- 《基于Apache Flink的流处理》读书笔记
- flinksql读写redis