FlinkSql之TableAPI详解( 二 )

二、TableAPI读取文件使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名
 package net.cyan.FlinkSql; ? import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ? import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ? import static org.apache.flink.table.api.Expressions.$; ? public class Demo2_readWriteText {     public static void main(String[] args) {         //创建执行环境 //      Configuration configuration = new Configuration(); //      configuration.setInteger("rest.port", 3333);         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);         //创建查询的数据结果封装类型         Schema schema = new Schema()               .field("id", DataTypes.STRING())               .field("ts", DataTypes.BIGINT())               .field("vc", DataTypes.INT());         talEnv               .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径               .withFormat(new Csv()) //读取文件的数据格式               .withSchema(schema) //读取出来的数据格式               .createTemporaryTable("sensor");//定义结果表名 ?         //进行查询         Table select = talEnv.from("sensor")               .where($("id").isEqual("sensor_1"))               .select($("id"), $("ts"), $("vc")); ? ?         //将查询结果写入到新文件中         //同样建立一个动态表连接         talEnv               .connect(new FileSystem().path("input/b.txt"))  //写入路径               .withFormat(new Csv()) //写入文件的数据格式               .withSchema(schema) //写入的数据格式               .createTemporaryTable("abc");//定义写入表名         //进行写入操作 ?         select.executeInsert("abc"); ? //      try { //          //启动执行环境 //          env.execute(); //      } catch (Exception e) { //          e.printStackTrace(); //      } ?   } }三、TableAPI 读取、写入Kakfa基本流程
1>需要创建表的运行环境
 //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);2>创建查询出的数据写出结构
 //创建表结构 Schema schema=new Schema()       .field("id",DataTypes.STRING())       .field("ts",DataTypes.BIGINT())       .field("vc",DataTypes.INT());3> 创建kafka连接
 //创建kafka连接 tabEnv.connect(         new Kafka()       .version("universal")// 版本号       .property("bootstrap.servers","hadoop102:9092")//地址       .property("group.id","cy")//消费者组       .topic("first")//消费主题 ?  )       .withFormat(new Json())//写入的格式       .withSchema(schema)       .createTemporaryTable("a");//临时表4> 进行查询
 //创建表 Table select = tabEnv.from("a").select("*");5> 创建写入kafka连接
 //创建写入主题 tabEnv.connect(         new Kafka()               .version("universal")// 版本号               .property("bootstrap.servers","hadoop102:9092")//地址               .topic("first1")//消费主题               .sinkPartitionerRoundRobin()//随机分区 ? )       .withFormat(new Json())//写入的格式       .withSchema(schema)       .createTemporaryTable("c");

推荐阅读