I wrote and built the job locally, then packaged it and submitted it to the cluster to run.
Feel free to message me if you run into problems.
1. Create the project in IDEA
The pom configuration is below; besides Flink it also mixes in support for Scala and Spark.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tledu</groupId>
  <artifactId>llll</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
    <flink.version>1.10.2</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Flink dependencies: the Kafka connector is bundled into the fat jar,
         the core modules are provided by the cluster -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- end of Flink dependencies -->

    <!-- logging dependencies -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.6</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <!-- Flink Redis connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>

    <!-- Spark dependencies (not used by this Flink job) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.0.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.23</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <jUnitClasses>samples.AppTest</jUnitClasses>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <!-- builds the jar-with-dependencies that gets submitted to the cluster -->
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>
</project>
The code is as follows; here I compute the real-time total sales amount.
package com.com.tledu

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

import java.util.Properties

object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment (StreamExecutionEnvironment)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka consumer properties
    val properties = new Properties()
    // bootstrap.servers: the Kafka broker's IP:port
    properties.setProperty("bootstrap.servers", "127.0.0.1:26001")
    // consumer group
    properties.setProperty("group.id", "wzxgroup")

    val consumer = new FlinkKafkaConsumer[String](
      "wzx", new SimpleStringSchema(), properties
    )
    // register the Kafka source
    val dataStream = env.addSource(consumer)

    // Once we have the stream, we can apply Flink operators
    dataStream.print()
    // Keep the records that start with "O", take their fourth field and accumulate it
    val result = dataStream
      .filter(_.startsWith("O")) // keep only records starting with "O"
      .map(_.substring(2) // strip the leading "O:" from each record
        .split(",")) // split into individual fields
    // compute the total sales amount
    val priceResult = result
      .map(item => ("price", item(3).toInt)) // we only care about the amount, so map each record to ("price", amount)
      .keyBy(0) // key by the first element
      .sum(1) // sum the second element

    // Redis connection config; make sure the Redis IP and password are correct
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.3.89")
      .setPort(6379)
      .setPassword("123456")
      .build()

    val redisSink = new RedisSink[(String, Int)](config, new MyRedisMapper)
    priceResult.addSink(redisSink)

    env.execute()
  }
}
package com.com.tledu

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each ("price", total) tuple to a Redis SET command
class MyRedisMapper extends RedisMapper[(String, Int)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  // the tuple's first element ("price") becomes the Redis key
  override def getKeyFromData(t: (String, Int)): String = t._1

  // the running total is stored as the value
  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
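The job assumes every order record coming off Kafka is an "O"-prefixed, comma-separated line whose fourth field (index 3 after splitting) is the sale amount; the concrete layout O:orderId,userId,productId,amount used below is only my assumption for illustration. Before packaging, a minimal local sketch like this hypothetical FlinkKafkaLocalTest (same filter/map/keyBy/sum chain, but with hard-coded records instead of Kafka) can be used to sanity-check the parsing:

package com.com.tledu

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

// Hypothetical local smoke test: runs the same transformation chain on hard-coded records
object FlinkKafkaLocalTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed record layout: "O:orderId,userId,productId,amount"
    val dataStream = env.fromElements(
      "O:1001,u01,p01,30",
      "O:1002,u02,p02,70",
      "U:9999,u03" // non-order record, should be filtered out
    )

    val priceResult = dataStream
      .filter(_.startsWith("O"))      // keep only order records
      .map(_.substring(2).split(",")) // drop the "O:" prefix and split the fields
      .map(item => ("price", item(3).toInt))
      .keyBy(0)
      .sum(1)

    priceResult.print() // running totals: ("price",30), then ("price",100)

    env.execute("local smoke test")
  }
}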
Package the job and upload it to the cluster.
Running mvn package builds the fat jar (the assembly plugin above is bound to the package phase and produces a jar-with-dependencies under target/); I uploaded it into the bin directory of the flink folder.
You can find the Redis IP with ip addr; the password is set in redis.conf.
2. Run it on the cluster
Run the following command from Flink's bin directory:
flink run -m yarn-cluster -c com.com.tledu.FlinkKafka ./llll.jar
Here com.com.tledu.FlinkKafka is the class to run and ./llll.jar is the path to your jar (I put the jar in the bin directory).
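With the job running, nothing shows up in Redis until some order records land in the wzx topic. As a rough sketch (assuming the kafka-clients library is on the classpath, which the flink-connector-kafka dependency already pulls in transitively, and reusing the record format assumed above), a tiny test producer could look like this:

package com.com.tledu

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper: pushes a couple of test order records into the "wzx" topic
object SendTestOrders {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "127.0.0.1:26001") // same broker as the consumer
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    Seq("O:1001,u01,p01,30", "O:1002,u02,p02,70").foreach { record =>
      producer.send(new ProducerRecord[String, String]("wzx", record))
    }
    producer.flush()
    producer.close()
  }
}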
3. Check the data in Redis
Go into the redis folder.
Start the Redis server with:
./src/redis-server redis.conf
Then open a client:
src/redis-cli -a 123456
Once connected, use get <key> to read your data.
My key is price,
so get price shows the running total.
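If you would rather read the result from code than from redis-cli, a minimal sketch using the Jedis client (which the flink-connector-redis dependency typically brings in transitively; the host, port and password are the ones configured in the sink above) might look like this:

import redis.clients.jedis.Jedis

// Hypothetical reader: fetches the running total that the Flink job keeps under the "price" key
object ReadTotalSales {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.3.89", 6379) // same Redis host/port as the sink config
    jedis.auth("123456")                        // same password as in redis.conf
    val total = jedis.get("price")              // key written by MyRedisMapper
    println(s"current total sales: $total")
    jedis.close()
  }
}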