开发公司rabbitMq设置多线程并设置线程池消费处理

第一步,先写配置

@Configurationpublic class RabbitmqConfig {    @Bean("batchQueueRabbitListenerContainerFactory")    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        factory.setMessageConverter(new Jackson2JsonMessageConverter());        //确认方式,manual为手动ack.        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);        //开发公司每次处理数据数量,开发公司提高并发量        //factory.setPrefetchCount(250);        //开发公司设置线程数        //factory.setConcurrentConsumers(30);        //开发公司最大线程数        //factory.setMaxConcurrentConsumers(50);        /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */        factory.setConnectionFactory(connectionFactory);        factory.setConcurrentConsumers(1);        factory.setPrefetchCount(1);        //factory.setDefaultRequeueRejected(true);        //使用自定义线程池来启动消费者。        factory.setTaskExecutor(taskExecutor());        return factory;    }    @Bean("correctTaskExecutor")    @Primary    public TaskExecutor taskExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        // 设置核心线程数        executor.setCorePoolSize(100);        // 设置最大线程数        executor.setMaxPoolSize(100);        // 设置队列容量        executor.setQueueCapacity(0);        // 设置线程活跃时间(秒)        executor.setKeepAliveSeconds(300);        // 设置默认线程名称        executor.setThreadNamePrefix("thread-file-queue");        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());        // 等待所有任务结束后再关闭线程池        executor.setWaitForTasksToCompleteOnShutdown(true);        return executor;    }}

主要看这两个配置 

factory.setConcurrentConsumers(1);

factory.setPrefetchCount(1); 

如果设置为1,在消费的时候就会开启多个线程来消费进行,意思就是一个线程只消费一条消息,这种适于消费时间处理长,处理的流程比较复杂,这种例如文件转换,需要时间

如果是大于1的,看具体设置的值,比如50,那每个线程就会消费50个消息,等到消息满了,才会开启其他线程来消费,这种适用于的情况,消费时间短,消费量很大,比如发短信

消费者配置 

@RabbitListener(queues = "${xx.queue}", containerFactory = "batchQueueRabbitListenerContainerFactory") //指定上面配置的连接bean对象@RabbitHandlerpublic void handlexxxQueue(@Payload Media media, Message message, Channel channel) throws IOException, InterruptedException {    //提交单条消息    channel.basicAck(deliveryTag, false);}
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发