Slowly Changing Dimensions: Implementing SCD-2 with Apache Hudi (Part 4)

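  1. We now have a DataFrame that holds the old and new data in a single record. Let's pull the active and inactive instances of the updated records into their own separate DataFrames.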


While doing so, we retire the inactive records by setting their eff_end_ts to the eff_start_ts of the active (new) record minus one second, and setting actv_ind = 0.

// Needed only if these were not already imported earlier in the session
import org.apache.spark.sql.functions.{col, expr, lit, to_timestamp}

// Prepare active updates: the new versions, open-ended and flagged active
val updActiveDf = updDf.select(
  col("stg_seller_id").as("seller_id"),
  col("stg_prod_category").as("prod_category"),
  col("stg_product_name").as("product_name"),
  col("stg_product_package").as("product_package"),
  col("stg_discount_percentage").as("discount_percentage"),
  col("stg_eff_start_ts").as("eff_start_ts"),
  to_timestamp(lit("9999-12-31 23:59:59")).as("eff_end_ts"),
  lit(1).as("actv_ind"))

updActiveDf.show(false)
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
|4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

// Prepare inactive updates: the old versions, which become obsolete records.
// Each one ends one second before its replacement starts.
val updInactiveDf = updDf.select(
  col("tgt_seller_id").as("seller_id"),
  col("tgt_prod_category").as("prod_category"),
  col("tgt_product_name").as("product_name"),
  col("tgt_product_package").as("product_package"),
  col("tgt_discount_percentage").as("discount_percentage"),
  col("tgt_eff_start_ts").as("eff_start_ts"),
  (col("stg_eff_start_ts") - expr("interval 1 seconds")).as("eff_end_ts"),
  lit(0).as("actv_ind"))

scala> updInactiveDf.show
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|   product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|     1234|    Detergent|        Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
|     4565|      Gourmet|Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
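The expiry rule used above (old eff_end_ts = new eff_start_ts minus one second, actv_ind = 0) can be checked without Spark. The following is only a minimal sketch in plain Scala: the `Scd2Expiry` object, the `Version` case class, and the helper names are hypothetical and are not part of the original pipeline. Only the timestamps and product name come from the sample data.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, for illustration only; not part of the article's pipeline.
object Scd2Expiry {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Simplified record holding just the fields the expiry rule touches.
  final case class Version(
      productName: String,
      effStartTs: LocalDateTime,
      effEndTs: LocalDateTime,
      actvInd: Int)

  def ts(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

  // Close the old version one second before the new version starts
  // and flag it as inactive.
  def expire(old: Version, newStart: LocalDateTime): Version =
    old.copy(effEndTs = newStart.minusSeconds(1), actvInd = 0)

  def show(v: Version): String =
    s"${v.productName} | ${v.effStartTs.format(fmt)} | ${v.effEndTs.format(fmt)} | ${v.actvInd}"

  def main(args: Array[String]): Unit = {
    val highDate = ts("9999-12-31 23:59:59")
    val tide2L   = Version("Tide 2L", ts("2021-12-15 15:20:30"), highDate, 1)
    // The replacement (Tide 5L) starts at 2022-01-31 10:00:30.
    val expired  = expire(tide2L, ts("2022-01-31 10:00:30"))
    println(show(expired))
    // prints: Tide 2L | 2021-12-15 15:20:30 | 2022-01-31 10:00:29 | 0
  }
}
```

The result matches the Tide 2L row of updInactiveDf. Subtracting one second keeps the validity ranges of consecutive versions from overlapping at second-level granularity.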
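  1. Next, we use the union operator to combine the inserts, active updates, and inactive updates into a single DataFrame. This DataFrame serves as the delta source for the final Hudi write logic.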
scala> val upsertDf = insDf.union(updActiveDf).union(updInactiveDf)
scala> upsertDf.show
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|     product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|     Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|Dairy Milk Almond|             12|                 45|2022-06-12 20:30:40|9999-12-31 23:59:59|       1|
|     1234|    Detergent|          Tide 5L|              6|                 25|2022-01-31 10:00:30|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|  Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
|     1234|    Detergent|          Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

// Needed only if these were not already imported earlier in the session
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode.Append

val path = "gs://target_bucket/hudi_product_catalog"

// Upsert the combined delta into the Hudi table.
// The record key includes eff_end_ts, and the table is partitioned by actv_ind.
upsertDf.write.format("org.apache.hudi")
  .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  .option(RECORDKEY_FIELD_OPT_KEY, "seller_id,prod_category,eff_end_ts")
  .option(PRECOMBINE_FIELD_OPT_KEY, "eff_start_ts")
  .option("hoodie.table.name", "hudi_product_catalog")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "target_schema")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_product_catalog")
  .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "actv_ind")
  .mode(Append)
  .save(path)

scala> spark.sql("refresh table target_schema.hudi_product_catalog")
scala> spark.sql("select * from target_schema.hudi_product_catalog").show(false)
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3412     |Healthcare    |Dolo 650         |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
|20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Home Essential|Hand Towel       |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Detergent     |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_4|seller_id:3345,prod_category:Stationary,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3345     |Stationary    |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_1_0|seller_id:4565,prod_category:Gourmet,eff_end_ts:1655065839000000         |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Silk  |6              |30                 |2021-06-12 20:30:40|2022-06-12 20:30:39|0       |
|20220722114049500  |20220722114049500_1_1|seller_id:1234,prod_category:Detergent,eff_end_ts:1643623229000000       |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|1234     |Detergent     |Tide 2L          |6              |15                 |2021-12-15 15:20:30|2022-01-31 10:00:29|0       |
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
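The _hoodie_record_key column in this output shows why eff_end_ts is part of the record key. The new active version reuses the key of the version it replaces (same seller_id, prod_category, and high-date eff_end_ts), so it overwrites that record. The expired version gets a different eff_end_ts, hence a new key, and lands in the actv_ind=0 partition as a new record. The sketch below imitates this in plain Scala under stated assumptions. `RecordKeySketch`, `epochMicros`, `recordKey`, and `upsert` are hypothetical names, the map-based `upsert` is a toy model rather than Hudi's implementation, and the UTC conversion is an assumption that happens to reproduce the key values printed above. It also assumes Tide 2L was previously stored as the active version under the high-date key.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Hypothetical illustration only; this does not call Hudi.
object RecordKeySketch {
  // Timestamp as epoch microseconds, assuming UTC.
  def epochMicros(ts: LocalDateTime): Long =
    ts.toEpochSecond(ZoneOffset.UTC) * 1000000L

  // Mimics the "field:value,field:value" layout seen in _hoodie_record_key.
  def recordKey(sellerId: Int, prodCategory: String, effEndTs: LocalDateTime): String =
    s"seller_id:$sellerId,prod_category:$prodCategory,eff_end_ts:${epochMicros(effEndTs)}"

  // Toy upsert keyed by (actv_ind partition, record key): an existing entry
  // is replaced, an unseen one is added.
  def upsert(
      table: Map[(Int, String), String],
      rows: Seq[(Int, String, String)]): Map[(Int, String), String] =
    rows.foldLeft(table) { case (t, (actvInd, key, productName)) =>
      t.updated((actvInd, key), productName)
    }

  def main(args: Array[String]): Unit = {
    val highDate   = LocalDateTime.of(9999, 12, 31, 23, 59, 59)
    val activeKey  = recordKey(1234, "Detergent", highDate)
    val expiredKey = recordKey(1234, "Detergent", LocalDateTime.of(2022, 1, 31, 10, 0, 29))

    // Assumed state before the upsert: Tide 2L is the active version.
    val before = Map((1, activeKey) -> "Tide 2L")
    val after = upsert(before, Seq(
      (1, activeKey, "Tide 5L"),   // same key: replaces the old active row
      (0, expiredKey, "Tide 2L"))) // new key: added as the expired row

    after.foreach { case ((p, k), name) => println(s"actv_ind=$p | $k | $name") }
  }
}
```

Under these assumptions the model ends with two entries for seller 1234 / Detergent: Tide 5L under the high-date key in actv_ind=1, and Tide 2L under the key ending in 1643623229000000 in actv_ind=0, which is what the query output above shows.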
