Reading and Writing Hudi with Spark

This is the more complete walkthrough:

I. Environment Preparation for Hudi

1. Install a distributed file system (HDFS) to store the Hudi data

    Hadoop 2.8.0
    Format it on first use: hdfs namenode -format
    ./hadoop-daemon.sh start namenode
    ./hadoop-daemon.sh start datanode
    Test: hdfs dfs -put README.md /datas/

2. Install a distributed compute engine (Spark) to operate on the Hudi table data, mainly CRUD

    Spark 3.0.3
    Configure conf/spark-env.sh
    Start: bin/spark-shell --master local[2]
    Test:
        Upload a file: hdfs dfs -put README.md /datas/
  scala> val datasRDD = sc.textFile("/datas/README.md")
  datasRDD: org.apache.spark.rdd.RDD[String] = /datas/README.md MapPartitionsRDD[1] at textFile at <console>:24

  scala> datasRDD.count
  res0: Long = 103

  scala> val dataFrame = spark.read.textFile("/datas/README.md")
  dataFrame: org.apache.spark.sql.Dataset[String] = [value: string]

  scala> dataFrame.printSchema
  root
   |-- value: string (nullable = true)

  scala> dataFrame.show(10,false)
  +------------------------------------------------------------------------------+
  |value |
  +------------------------------------------------------------------------------+
  |# Apache Spark |
  | |
  |Spark is a fast and general cluster computing system for Big Data. It provides|
  |high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
  |supports general computation graphs for data analysis. It also supports a |
  |rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
  |MLlib for machine learning, GraphX for graph processing, |
  |and Spark Streaming for stream processing. |
  | |
  |<http://spark.apache.org/> |
  +------------------------------------------------------------------------------+
  only showing top 10 rows

3. Scala 2.11.11

II. Writing Data to Hudi with Spark

To operate on Hudi table data from the spark-shell command line, the required dependency packages must be added when launching spark-shell.
Download the packages online (they are cached under /Users/FengZhen/.ivy2/cache; this can be slow):
./spark-shell --master local[2] --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.0.3 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
Or use local jars directly:
./spark-shell --master local[2] --jars /Users/FengZhen/Desktop/Hadoop/spark/spark-3.0.3-bin-hadoop2.7/jars/spark-avro_2.12-3.0.3.jar,/Users/FengZhen/Desktop/Hadoop/hudi/hudi-0.10.1/packaging/hudi-spark-bundle/target/hudi-spark3.1.2-bundle_2.12-0.10.1.jar --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
Save data to the Hudi table and load it back:
  import org.apache.hudi.QuickstartUtils._
  import scala.collection.JavaConversions._
  import org.apache.spark.sql.SaveMode._
  import org.apache.hudi.DataSourceReadOptions._
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._

  val tableName = "hudi_trips_cow"
  val basePath = "hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow"
  val dataGen = new DataGenerator

  // Generate sample trip records
  val inserts = convertToStringList(dataGen.generateInserts(10))
  inserts: java.util.List[String] = [{"ts": 1643936410360, "uuid": "54782a44-5262-4cb5-9018-4b176bfcebfe", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1643751609190, "uuid": "80d2c5af-cc88-4775-881c-ce00bc150710", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1643811519875, "uuid": "7cc20d76-37ef-4a8f-94bb-2318b38d261d", "rider": "rider-213", "driver"...

  // Read the JSON records as a DataFrame with two partitions
  val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))

  scala> df.printSchema
  root
   |-- begin_lat: double (nullable = true)
   |-- begin_lon: double (nullable = true)
   |-- driver: string (nullable = true)
   |-- end_lat: double (nullable = true)
   |-- end_lon: double (nullable = true)
   |-- fare: double (nullable = true)
   |-- partitionpath: string (nullable = true)
   |-- rider: string (nullable = true)
   |-- ts: long (nullable = true)
   |-- uuid: string (nullable = true)
View the data:
  scala> df.select("rider", "begin_lat", "begin_lon", "driver", "fare", "uuid", "ts").show(10,truncate=false)
  +---------+-------------------+-------------------+----------+------------------+------------------------------------+-------------+
  |rider |begin_lat |begin_lon |driver |fare |uuid |ts |
  +---------+-------------------+-------------------+----------+------------------+------------------------------------+-------------+
  |rider-213|0.4726905879569653 |0.46157858450465483|driver-213|34.158284716382845|54782a44-5262-4cb5-9018-4b176bfcebfe|1643936410360|
  |rider-213|0.6100070562136587 |0.8779402295427752 |driver-213|43.4923811219014 |80d2c5af-cc88-4775-881c-ce00bc150710|1643751609190|
  |rider-213|0.5731835407930634 |0.4923479652912024 |driver-213|64.27696295884016 |7cc20d76-37ef-4a8f-94bb-2318b38d261d|1643811519875|
  |rider-213|0.21624150367601136|0.14285051259466197|driver-213|93.56018115236618 |95c601e1-0d8a-4b4f-a441-9f7798bec7a2|1644303895565|
  |rider-213|0.40613510977307 |0.5644092139040959 |driver-213|17.851135255091155|8f2b5da1-4f26-447f-9bc5-bd4c9f795024|1643952973289|
  |rider-213|0.8742041526408587 |0.7528268153249502 |driver-213|19.179139106643607|427d34d4-0840-4e9f-b442-68bdbb1e62d3|1643744615179|
  |rider-213|0.1856488085068272 |0.9694586417848392 |driver-213|33.92216483948643 |bec76118-3f03-4e11-b7da-c1b9cda47832|1644200519234|
  |rider-213|0.0750588760043035 |0.03844104444445928|driver-213|66.62084366450246 |815de54a-f3bf-495c-aa22-2ec301f74610|1643946541239|
  |rider-213|0.651058505660742 |0.8192868687714224 |driver-213|41.06290929046368 |b05819f0-36c5-4d67-9771-af9c5a528104|1644149438138|
  |rider-213|0.11488393157088261|0.6273212202489661 |driver-213|27.79478688582596 |0995237e-9a58-4a9f-a58d-414f718087b1|1644250954765|
  +---------+-------------------+-------------------+----------+------------------+------------------------------------+-------------+
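Before writing, it can help to confirm how the generated records are spread across partition paths; a quick check in the same spark-shell session (the exact counts depend on the randomly generated sample):

  // How many of the generated records fall into each partition path
  df.groupBy("partitionpath").count().show(false)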
Insert the data:
df.write.mode(Overwrite).format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).save(basePath)
Breakdown of the write call:
  df.write
    .mode(Overwrite)                                      // overwrite any existing table data
    .format("hudi")                                       // write in Hudi format
    .options(getQuickstartWriteConfigs)
    .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
    .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
    .option(TABLE_NAME, tableName)
    .save(basePath)                                       // table base path on HDFS
getQuickstartWriteConfigs: sets the shuffle parallelism used when writing/updating data into Hudi
PRECOMBINE_FIELD_OPT_KEY: the field used to decide which record wins when records with the same key are merged (a timestamp here)
RECORDKEY_FIELD_OPT_KEY: the unique ID of each record; multiple fields can be combined
PARTITIONPATH_FIELD_OPT_KEY: the field used to build the partition path the data is stored under
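The constants above are just named wrappers around plain string configuration keys, so the same write can be expressed without importing them. A minimal sketch, assuming the standard Hudi datasource key names and the parallelism that getQuickstartWriteConfigs is expected to set (verify both against the Hudi version in use):

  // Equivalent write using raw Hudi option keys (assumed names; check your Hudi version)
  df.write.
    format("hudi").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.partitionpath.field", "partitionpath").
    option("hoodie.table.name", tableName).
    option("hoodie.insert.shuffle.parallelism", "2").   // roughly what getQuickstartWriteConfigs sets
    option("hoodie.upsert.shuffle.parallelism", "2").
    mode(Overwrite).
    save(basePath)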
  scala> df.write.mode(Overwrite).format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).save(basePath)
  warning: there was one deprecation warning; for details, enable `:setting -deprecation' or `:replay -deprecation'
  22/02/08 17:13:13 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
  22/02/08 17:13:13 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
  13:00 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow. Falling back to direct markers.
  13:02 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow. Falling back to direct markers.
  13:02 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow. Falling back to direct markers.
  13:03 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow. Falling back to direct markers.
  13:03 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/datas/hudi-warehouse/hudi_trips_cow. Falling back to direct markers.
Looking at the storage layout on HDFS, the Hudi table data is stored as Parquet (columnar) files under three levels of partition directories.
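To inspect that layout without leaving spark-shell, one option is to list the files under the table's base path with the Hadoop FileSystem API. A minimal sketch, assuming the same basePath and HDFS address used above:

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Resolve the file system from the table's base path (hdfs://localhost:9000/...)
  val tablePath = new Path(basePath)
  val fs = tablePath.getFileSystem(spark.sparkContext.hadoopConfiguration)

  // Recursively list all files; the three partition levels show up in each file's path,
  // alongside the .hoodie metadata directory
  val files = fs.listFiles(tablePath, true)
  while (files.hasNext) println(files.next().getPath)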
III. Reading Hudi Data with Spark
val tripsSnapsshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
Only the Hudi table's storage path needs to be given, using a glob pattern: because the Hudi table saved above is a partitioned table with three partition levels (comparable to a Hive table with three partition columns), the expression /*/*/*/* loads all of the data.
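As a side note, newer Hudi releases (0.9 and later, as far as I know) no longer require the partition glob and can load the base path directly; a hedged sketch, to be verified against the version in use:

  // Snapshot-load the whole table without the /*/*/*/* glob (newer Hudi versions)
  val tripsDF = spark.read.format("hudi").load(basePath)
  tripsDF.count()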
  scala> tripsSnapsshotDF.printSchema
  root
   |-- _hoodie_commit_time: string (nullable = true)
   |-- _hoodie_commit_seqno: string (nullable = true)
   |-- _hoodie_record_key: string (nullable = true)
   |-- _hoodie_partition_path: string (nullable = true)
   |-- _hoodie_file_name: string (nullable = true)
   |-- begin_lat: double (nullable = true)
   |-- begin_lon: double (nullable = true)
   |-- driver: string (nullable = true)
   |-- end_lat: double (nullable = true)
   |-- end_lon: double (nullable = true)
   |-- fare: double (nullable = true)
   |-- partitionpath: string (nullable = true)
   |-- rider: string (nullable = true)
   |-- ts: long (nullable = true)
   |-- uuid: string (nullable = true)
Field descriptions:
_hoodie_commit_time: the commit time of the record
_hoodie_commit_seqno: the commit sequence number of the record
_hoodie_record_key: the record key, corresponding to the uuid field of the data above
_hoodie_partition_path: the partition path under which the record is stored
_hoodie_file_name: the name of the file the record lives in
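Since the table was written in a single bulk insert, these metadata columns are easy to sanity-check; a small example on the DataFrame loaded above (one distinct commit time is expected here):

  // Every row records the commit that wrote it; a single write yields one distinct value
  tripsSnapsshotDF.select("_hoodie_commit_time").distinct().show(false)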
Register the Hudi table data as a temporary view and analyze it with SQL:
tripsSnapsshotDF.createOrReplaceTempView("hudi_trips_snapshot")
Find trips whose fare is greater than 20:
  scala> spark.sql("SELECT fare, begin_lon, begin_lat, ts FROM hudi_trips_snapshot WHERE fare > 20.0").show()
  +------------------+-------------------+-------------------+-------------+
  | fare| begin_lon| begin_lat| ts|
  +------------------+-------------------+-------------------+-------------+
  | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1644200519234|
  | 93.56018115236618|0.14285051259466197|0.21624150367601136|1644303895565|
  | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1643811519875|
  | 27.79478688582596| 0.6273212202489661|0.11488393157088261|1644250954765|
  | 43.4923811219014| 0.8779402295427752| 0.6100070562136587|1643751609190|
  | 66.62084366450246|0.03844104444445928| 0.0750588760043035|1643946541239|
  |34.158284716382845|0.46157858450465483| 0.4726905879569653|1643936410360|
  | 41.06290929046368| 0.8192868687714224| 0.651058505660742|1644149438138|
  +------------------+-------------------+-------------------+-------------+

  scala> spark.sql("SELECT _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path,_hoodie_file_name FROM hudi_trips_snapshot").show(false)
  +-------------------+----------------------+------------------------------------+------------------------------------+------------------------------------------------------------------------+
  |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key |_hoodie_partition_path |_hoodie_file_name |
  +-------------------+----------------------+------------------------------------+------------------------------------+------------------------------------------------------------------------+
  |20220208171313818 |20220208171313818_1_4 |bec76118-3f03-4e11-b7da-c1b9cda47832|americas/united_states/san_francisco|f42b8b67-c841-49e0-8514-682a26df08de-0_1-35-34_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_1_7 |95c601e1-0d8a-4b4f-a441-9f7798bec7a2|americas/united_states/san_francisco|f42b8b67-c841-49e0-8514-682a26df08de-0_1-35-34_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_1_8 |7cc20d76-37ef-4a8f-94bb-2318b38d261d|americas/united_states/san_francisco|f42b8b67-c841-49e0-8514-682a26df08de-0_1-35-34_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_1_9 |427d34d4-0840-4e9f-b442-68bdbb1e62d3|americas/united_states/san_francisco|f42b8b67-c841-49e0-8514-682a26df08de-0_1-35-34_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_1_10|0995237e-9a58-4a9f-a58d-414f718087b1|americas/united_states/san_francisco|f42b8b67-c841-49e0-8514-682a26df08de-0_1-35-34_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_0_1 |80d2c5af-cc88-4775-881c-ce00bc150710|americas/brazil/sao_paulo |31556e7e-4086-4be9-b3e4-951d32ec19de-0_0-29-33_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_0_2 |815de54a-f3bf-495c-aa22-2ec301f74610|americas/brazil/sao_paulo |31556e7e-4086-4be9-b3e4-951d32ec19de-0_0-29-33_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_0_3 |54782a44-5262-4cb5-9018-4b176bfcebfe|americas/brazil/sao_paulo |31556e7e-4086-4be9-b3e4-951d32ec19de-0_0-29-33_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_2_5 |b05819f0-36c5-4d67-9771-af9c5a528104|asia/india/chennai |66ff0450-85af-4653-9da4-e26c4fbbc438-0_2-35-35_20220208171313818.parquet|
  |20220208171313818 |20220208171313818_2_6 |8f2b5da1-4f26-447f-9bc5-bd4c9f795024|asia/india/chennai |66ff0450-85af-4653-9da4-e26c4fbbc438-0_2-35-35_20220208171313818.parquet|
  +-------------------+----------------------+------------------------------------+------------------------------------+------------------------------------------------------------------------+
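For reference, the same fare filter can also be written with the DataFrame API instead of SQL; a minimal sketch on the snapshot DataFrame:

  tripsSnapsshotDF.
    filter("fare > 20.0").
    select("fare", "begin_lon", "begin_lat", "ts").
    show(false)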
IV. Conclusion
How does Hudi manage data?
Data is organized into tables, and within each table the data is laid out much like a Hive partitioned table: records are placed into different directories according to the partition field, and every record carries a primary key (record key) that identifies it uniquely.
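That primary key is what makes updates possible: writing records with existing keys in Append mode upserts them rather than duplicating them. A hedged sketch using the same quickstart utilities as above (generateUpdates produces new values for keys that were already inserted in this session):

  // Generate updated values for previously inserted record keys and upsert them
  val updates = convertToStringList(dataGen.generateUpdates(10))
  val updateDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))

  updateDF.write.
    format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).   // Append with existing record keys results in an upsert
    save(basePath)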