crm开发定制Flink Stream案例——使用Stream API 和Window API 对自定义数据流(循环生成1000条)进行处理并将数据存储于MySQL数据库中

目录

案例介绍

示例:
自定义Source,模拟1000crm开发定制条订单数据,crm开发定制每条数据间隔1秒钟(订单编号ID、用户编号ID、商品编号ID、消费金额、消费时间 )

要求:

  • crm开发定制随机生成订单编号ID(UUID)
  • crm开发定制随机生成用户编号ID(user1-user10)
  • crm开发定制随机生成商品编号ID(goods1-goods20)
  • crm开发定制随机生成消费金额(100-1000)
  • crm开发定制消费时间为当前系统时间

统计:

  1. 每30秒钟,crm开发定制统计一次各用户的最大消费订单信息,将结果写入MySQL;
  2. 统计30秒内,各用户的消费总额和订单数量,该数据每10秒更新一次,将结果打印输出;
  3. 统计30秒内,各商品的销售数量,该数据每10秒更新一次, 将结果打印输出。

开发步骤

  1. 创建订单样例类;

  2. 获取流处理环境;

  3. 创建自定义数据源;

        (1)循环1000次;    (2)随机构建订单信息;    (3)上下文收集数据;    (4)每隔一秒执行一次循环;
    • 1
    • 2
    • 3
    • 4
  4. 处理数据;

  5. 打印数据;

  6. 写入MySQL;

  7. 执行任务。


具体代码

定义样例类

 //样例类  case class  Order(itemId: String, userId: String, goodsId: String, price: Int, createTime: Long)
  • 1
  • 2
  • 3

和Source流程

//TODO:1.environmentval env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//TODO:2.source,生产随机订单数据val sourceDStream: DataStream[Order] = env.addSource(new OrderSourceFunction)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

自定义随机生成订单类OrderSourceFunction

  //自定义随机生成订单类  class OrderSourceFunction extends RichSourceFunction[Order] {    //定义变量    private var count: Long = 0L    private var isRunning: Boolean = true    override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {      // 使用while循环生成1000个订单(订单编号ID、用户编号ID、商品编号ID、消费金额、消费时间 )      while (isRunning && count < 1000) {        // 随机生成订单ID(UUID)        val itemId: String = UUID.randomUUID().toString        // 随机生成用户编号ID(user1-user10)        val userID: String = "user" + (Random.nextInt(10) + 1)        // 随机生成商品编号ID(goods1-goods20)        val goodsID: String = "goods" + (Random.nextInt(20) + 1)        // 随机生成消费金额(100~1000)        val price = Random.nextInt(900) + 101        // 消费时间为当前系统时间        val createTime: Long = System.currentTimeMillis()        //        println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(createTime)))        // 收集数据        sourceContext.collect(Order(itemId, userID, goodsID, price, createTime))        // 统计生成的订单数        count += 1        // 每隔1秒生成一个订单        TimeUnit.SECONDS.sleep(1)        //        TimeUnit.MILLISECONDS.sleep(1)      }    }    override def cancel(): Unit = {      isRunning = false    }  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

Transformation流程,实现案例的需求

//TODO:3.transformation,对数据进行窗口统计,并将有些数据写入MySQl中 (1)每30秒钟,统计一次各用户的最大消费订单信息,将结果写入MySQL;// Order(7485091d-341c-4498-b00b-4cfca479de79,user6,goods9,815,1634218271508)val resultDStream01: DataStream[Order] = sourceDStream  .assignAscendingTimestamps(_.createTime)  .keyBy(_.userId)  .window(TumblingEventTimeWindows.of(Time.seconds(30)))  .maxBy("price") (2)统计30秒内,各用户的消费总额和订单数量,该数据每10秒更新一次,将结果打印输出;// (user9,3440,4)val resultDStream02: DataStream[(String, Int, Int)] = sourceDStream  .assignAscendingTimestamps(_.createTime)  .map(Data => (Data.userId, Data.price, 1))  .keyBy(_._1)  .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(19)))  .reduce((preData, curData) => (preData._1, preData._2 + curData._2, preData._3 + curData._3)) (3)统计30秒内,各商品的销售数量,该数据每10秒更新一次, 将结果打印输出。// (goods10,2)val resultDStream03: DataStream[(String, Int)] = sourceDStream  .assignAscendingTimestamps(_.createTime)  .map(Data => (Data.goodsId, 1))  .keyBy(_._1)  .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))  .reduce((preData, curData) => (preData._1, preData._2 + curData._2))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

Sink与Execution流程

//TODO:4.sinkresultDStream01.print("用户的最大消费订单信息:")resultDStream01.addSink(new MySQLSinkFunction)resultDStream02.print("用户的消费总额和订单数量:")resultDStream03.print("商品的销售数量:")//    sourceDStream.writeAsCsv("src\\main\\resources\w_OrderSource.csv", WriteMode.OVERWRITE)//TODO:5.executionenv.execute("OrderStream Job")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

自定义MySQLSinkFunction类,将数据存储于MySQL中

  //自定义MySQLSinkFunction类,将数据存储于MySQL中  class MySQLSinkFunction extends RichSinkFunction[Order] {    var connection: Connection = null    var ps: PreparedStatement = null    override def open(parameters: Configuration): Unit = {      //加载驱动,打开连接      Class.forName("com.mysql.jdbc.Driver")      connection = DriverManager.getConnection(        "jdbc:mysql://localhost:3306/数据库名?useSSL=false&characterEncoding=utf8",        "用户名",        "密码")      ps = connection.prepareStatement("insert into 表名(itemId,userId,goodsId,price,createTime) values (?,?,?,?,?)")    }    override def invoke(value: Order, context: SinkFunction.Context): Unit = {      //元组的主键,取系统此时的时间戳      ps.setString(1, value.itemId)      ps.setString(2, value.userId)      ps.setString(3, value.goodsId)      ps.setInt(4, value.price)      ps.setLong(5, value.createTime)      //执行sql语句      ps.executeUpdate()    }    override def close(): Unit = {      if (connection != null)        connection.close()      if (ps != null)        ps.close()    }  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

