二、TableAPI读取文件使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名
package net.cyan.FlinkSql; ? import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ? import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_readWriteText { public static void main(String[] args) { //创建执行环境 // Configuration configuration = new Configuration(); // configuration.setInteger("rest.port", 3333); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment talEnv = StreamTableEnvironment.create(env); //创建查询的数据结果封装类型 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); talEnv .connect(new FileSystem().path("input/sensor.txt")) //读取文件路径 .withFormat(new Csv()) //读取文件的数据格式 .withSchema(schema) //读取出来的数据格式 .createTemporaryTable("sensor");//定义结果表名 ? //进行查询 Table select = talEnv.from("sensor") .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); ? ? //将查询结果写入到新文件中 //同样建立一个动态表连接 talEnv .connect(new FileSystem().path("input/b.txt")) //写入路径 .withFormat(new Csv()) //写入文件的数据格式 .withSchema(schema) //写入的数据格式 .createTemporaryTable("abc");//定义写入表名 //进行写入操作 ? select.executeInsert("abc"); ? // try { // //启动执行环境 // env.execute(); // } catch (Exception e) { // e.printStackTrace(); // } ? } }三、TableAPI 读取、写入Kakfa基本流程
1>需要创建表的运行环境
//创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);2>创建查询出的数据写出结构
//创建表结构 Schema schema=new Schema() .field("id",DataTypes.STRING()) .field("ts",DataTypes.BIGINT()) .field("vc",DataTypes.INT());3> 创建kafka连接
//创建kafka连接 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .property("group.id","cy")//消费者组 .topic("first")//消费主题 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("a");//临时表4> 进行查询
//创建表 Table select = tabEnv.from("a").select("*");5> 创建写入kafka连接
//创建写入主题 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .topic("first1")//消费主题 .sinkPartitionerRoundRobin()//随机分区 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("c");
推荐阅读
- 原神机械之心任务的完成方法是什么
- 四 【单片机入门】应用层软件开发的单片机学习之路-----ESP32开发板PWM控制电机以及中断的使用
- 原神坎蒂丝命之座效果是什么
- flutter 系列之:flutter 中的幽灵offstage
- vulnhub靶场之ICA: 1
- 云原生之旅 - 6)不能错过的一款 Kubernetes 应用编排管理神器 Kustomize
- 原神智中之宝新计划任务是什么
- 上古四大凶兽之一的梼杌有什么来历它还有别的名字吗
- 之六 2流高手速成记:从SpringBoot到SpringCloudAlibaba
- 真正“搞”懂HTTP协议02之空间穿梭