文章插图
准备一个 data2 文件
cp data1 data2 && vi data2
data2 的数据更新为{'uid':1,'uname':'grey1','dt':'2022/11'}{'uid':2,'uname':'tony1','dt':'2022/12'}
然后执行hdfs dfs -put data2 /mydata/
更新数据的代码,我们可以做如下调整,完整代码如下package git.snippet.testimport git.snippet.entity.MyEntityimport git.snippet.util.JsonUtilimport org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}import org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}object DataUpdate {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val sparkConf = new SparkConf().setAppName("MyFirstDataApp").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()val ssc = sparkSession.sparkContextssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")updateData(sparkSession)}def updateData(sparkSession: SparkSession) = {import org.apache.spark.sql.functions._import sparkSession.implicits._val commitTime = System.currentTimeMillis().toString //生成提交时间val df = sparkSession.read.text("/mydata/data2").mapPartitions(partitions => {partitions.map(item => {val jsonObject = JsonUtil.getJsonData(item.getString(0))MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))})})val result = df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列.withColumn("uuid", col("uid")) //添加uuid 列.withColumn("hudipart", col("dt")) //增加hudi分区列result.write.format("org.apache.hudi")//.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).option("hoodie.insert.shuffle.parallelism", 2).option("hoodie.upsert.shuffle.parallelism", 2).option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列.option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列.option("hoodie.table.name", "myDataTable").option("hoodie.datasource.write.partitionpath.field", "hudipart") //分区列.mode(SaveMode.Append).save("/snippet/data/hudi")}}
执行更新数据的代码 。验证一下,访问:http://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022
可以查看到更新的数据情况
文章插图
数据查询的代码也很简单,完整代码如下
package git.snippet.testimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionobject DataQuery {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val sparkConf = new SparkConf().setAppName("MyFirstDataApp").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()val ssc = sparkSession.sparkContextssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")queryData(sparkSession)}def queryData(sparkSession: SparkSession) = {val df = sparkSession.read.format("org.apache.hudi").load("/snippet/data/hudi/*/*")df.show()println(df.count())}}
执行 , 输出以下信息,验证成功 。文章插图
数据查询也支持很多查询条件 , 比如增量查询,按时间段查询等 。
接下来是 flink 实时数据分析的服务,首先需要在 master 上启动 kafka , 并创建 一个名字为 mytopic 的 topic,详见Linux 下搭建 Kafka 环境
相关命令如下
创建topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topicmytopic
生产者启动配置kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic
消费者启动配置kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic
然后运行如下代码package git.snippet.analyzer;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class DataAnalyzer {public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.100.130:9092");properties.setProperty("group.id", "snippet");//构建FlinkKafkaConsumerFlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);//指定偏移量myConsumer.setStartFromLatest();final DataStream<String> stream = env.addSource(myConsumer);env.enableCheckpointing(5000);stream.print();try {env.execute("DataAnalyzer");} catch (Exception e) {e.printStackTrace();}}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 重生细胞怎么获得分叉钥匙
- 我的世界经验9999999的指令是什么
- 国字繁体多少笔画-国字繁体字怎么写的
- ipad pro2021 11寸和12.9寸的区别_哪个好
- 真正的解决英雄联盟fps值低的问题(5600g玩英雄联盟fps低)
- 消除两个inline-block元素之间的间隔
- MISC 网刃杯2022
- 离谱的汉字消除成语简单2怎么过关
- 离谱的汉字消除成语困难3怎么过
- 原神赤沙石板特殊的权能任务怎么解密