第一步,先写配置
@Configurationpublic class RabbitmqConfig { @Bean("batchQueueRabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //确认方式,manual为手动ack. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开发公司每次处理数据数量,开发公司提高并发量 //factory.setPrefetchCount(250); //开发公司设置线程数 //factory.setConcurrentConsumers(30); //开发公司最大线程数 //factory.setMaxConcurrentConsumers(50); /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */ factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(1); factory.setPrefetchCount(1); //factory.setDefaultRequeueRejected(true); //使用自定义线程池来启动消费者。 factory.setTaskExecutor(taskExecutor()); return factory; } @Bean("correctTaskExecutor") @Primary public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(100); // 设置最大线程数 executor.setMaxPoolSize(100); // 设置队列容量 executor.setQueueCapacity(0); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(300); // 设置默认线程名称 executor.setThreadNamePrefix("thread-file-queue"); // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; }}
主要看这两个配置
factory.setConcurrentConsumers(1);
factory.setPrefetchCount(1);
如果设置为1,在消费的时候就会开启多个线程来消费进行,意思就是一个线程只消费一条消息,这种适于消费时间处理长,处理的流程比较复杂,这种例如文件转换,需要时间
如果是大于1的,看具体设置的值,比如50,那每个线程就会消费50个消息,等到消息满了,才会开启其他线程来消费,这种适用于的情况,消费时间短,消费量很大,比如发短信
消费者配置
@RabbitListener(queues = "${xx.queue}", containerFactory = "batchQueueRabbitListenerContainerFactory") //指定上面配置的连接bean对象@RabbitHandlerpublic void handlexxxQueue(@Payload Media media, Message message, Channel channel) throws IOException, InterruptedException { //提交单条消息 channel.basicAck(deliveryTag, false);}