6> 写入
//写入 select.executeInsert("c");完整代码如下
package net.cyan.FlinkSql; ? import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ? public class Demo5_readWriteKafka { public static void main(String[] args) { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); //创建表结构 Schema schema=new Schema() .field("id",DataTypes.STRING()) .field("ts",DataTypes.BIGINT()) .field("vc",DataTypes.INT()); //创建kafka连接 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .property("group.id","cy")//消费者组 .topic("first")//消费主题 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("a"); //创建表 Table select = tabEnv.from("a").select("*"); //创建写入主题 tabEnv.connect( new Kafka() .version("universal")// 版本号 .property("bootstrap.servers","hadoop102:9092")//地址 .topic("first1")//消费主题 .sinkPartitionerRoundRobin()//随即分区 ? ) .withFormat(new Json())//写入的格式 .withSchema(schema) .createTemporaryTable("c"); ? //写入 select.executeInsert("c"); ? ? } } 【FlinkSql之TableAPI详解】
推荐阅读
- 原神机械之心任务的完成方法是什么
- 四 【单片机入门】应用层软件开发的单片机学习之路-----ESP32开发板PWM控制电机以及中断的使用
- 原神坎蒂丝命之座效果是什么
- flutter 系列之:flutter 中的幽灵offstage
- vulnhub靶场之ICA: 1
- 云原生之旅 - 6)不能错过的一款 Kubernetes 应用编排管理神器 Kustomize
- 原神智中之宝新计划任务是什么
- 上古四大凶兽之一的梼杌有什么来历它还有别的名字吗
- 之六 2流高手速成记:从SpringBoot到SpringCloudAlibaba
- 真正“搞”懂HTTP协议02之空间穿梭