实现结果

在控制台输出的结果:

存储于MySQL中的数据:
(数据表的结构)

表中数据

(查询产生的数据的数目)


本案例的完整代码

import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}import org.apache.flink.streaming.api.windowing.time.Timeimport java.util.UUIDimport java.util.concurrent.TimeUnitimport scala.util.Randomimport java.sql.{Connection, DriverManager, PreparedStatement}object OrderStream {  //样例类  case class  Order(itemId: String, userId: String, goodsId: String, price: Int, createTime: Long)  def main(args: Array[String]): Unit = {    //TODO:1.environment    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1)    //TODO:2.source,生产随机订单数据    val sourceDStream: DataStream[Order] = env.addSource(new OrderSourceFunction)    //TODO:3.transformation,对数据进行窗口统计,并将有些数据写入MySQl中     (1)每30秒钟,统计一次各用户的最大消费订单信息,将结果写入MySQL;    // Order(7485091d-341c-4498-b00b-4cfca479de79,user6,goods9,815,1634218271508)    val resultDStream01: DataStream[Order] = sourceDStream      .assignAscendingTimestamps(_.createTime)      .keyBy(_.userId)      .window(TumblingEventTimeWindows.of(Time.seconds(30)))      .maxBy("price")     (2)统计30秒内,各用户的消费总额和订单数量,该数据每10秒更新一次,将结果打印输出;    // (user9,3440,4)    val resultDStream02: DataStream[(String, Int, Int)] = sourceDStream      .assignAscendingTimestamps(_.createTime)      .map(Data => (Data.userId, Data.price, 1))      .keyBy(_._1)      .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(19)))      .reduce((preData, curData) => (preData._1, preData._2 + curData._2, preData._3 + curData._3))     (3)统计30秒内,各商品的销售数量,该数据每10秒更新一次, 将结果打印输出。    // (goods10,2)    val resultDStream03: DataStream[(String, Int)] = sourceDStream      .assignAscendingTimestamps(_.createTime)      .map(Data => (Data.goodsId, 1))      .keyBy(_._1)      .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))      .reduce((preData, curData) => (preData._1, preData._2 + curData._2))    //TODO:4.sink    resultDStream01.print("用户的最大消费订单信息:")    resultDStream01.addSink(new MySQLSinkFunction)    resultDStream02.print("用户的消费总额和订单数量:")    resultDStream03.print("商品的销售数量:")    //    sourceDStream.writeAsCsv("src\\main\\resources\w_OrderSource.csv", WriteMode.OVERWRITE)    //TODO:5.execution    env.execute("OrderStream Job")  }  //自定义随机生成订单类  class OrderSourceFunction extends RichSourceFunction[Order] {    //定义变量    private var count: Long = 0L    private var isRunning: Boolean = true    override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {      // 使用while循环生成1000个订单(订单编号ID、用户编号ID、商品编号ID、消费金额、消费时间 )      while (isRunning && count < 1000) {        // 随机生成订单ID(UUID)        val itemId: String = UUID.randomUUID().toString        // 随机生成用户编号ID(user1-user10)        val userID: String = "user" + (Random.nextInt(10) + 1)        // 随机生成商品编号ID(goods1-goods20)        val goodsID: String = "goods" + (Random.nextInt(20) + 1)        // 随机生成消费金额(100~1000)        val price = Random.nextInt(900) + 101        // 消费时间为当前系统时间        val createTime: Long = System.currentTimeMillis()        //        println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(createTime)))        // 收集数据        sourceContext.collect(Order(itemId, userID, goodsID, price, createTime))        // 统计生成的订单数        count += 1        // 每隔1秒生成一个订单        TimeUnit.SECONDS.sleep(1)        //        TimeUnit.MILLISECONDS.sleep(1)      }    }    override def cancel(): Unit = {      isRunning = false    }  }  //自定义MySQLSinkFunction类,将数据存储于MySQL中  class MySQLSinkFunction extends RichSinkFunction[Order] {    var connection: Connection = null    var ps: PreparedStatement = null    override def open(parameters: Configuration): Unit = {      //加载驱动,打开连接      Class.forName("com.mysql.jdbc.Driver")      connection = DriverManager.getConnection(        "jdbc:mysql://localhost:3306/数据库名?useSSL=false&characterEncoding=utf8",        "用户名",        "密码")      ps = connection.prepareStatement("insert into 表名(itemId,userId,goodsId,price,createTime) values (?,?,?,?,?)")    }    override def invoke(value: Order, context: SinkFunction.Context): Unit = {      //元组的主键,取系统此时的时间戳      ps.setString(1, value.itemId)      ps.setString(2, value.userId)      ps.setString(3, value.goodsId)      ps.setInt(4, value.price)      ps.setLong(5, value.createTime)      //执行sql语句      ps.executeUpdate()    }    override def close(): Unit = {      if (connection != null)        connection.close()      if (ps != null)        ps.close()    }  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145

总结:本案例使用 API 和·Window API 对自定义数据流(循环生成1000条)进行处理并将数据存储于MySQL数据中!

后续会继续更新有关Flink Stream及Flink SQL的内容!
(注:第12次发文,如有错误和疑问,欢迎在评论区指出!)
——2021.10.14

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发