Notes - bigdata - Using Scala and Flink to consume data from Kafka and store the aggregated results in Redis

Here, after finishing the code, I package the job and submit it to the cluster to run.

Feel free to message me if you have any questions.

1. Create the project in IDEA

The pom configuration is as follows; it also mixes in dependencies for Scala and Spark support.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tledu</groupId>
  <artifactId>llll</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
    <flink.version>1.10.2</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Flink dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- end of Flink dependencies -->

    <!-- Logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.6</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <!-- Flink Redis connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>

    <!-- Spark dependencies (not used by this job, kept as provided) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.0.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.23</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <jUnitClasses>samples.AppTest</jUnitClasses>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
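A note on building: with the assembly plugin bound to the package phase as above, running `mvn clean package` from the project root should produce both the regular jar and a jar-with-dependencies under target/ (by default named llll-1.0-SNAPSHOT-jar-with-dependencies.jar). The jar-with-dependencies is the one to upload and submit to the cluster.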

The code is as follows. In this example I compute the real-time total sales amount.

package com.com.tledu

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
// MyRedisMapper lives in the com.flink package (see the second file below)
import com.flink.MyRedisMapper

import java.util.Properties

object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment (StreamExecutionEnvironment) for consuming stream data
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    // bootstrap.servers: the Kafka broker's IP:port
    properties.setProperty("bootstrap.servers", "127.0.0.1:26001")
    // consumer group
    properties.setProperty("group.id", "wzxgroup")
    val consumer = new FlinkKafkaConsumer[String](
      "wzx", new SimpleStringSchema(), properties
    )

    // Register the Kafka source
    val dataStream = env.addSource(consumer)
    // Once we have the stream, we can apply Flink operators to it
    dataStream.print()

    // Keep only records starting with "O", then take the fourth field and sum it
    val result = dataStream
      .filter(_.startsWith("O"))   // keep only records starting with "O" (order records)
      .map(_.substring(2)          // strip the leading "O:" from each record
        .split(","))               // split the record into individual fields
    // Compute the total sales amount
    val priceResult = result
      .map(item => ("price", item(3).toInt)) // we only care about the amount, so map to (price, amount)
      .keyBy(0)                              // key by the first element of the tuple
      .sum(1)                                // sum the second element

    // Redis connection config - make sure the Redis IP and password are correct
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.3.89")
      .setPort(6379)
      .setPassword("123456")
      .build()
    val redisSink = new RedisSink[(String, Int)](config, new MyRedisMapper)
    priceResult.addSink(redisSink)

    env.execute()
  }
}

package com.flink

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each (key, total) tuple to a Redis SET command: the key is "price", the value is the running total
class MyRedisMapper extends RedisMapper[(String, Int)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
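For reference, here is a minimal, self-contained sketch of the record format this job assumes. The exact field layout (something like "O:orderId,date,itemId,amount") is my guess from the parsing code above, so adapt the field indices to whatever your Kafka producer actually sends:

// Standalone sketch of the filter/map/sum logic above, run locally on plain Scala collections.
// The sample messages are hypothetical; only records starting with "O" contribute to the total.
object ParsePreview {
  def main(args: Array[String]): Unit = {
    val sample = List(
      "O:1001,2022-03-01,item01,45", // an order record, amount = 45
      "U:u01,login",                 // a non-order record, filtered out
      "O:1002,2022-03-01,item02,30"  // another order, amount = 30
    )
    val total = sample
      .filter(_.startsWith("O"))       // keep only order records
      .map(_.substring(2).split(","))  // drop the "O:" prefix and split into fields
      .map(fields => fields(3).toInt)  // the fourth field is the sale amount
      .sum
    println(s"total sales in sample: $total") // prints 75
  }
}

Because MyRedisMapper uses RedisCommand.SET with the constant key "price", each new order record simply overwrites that key with the latest running total, so reading "price" in Redis always gives the current total sales amount.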

Package the jar and upload it to the cluster.

I uploaded it to the bin directory under the flink folder.

The Redis server's IP can be checked with `ip addr`; the password is set in redis.conf.
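(To be precise, the password is whatever `requirepass` is set to in redis.conf; if `requirepass` is commented out, Redis runs without a password and the setPassword call in the job's FlinkJedisPoolConfig should be removed.)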

2. Run on the cluster

Run the following command from Flink's bin directory:

flink run -m yarn-cluster -c com.com.tledu.FlinkKafka(the class to run) ./llll.jar(the path to your jar; mine is in the bin directory)
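A note on the arguments: the value after -c must be the fully qualified object name (package plus object name; com.com.tledu.FlinkKafka matches the package declaration above), and the jar path must point at the assembled jar-with-dependencies. If you kept Maven's default file name, it will be llll-1.0-SNAPSHOT-jar-with-dependencies.jar rather than llll.jar, so adjust the path (or rename the file) accordingly.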

3. Check the data in Redis

Go into the Redis directory.

Run the following command to start the Redis server:

./src/redis-server redis.conf

Then connect with the Redis CLI, passing the password with -a:

src/redis-cli -a 123456

Once inside, you can use `get <key>` to fetch your data.

My key name is price,

so running `get price` shows my data.
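For reference, `get price` should return the running total as a plain string (for example "12345"); the value keeps changing as new order records arrive on the wzx topic.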
