此篇文章仅作为个人笔记,由于初学难免有理解错误的地方,请大佬指正~
由于我这个部分没有使用SpringBoot,使用的是RabbitMq java Client API 所以在Springboot上的有些功能用不了,只能手动实现,故在思维上走了一个坑。
测试代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public static void main(String[] args) { //连接工厂 ConnectionFactory factory = new ConnectionFactory(); // factory.useNio(); // factory.setNioParams(new NioParams().setNbIoThreads(4)); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(port); factory.setUsername("username"); factory.setPassword("password"); factory.setVirtualHost("msgpush"); // f.setConnectionTimeout(5000); // f.setHandshakeTimeout(3000); ExecutorService service = Executors.newFixedThreadPool(30); factory.setSharedExecutor(service); // 设置自动恢复 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(2); factory.setTopologyRecoveryEnabled(true);// 设置不重新声明交换器,队列等信息。 System.out.println("检索请求的最大频道数:" + factory.getRequestedChannelMax()); // f.setRequestedChannelMax(10); try { // Channel channel = f.newConnection().createChannel(); //建立连接 Connection c = factory.newConnection("push Client"); c.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException e) { System.out.println( "断线了......"); } }); //建立信道 Channel ch = c.createChannel(); //声明队列,如果该队列已经创建过,则不会重复创建 //ch.queueDeclare("QueueWX", false, false, false, null); System.out.println("等待接收数据"); // 单条消息的大小限制,一般设为0或不设置,不限制大小 int prefecthSize = 0; // 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效 int prefecthCount = 5; // 表示是否应用于channel上,即是channel级别还是consumer级别 boolean global = true; ch.basicQos(3); //消费者取消时的回调对象1 CancelCallback cancelHandel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println(consumerTag + " 链接已断开"); } }; /*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX); ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING); ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG); ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } ...