Kafka Installation and Configuration

1. Download Kafka 2.13-3.1.0

The latest release is listed on the Apache Kafka downloads page.

Download ZooKeeper

The latest release is listed on the Apache ZooKeeper downloads page.

2. Single-node ZooKeeper installation

Kafka still depends on ZooKeeper; the Kafka project has stated that this dependency will be removed in a future release.

Extract the archive:

```shell
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper
```

Create the data and log directories under /opt/zookeeper:

```shell
mkdir /opt/zookeeper/zkData
mkdir /opt/zookeeper/zkLog
```
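The two calls can also be collapsed into one idempotent command with `mkdir -p`, which creates missing parents and succeeds even when the directories already exist. A small sketch, run here against a throwaway temporary directory rather than /opt:

```shell
# mkdir -p creates parent directories as needed and is safe to rerun;
# demonstrated against a temp dir instead of /opt
base=$(mktemp -d)
mkdir -p "$base/zkData" "$base/zkLog"
mkdir -p "$base/zkData" "$base/zkLog"   # rerunning is harmless
ls "$base"
```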

Copy the sample configuration and edit it:

```shell
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
```

Set the following entries in zoo.cfg:

```properties
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
```
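For context, a minimal zoo.cfg for this setup looks like the sketch below; tickTime, initLimit, syncLimit and clientPort are the stock defaults shipped in zoo_sample.cfg, and only the two directory entries are changed for this guide:

```properties
# basic time unit in milliseconds
tickTime=2000
# ticks a follower may take to connect and sync with the leader
initLimit=10
# ticks a follower may lag behind the leader before being dropped
syncLimit=5
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
# port that clients connect to
clientPort=2181
```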

Start and manage ZooKeeper:

```shell
cd /opt/zookeeper/bin/
# start zookeeper
./zkServer.sh start
# check that the process is running
jps
# check status
./zkServer.sh status
# stop zookeeper
./zkServer.sh stop
```

3. Install Kafka

Extract the archive to the target directory:

```shell
cd /home
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0
```

Edit config/server.properties:

```properties
listeners=PLAINTEXT://192.168.2.40:9092
# multiple ZooKeeper addresses can be comma-separated
#zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connect=192.168.2.40:2181
```
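One common pitfall worth noting: the address a client should use to reach the broker can differ from the address the broker binds to (for example behind NAT or Docker). Kafka hands clients the value of advertised.listeners in metadata responses; when unset it defaults to listeners. A sketch using this guide's example host:

```properties
# address the broker binds to
listeners=PLAINTEXT://192.168.2.40:9092
# address returned to clients in metadata responses;
# defaults to listeners when unset
advertised.listeners=PLAINTEXT://192.168.2.40:9092
```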

Start Kafka in the foreground (logs stream to the console):

```shell
bin/kafka-server-start.sh config/server.properties
```

To run it in the background instead (from the bin directory):

```shell
./kafka-server-start.sh -daemon ../config/server.properties
```

Check the process and stop the broker:

```shell
jps
./kafka-server-stop.sh
```

Log into the ZooKeeper client and inspect /brokers/ids to confirm the broker registered:

```shell
cd /opt/zookeeper/bin/
./zkCli.sh
# expected output:
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 1] quit
```

Common Kafka commands:

```shell
# create a topic named quickstart-events
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# describe the topic
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# write messages into the topic
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
# read messages from the topic
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
```
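The console producer above sends messages without keys, so they are spread across partitions; keyed messages instead always land on the partition derived from the key's hash, which is what preserves per-key ordering. Kafka's default partitioner uses murmur2; the sketch below only illustrates the hash-mod-partitions idea with `cksum`, not Kafka's actual algorithm:

```shell
# hash(key) % partitions decides the partition for a keyed message
# (illustration only: Kafka really uses murmur2, not CRC-32/cksum)
partitions=3
key="order-42"
hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
partition=$((hash % partitions))
echo "key=$key -> partition=$partition"
```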

Kafka cluster

Suppose there are two servers, 192.168.2.40 and 192.168.2.41. Install and configure Kafka on both exactly as above; the only per-server differences are the broker.id and listeners entries in config/server.properties.

On 192.168.2.40:

```properties
listeners=PLAINTEXT://192.168.2.40:9092
broker.id=0
```

On 192.168.2.41:

```properties
listeners=PLAINTEXT://192.168.2.41:9092
broker.id=1
```

On each broker, also add a JMX port export to the startup script in the bin directory:

```shell
vi kafka-server-start.sh
# add: export JMX_PORT="9999"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi
```
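The `[ "x$KAFKA_HEAP_OPTS" = "x" ]` test is a portable shell idiom for "this variable is empty or unset", so the defaults apply only when no heap options were supplied. A standalone sketch of the same guard logic:

```shell
# simulate KAFKA_HEAP_OPTS being unset, then apply the guarded defaults
KAFKA_HEAP_OPTS=""
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  JMX_PORT="9999"
fi
echo "heap=$KAFKA_HEAP_OPTS jmx=$JMX_PORT"
```

Note that placing the JMX_PORT export inside the `if` means JMX is only enabled when KAFKA_HEAP_OPTS was not set externally; exporting it on its own line outside the guard avoids that surprise.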

Log into the ZooKeeper client again and check /brokers/ids; both broker ids should now be listed.

4. Visualization tool: kafka-eagle (EFAK)

Download the release from the EFAK site, then extract it:

```shell
cd /home
tar -zxvf efak-web-2.1.0-bin.tar.gz
```

Add the KE_HOME environment variable in /etc/profile:

```shell
vi /etc/profile
# append to /etc/profile:
# KE_HOME points at the extracted efak directory
export KE_HOME=/home/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin
# reload the modified profile
. /etc/profile
```

Install MySQL and create a database named ke; kafka-eagle will use it later.
Then edit $KE_HOME/conf/system-config.properties: point the ZooKeeper settings at your cluster, comment out the sqlite entries, and fill in the MySQL connection. (Newer EFAK releases may rename these keys from the kafka.eagle. prefix to an efak. prefix; follow the key names in the bundled file.)

```properties
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
```
On startup, EFAK uses the connection above to create and initialize the ke database automatically. Occasionally this initialization fails; in that case you can create the schema by hand with the following script:

```sql
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

CREATE DATABASE /*!32312 IF NOT EXISTS*/`ke` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `ke`;

/* Table structure for table `ke_alarm_clusters` */
DROP TABLE IF EXISTS `ke_alarm_clusters`;
CREATE TABLE `ke_alarm_clusters` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `type` varchar(32) DEFAULT NULL,
  `cluster` varchar(64) DEFAULT NULL,
  `server` text,
  `alarm_group` varchar(128) DEFAULT NULL,
  `alarm_times` int DEFAULT NULL,
  `alarm_max_times` int DEFAULT NULL,
  `alarm_level` varchar(4) DEFAULT NULL,
  `is_normal` varchar(2) DEFAULT 'Y',
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_alarm_config` */
DROP TABLE IF EXISTS `ke_alarm_config`;
CREATE TABLE `ke_alarm_config` (
  `cluster` varchar(64) NOT NULL,
  `alarm_group` varchar(128) NOT NULL,
  `alarm_type` varchar(16) DEFAULT NULL,
  `alarm_url` text,
  `http_method` varchar(16) DEFAULT NULL,
  `alarm_address` text,
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`cluster`,`alarm_group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_alarm_consumer` */
DROP TABLE IF EXISTS `ke_alarm_consumer`;
CREATE TABLE `ke_alarm_consumer` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `group` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `lag` bigint DEFAULT NULL,
  `alarm_group` varchar(128) DEFAULT NULL,
  `alarm_times` int DEFAULT NULL,
  `alarm_max_times` int DEFAULT NULL,
  `alarm_level` varchar(4) DEFAULT NULL,
  `is_normal` varchar(2) DEFAULT 'Y',
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_alarm_crontab` */
DROP TABLE IF EXISTS `ke_alarm_crontab`;
CREATE TABLE `ke_alarm_crontab` (
  `id` bigint NOT NULL,
  `type` varchar(64) NOT NULL,
  `crontab` varchar(32) DEFAULT NULL,
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`,`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_connect_config` */
DROP TABLE IF EXISTS `ke_connect_config`;
CREATE TABLE `ke_connect_config` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `connect_uri` varchar(128) DEFAULT NULL,
  `version` varchar(32) DEFAULT NULL,
  `alive` varchar(16) DEFAULT NULL,
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_consumer_bscreen` */
DROP TABLE IF EXISTS `ke_consumer_bscreen`;
CREATE TABLE `ke_consumer_bscreen` (
  `cluster` varchar(64) DEFAULT NULL,
  `group` varchar(128) DEFAULT NULL,
  `topic` varchar(64) DEFAULT NULL,
  `logsize` bigint DEFAULT NULL,
  `difflogsize` bigint DEFAULT NULL,
  `offsets` bigint DEFAULT NULL,
  `diffoffsets` bigint DEFAULT NULL,
  `lag` bigint DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  KEY `idx_timespan` (`timespan`),
  KEY `idx_tm_cluster_diffoffsets` (`tm`,`cluster`,`diffoffsets`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_consumer_group` */
DROP TABLE IF EXISTS `ke_consumer_group`;
CREATE TABLE `ke_consumer_group` (
  `cluster` varchar(64) NOT NULL,
  `group` varchar(128) NOT NULL,
  `topic` varchar(128) NOT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`cluster`,`group`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_consumer_group_summary` */
DROP TABLE IF EXISTS `ke_consumer_group_summary`;
CREATE TABLE `ke_consumer_group_summary` (
  `cluster` varchar(64) NOT NULL,
  `group` varchar(128) NOT NULL,
  `topic_number` varchar(128) NOT NULL,
  `coordinator` varchar(128) DEFAULT NULL,
  `active_topic` int DEFAULT NULL,
  `active_thread_total` int DEFAULT NULL,
  PRIMARY KEY (`cluster`,`group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_logsize` */
DROP TABLE IF EXISTS `ke_logsize`;
CREATE TABLE `ke_logsize` (
  `cluster` varchar(64) DEFAULT NULL,
  `topic` varchar(64) DEFAULT NULL,
  `logsize` bigint DEFAULT NULL,
  `diffval` bigint DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  KEY `idx_timespan` (`timespan`),
  KEY `idx_tm_topic` (`tm`,`topic`),
  KEY `idx_tm_cluster_diffval` (`tm`,`cluster`,`diffval`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_metrics` */
DROP TABLE IF EXISTS `ke_metrics`;
CREATE TABLE `ke_metrics` (
  `cluster` varchar(64) DEFAULT NULL,
  `broker` text,
  `type` varchar(32) DEFAULT NULL,
  `key` varchar(64) DEFAULT NULL,
  `value` varchar(128) DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_metrics_offline` */
DROP TABLE IF EXISTS `ke_metrics_offline`;
CREATE TABLE `ke_metrics_offline` (
  `cluster` varchar(64) NOT NULL,
  `key` varchar(128) NOT NULL,
  `one` varchar(128) DEFAULT NULL,
  `mean` varchar(128) DEFAULT NULL,
  `five` varchar(128) DEFAULT NULL,
  `fifteen` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`cluster`,`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_p_role` */
DROP TABLE IF EXISTS `ke_p_role`;
CREATE TABLE `ke_p_role` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'role name',
  `seq` tinyint NOT NULL COMMENT 'rank',
  `description` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'role describe',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Data for the table `ke_p_role` */
insert into `ke_p_role`(`id`,`name`,`seq`,`description`) values
(1,'Administrator',1,'Have all permissions'),
(2,'Devs',2,'Own add or delete'),
(3,'Tourist',3,'Only viewer');

/* Table structure for table `ke_resources` */
DROP TABLE IF EXISTS `ke_resources`;
CREATE TABLE `ke_resources` (
  `resource_id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'resource name',
  `url` varchar(255) NOT NULL,
  `parent_id` int NOT NULL,
  PRIMARY KEY (`resource_id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Data for the table `ke_resources` */
insert into `ke_resources`(`resource_id`,`name`,`url`,`parent_id`) values
(1,'System','/system',-1),
(2,'User','/system/user',1),
(3,'Role','/system/role',1),
(4,'Resource','/system/resource',1),
(5,'Notice','/system/notice',1),
(6,'Topic','/topic',-1),
(7,'Message','/topic/message',6),
(8,'Create','/topic/create',6),
(9,'Alarm','/alarm',-1),
(10,'Add','/alarm/add',9),
(11,'Modify','/alarm/modify',9),
(12,'Cluster','/cluster',-1),
(13,'ZkCli','/cluster/zkcli',12),
(14,'UserDelete','/system/user/delete',1),
(15,'UserModify','/system/user/modify',1),
(16,'Mock','/topic/mock',6),
(18,'Create','/alarm/create',9),
(19,'History','/alarm/history',9),
(20,'Manager','/topic/manager',6),
(21,'PasswdReset','/system/user/reset',1),
(22,'Config','/alarm/config',9),
(23,'List','/alarm/list',9),
(24,'Hub','/topic/hub',6);

/* Table structure for table `ke_role_resource` */
DROP TABLE IF EXISTS `ke_role_resource`;
CREATE TABLE `ke_role_resource` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `role_id` int NOT NULL,
  `resource_id` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Data for the table `ke_role_resource` */
insert into `ke_role_resource`(`id`,`role_id`,`resource_id`) values
(1,1,1),
(2,1,2),
(3,1,3),
(4,1,4),
(5,1,5),
(6,1,7),
(7,1,8),
(8,1,10),
(9,1,11),
(10,1,13),
(11,2,7),
(12,2,8),
(13,2,13),
(14,2,10),
(15,2,11),
(16,1,14),
(17,1,15),
(18,1,16),
(19,1,18),
(20,1,19),
(21,1,20),
(22,1,21),
(23,1,22),
(24,1,23),
(25,1,24);

/* Table structure for table `ke_sql_history` */
DROP TABLE IF EXISTS `ke_sql_history`;
CREATE TABLE `ke_sql_history` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `username` varchar(64) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  `ksql` text,
  `status` varchar(16) DEFAULT NULL,
  `spend_time` bigint DEFAULT NULL,
  `created` varchar(32) DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_topic_rank` */
DROP TABLE IF EXISTS `ke_topic_rank`;
CREATE TABLE `ke_topic_rank` (
  `cluster` varchar(64) NOT NULL,
  `topic` varchar(64) NOT NULL,
  `tkey` varchar(64) NOT NULL,
  `tvalue` bigint DEFAULT NULL,
  PRIMARY KEY (`cluster`,`topic`,`tkey`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Table structure for table `ke_user_role` */
DROP TABLE IF EXISTS `ke_user_role`;
CREATE TABLE `ke_user_role` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_id` int NOT NULL,
  `role_id` tinyint NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Data for the table `ke_user_role` */
insert into `ke_user_role`(`id`,`user_id`,`role_id`) values
(1,1,1);

/* Table structure for table `ke_users` */
DROP TABLE IF EXISTS `ke_users`;
CREATE TABLE `ke_users` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `rtxno` int NOT NULL,
  `username` varchar(64) NOT NULL,
  `password` varchar(128) NOT NULL,
  `email` varchar(64) NOT NULL,
  `realname` varchar(128) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/* Data for the table `ke_users` */
insert into `ke_users`(`id`,`rtxno`,`username`,`password`,`email`,`realname`) values
(1,1000,'admin','123456','admin@email.com','Administrator');

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
```
Common ke.sh service commands:

```shell
# stop the service
bin/ke.sh stop
# restart the service
bin/ke.sh restart
# check whether the service is running
bin/ke.sh status
# show service statistics
bin/ke.sh stats
# follow the service log
tail -f $KE_HOME/logs/ke_console.out
# list listening ports with their PIDs
netstat -ntlp
# locate a process by PID
ps aux | grep <pid>
# inspect the process directory
ll /proc/<pid>
```

Once the script has run successfully, restart the service.
After startup, browse to http://192.168.2.40:8048/ and log in with the default credentials admin / 123456.

 

Watch the brokers and topics counts on the dashboard; if brokers shows 0, the connection to the cluster did not succeed.

A visualization tool naturally needs monitoring data. To enable kafka-eagle's monitoring of Kafka, modify Kafka's startup script to expose a JMX port:

```shell
vi kafka-server-start.sh
# add: export JMX_PORT="9999"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi
```

Kafka cluster view in kafka-eagle (screenshot omitted).

 

5. Integrating Kafka with Spring Boot

Add the dependency to pom.xml:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

Add the following under the spring node in application.yml:

```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.2.40:9092
    producer:
      retries: 3 # with a value > 0, the client retries records whose send failed
      batch-size: 16384 # 16 KB
      buffer-memory: 33554432 # 32 MB
      acks: 1
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: zhTestGroup # consumer group
      enable-auto-commit: false # disable auto-commit
      auto-offset-reset: earliest # resume from committed offsets when they exist; otherwise read from the beginning
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD           - commit after each record is handled by the listener
      # BATCH            - commit after each poll() batch is handled
      # TIME             - commit after a batch once more than TIME has elapsed since the last commit
      # COUNT            - commit after a batch once at least COUNT records have been handled
      # COUNT_TIME       - commit when either the TIME or the COUNT condition is met
      # MANUAL           - commit (batched) after Acknowledgment.acknowledge() is called
      # MANUAL_IMMEDIATE - commit immediately when acknowledge() is called; the most common choice
      ack-mode: manual_immediate
```

Producer:

```java
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // send a message
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("quickstart-events", normalMessage);
    }
}
```

Consumer:

```java
@Component
public class KafkaConsumer {

    // listen on the quickstart-events topic
    @KafkaListener(topics = {"quickstart-events"})
    public void onMessage1(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Acknowledgment ack) {
        // log which topic and partition the message came from, plus its payload
        System.out.println("consumed: " + record.topic() + "-" + record.partition() + "-" + record.value());
        // synchronous commit alternative:
        // consumer.commitSync();
        ack.acknowledge();
    }
}
```

You can test with Postman (or any HTTP client) by calling the producer endpoint and watching the consumer's console output.
