- 现在我们有了一个在同一条记录中同时包含新旧数据的 DataFrame,接下来把更新记录的活动(新)实例和非活动(旧)实例分别提取到各自独立的 DataFrame 中。
在此过程中,我们会把非活动(旧)记录的 eff_end_ts 改为活动(新)记录的 eff_start_ts 减 1 秒,并将其 actv_ind 更新为 0,从而使非活动记录失效。
// Prepare active updates: the incoming (stg_*) side of every changed record becomes
// the new current version. It is left open-ended (eff_end_ts = 9999-12-31 23:59:59)
// and flagged with actv_ind = 1.
val updActiveDf = updDf.select(
  col("stg_seller_id").as("seller_id"),
  col("stg_prod_category").as("prod_category"),
  col("stg_product_name").as("product_name"),
  col("stg_product_package").as("product_package"),
  col("stg_discount_percentage").as("discount_percentage"),
  col("stg_eff_start_ts").as("eff_start_ts"),
  to_timestamp(lit("9999-12-31 23:59:59")).as("eff_end_ts"),
  lit(1).as("actv_ind")
)

updActiveDf.show(false)
/* Output:
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
|4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
*/

// Prepare inactive updates, which will become obsolete records: the existing (tgt_*)
// side keeps its own eff_start_ts, is closed one second before the new version starts
// (stg_eff_start_ts - 1 second) and is flagged with actv_ind = 0.
val updInactiveDf = updDf.select(
  col("tgt_seller_id").as("seller_id"),
  col("tgt_prod_category").as("prod_category"),
  col("tgt_product_name").as("product_name"),
  col("tgt_product_package").as("product_package"),
  col("tgt_discount_percentage").as("discount_percentage"),
  col("tgt_eff_start_ts").as("eff_start_ts"),
  (col("stg_eff_start_ts") - expr("interval 1 seconds")).as("eff_end_ts"),
  lit(0).as("actv_ind")
)

updInactiveDf.show()
/* Output:
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|   product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|     1234|    Detergent|        Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
|     4565|      Gourmet|Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
*/
- 现在我们使用 union 运算符把插入记录、活动更新和非活动更新合并到同一个 DataFrame 中,并将该 DataFrame 作为最终 Hudi 写入逻辑的增量数据源。
// Combine inserts, active updates and inactive updates into one delta DataFrame.
// unionByName matches columns by name rather than by position, so a differing column
// order in one of the inputs cannot silently misalign the data.
val upsertDf = insDf.unionByName(updActiveDf).unionByName(updInactiveDf)

upsertDf.show()
/* Output:
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|     product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|     Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|Dairy Milk Almond|             12|                 45|2022-06-12 20:30:40|9999-12-31 23:59:59|       1|
|     1234|    Detergent|          Tide 5L|              6|                 25|2022-01-31 10:00:30|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|  Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
|     1234|    Detergent|          Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
*/

// Upsert the delta into the Hudi table.
// The record key is (seller_id, prod_category, eff_end_ts). A new active row therefore
// carries the same key as the currently open-ended row for that product (eff_end_ts =
// 9999-12-31 23:59:59), while the expired row gets its own key built from its closed
// eff_end_ts. Rows are partitioned by actv_ind (see _hoodie_record_key and
// _hoodie_partition_path in the query output below).
val path = "gs://target_bucket/hudi_product_catalog"

upsertDf.write
  .format("org.apache.hudi")
  .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  .option(RECORDKEY_FIELD_OPT_KEY, "seller_id,prod_category,eff_end_ts")
  .option(PRECOMBINE_FIELD_OPT_KEY, "eff_start_ts")
  .option("hoodie.table.name", "hudi_product_catalog")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "target_schema")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_product_catalog")
  .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "actv_ind")
  .mode(Append)
  .save(path)

// Refresh the table metadata and read the table back to verify the result.
// NOTE(review): the schema queried here (stg_wmt_ww_fin_rtn_mb_dl_secure) differs from
// the Hive database configured in the write options above ("target_schema"). Confirm
// which schema the table is actually registered under.
spark.sql("refresh table stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog")
spark.sql("select * from stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog").show(false)
/* Output (column padding was lost in the source; rows are reproduced as captured):
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key |_hoodie_partition_path|_hoodie_file_name |seller_id|prod_category |product_name |product_package|discount_percentage|eff_start_ts |eff_end_ts |actv_ind|
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|20220722113258101 |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000 |actv_ind=1 |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3412 |Healthcare |Dolo 650 |10 |10 |2022-04-01 16:30:45|9999-12-31 23:59:59|1 |
|20220722113258101 |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1 |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234 |Home Essential|Hand Towel |12 |20 |2021-10-20 06:55:22|9999-12-31 23:59:59|1 |
|20220722114049500 |20220722114049500_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000 |actv_ind=1 |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|4565 |Gourmet |Dairy Milk Almond|12 |45 |2022-06-12 20:30:40|9999-12-31 23:59:59|1 |
|20220722114049500 |20220722114049500_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000 |actv_ind=1 |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234 |Detergent |Tide 5L |6 |25 |2022-01-31 10:00:30|9999-12-31 23:59:59|1 |
|20220722114049500 |20220722114049500_0_4|seller_id:3345,prod_category:Stationary,eff_end_ts:253402300799000000 |actv_ind=1 |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3345 |Stationary |Sticky Notes |4 |12 |2022-07-09 21:30:45|9999-12-31 23:59:59|1 |
|20220722114049500 |20220722114049500_1_0|seller_id:4565,prod_category:Gourmet,eff_end_ts:1655065839000000 |actv_ind=0 |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|4565 |Gourmet |Dairy Milk Silk |6 |30 |2021-06-12 20:30:40|2022-06-12 20:30:39|0 |
|20220722114049500 |20220722114049500_1_1|seller_id:1234,prod_category:Detergent,eff_end_ts:1643623229000000 |actv_ind=0 |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|1234 |Detergent |Tide 2L |6 |15 |2021-12-15 15:20:30|2022-01-31 10:00:29|0 |
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
*/
推荐阅读
-
-
8朵玫瑰花语每朵代表什么 玫瑰花语每朵代表什么意思
-
-
奥迪a5是cvt变速箱吗? 奥迪A5变速箱是什么牌子
-
微信朋友圈评论怎么统一回复比较好 微信朋友圈评论怎么统一回复
-
-
-
-
-
-
-
-
毛豆可以和胡萝卜一起煮吗 毛豆不能和什么东西一起吃
-
-
-
-
-
南昌附近的油菜花基地 2023南昌县油菜花观赏地点推荐
-
国医大师朱良春养生经 每天一碗长寿粥 朱良春长寿粥
-