本文共 8682 字,大约阅读时间需要 28 分钟。
消息队列在现代应用中扮演着重要角色,尤其是在处理高并发场景时,能够有效地解耦系统,提升整体性能。ActiveMQ 作为一个成熟的开源消息队列系统,常与 Spring Boot 应用进行集成。本文将从安装部署、开发实践到实际应用场景,详细介绍 ActiveMQ 与 Spring Boot 的整合方法。
# Link the ActiveMQ launcher into /etc/init.d so it can be managed as a service,
# then register that service with chkconfig.
ln -s /opt/activemq/bin/activemq /etc/init.d/activemq
chkconfig --add activemq
# Start the ActiveMQ service.
service activemq start
# Stop the ActiveMQ service.
service activemq stop
# List listening TCP/UDP sockets and keep only the lines that mention port 61616.
netstat -tuln | grep 61616
ActiveMQ 提供两个重要端口:61616(消息服务端口,客户端通过 TCP 连接 Broker)和 8161(Web 管理控制台端口)。
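<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>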
/**
 * Spring configuration that exposes the ActiveMQ helper beans.
 *
 * <p>Both the connection helper and the listener container factory are opt-in:
 * when {@code spring.activemq.broker-url} is not configured the helper bean is
 * {@code null}, and when {@code activemq.listener.enable} is anything other than
 * {@code true} the listener container factory bean is {@code null}.
 */
@Configuration
public class ActiveMQConfig {

    /** Default injected when {@code spring.activemq.broker-url} is absent. */
    private static final String DISABLED = "disabled";

    @Value("${spring.activemq.broker-url:disabled}")
    private String brokerURL;

    @Value("${activemq.listener.enable:disabled}")
    private String listenerEnable;

    /**
     * Creates the pooled-connection helper for the configured broker.
     *
     * @return an initialised {@link ActiveMQUtil}, or {@code null} when no broker URL is configured
     * @throws JMSException kept in the signature for existing callers
     */
    @Bean
    public ActiveMQUtil getActiveMQUtil() throws JMSException {
        // Constant-first comparison cannot throw even if the injected value is null.
        if (DISABLED.equals(brokerURL)) {
            return null;
        }
        ActiveMQUtil activeMQUtil = new ActiveMQUtil();
        activeMQUtil.init(brokerURL);
        return activeMQUtil;
    }

    /**
     * Builds the listener container factory referenced by name ({@code jmsQueueListener}).
     *
     * @param activeMQConnectionFactory connection factory the listener containers use
     * @return the configured factory, or {@code null} when listeners are not enabled
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
            ActiveMQConnectionFactory activeMQConnectionFactory) {
        // Check the switch first so no factory is allocated when listeners are disabled.
        if (!"true".equals(listenerEnable)) {
            return null;
        }
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        factory.setConcurrency("5");
        factory.setRecoveryInterval(5000L);
        // Non-transacted sessions using client acknowledge mode.
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

    /**
     * Creates the plain (non-pooled) connection factory for the configured broker URL.
     *
     * @return a connection factory pointing at {@code brokerURL}
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        return new ActiveMQConnectionFactory(brokerURL);
    }
}
/**
 * Holds a pooled ActiveMQ connection factory for one broker.
 *
 * <p>{@link #init(String)} must be called before {@link #getConnectionFactory()};
 * until then the getter returns {@code null}.
 */
public class ActiveMQUtil {

    private PooledConnectionFactory pooledConnectionFactory;

    /**
     * Creates the connection pool for the given broker and remembers it.
     *
     * @param brokerUrl broker address handed to {@code ActiveMQConnectionFactory}
     * @return the newly created pooled connection factory
     */
    public ConnectionFactory init(String brokerUrl) {
        PooledConnectionFactory pool =
                new PooledConnectionFactory(new ActiveMQConnectionFactory(brokerUrl));
        pool.setMaxConnections(5);
        pool.setExpiryTimeout(10000);
        pool.setReconnectOnException(true);
        this.pooledConnectionFactory = pool;
        return pool;
    }

    /**
     * @return the pooled connection factory created by {@link #init(String)},
     *         or {@code null} if {@code init} has not been called
     */
    public ConnectionFactory getConnectionFactory() {
        return this.pooledConnectionFactory;
    }
}
/**
 * Producer demo: sends one persistent text message to the {@code TEST1} queue
 * through a transacted session and commits it.
 *
 * @param args unused
 */
public static void main(String[] args) {
    ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
    Connection connection = null;
    try {
        connection = connect.createConnection();
        connection.start();
        // Transacted session: the send only takes effect once commit() is called.
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue testqueue = session.createQueue("TEST1");
        MessageProducer producer = session.createProducer(testqueue);
        TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("今天天气真好!");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(textMessage);
        session.commit();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Always release the connection, including when sending or committing failed.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) { ConnectionFactory connect = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.67.163:61616"); try { Connection connection = connect.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination testqueue = session.createQueue("TEST1"); MessageConsumer consumer = session.createConsumer(testqueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { try { String text = ((TextMessage) message).getText(); System.out.println(text); // session.rollback(); } catch (JMSException e) { e.printStackTrace(); } } } }); } catch (Exception e) { e.printStackTrace(); }}
通过 Session 的事务管理实现消息生产和消费的原子性。
// Use persistent delivery for every message sent through this producer.
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久化消息在 ActiveMQ 宕机时不会丢失,但会增加系统资源消耗。
支付模块通过消息队列通知订单系统,实现异步状态更新。
在 application.properties 中配置:
# Address of the ActiveMQ broker.
spring.activemq.broker-url=tcp://mq.server.com:61616
# Set to true to enable the JMS listener container factory.
activemq.listener.enable=true
/**
 * Publishes the outcome of a payment to {@code PAYMENT_RESULT_QUEUE} as a map message
 * with the keys {@code orderId} and {@code result}.
 *
 * <p>Sending is best-effort: a {@link JMSException} is printed and not propagated,
 * so callers are not told about delivery failures.
 *
 * @param orderId identifier of the order the payment belongs to
 * @param result  payment outcome string
 */
public void sendPaymentResult(String orderId, String result) {
    ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();
    Connection connection = null;
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        // Transacted session: the message only becomes visible after commit().
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");
        MapMessage mapMessage = new ActiveMQMapMessage();
        mapMessage.setString("orderId", orderId);
        mapMessage.setString("result", result);
        MessageProducer producer = session.createProducer(paymentResultQueue);
        producer.send(mapMessage);
        session.commit();
        producer.close();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Release the connection on every path; previously it leaked when an exception occurred.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
# Address of the ActiveMQ broker.
spring.activemq.broker-url=tcp://mq.server.com:61616
# Set to true to enable the JMS listener container factory.
activemq.listener.enable=true
/**
 * Handles a payment result taken from {@code PAYMENT_RESULT_QUEUE}.
 *
 * <p>A result other than {@code "success"} marks the order as {@code PAY_FAIL} and stops.
 * A successful payment marks the order as {@code PAID} and then forwards the order
 * via {@code orderService.sendOrderResult}.
 *
 * @param mapMessage message carrying the {@code orderId} and {@code result} entries
 * @throws JMSException if the message fields cannot be read
 */
@JmsListener(destination = "PAYMENT_RESULT_QUEUE", containerFactory = "jmsQueueListener")
public void consumePaymentResult(MapMessage mapMessage) throws JMSException {
    String orderId = mapMessage.getString("orderId");
    String result = mapMessage.getString("result");

    if (!"success".equals(result)) {
        // Failed payment: record the failure and do NOT forward the order. Forwarding would
        // also overwrite PAY_FAIL, because sendOrderResult sets the status to NOTIFIED_WARE.
        orderService.updateProcessStatus(orderId, ProcessStatus.PAY_FAIL);
        return;
    }

    orderService.updateProcessStatus(orderId, ProcessStatus.PAID);
    orderService.sendOrderResult(orderId);
}
/**
 * Sends the order, serialised as JSON, to {@code ORDER_RESULT_QUEUE} and marks the
 * order as {@code NOTIFIED_WARE}.
 *
 * <p>The status update runs between {@code send} and {@code commit}, so a failing update
 * prevents the JMS commit. A JMS failure is rethrown as an unchecked exception so the
 * surrounding {@code @Transactional} status update is rolled back rather than committed
 * for a message that was never delivered.
 *
 * @param orderId identifier of the order to forward
 * @throws IllegalStateException if the message cannot be sent or committed
 */
@Transactional
public void sendOrderResult(String orderId) {
    OrderInfo orderInfo = getOrderInfo(orderId);
    Map<String, Object> messageMap = initWareOrderMessage(orderInfo);
    String wareOrderJson = JSON.toJSONString(messageMap);

    Connection conn = null;
    try {
        // ActiveMQUtil only exposes the pooled factory, so the connection is obtained from it.
        conn = activeMQUtil.getConnectionFactory().createConnection();
        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("ORDER_RESULT_QUEUE");
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = new ActiveMQTextMessage();
        message.setText(wareOrderJson);
        producer.send(message);
        updateProcessStatus(orderInfo.getId(), ProcessStatus.NOTIFIED_WARE);
        session.commit();
        producer.close();
    } catch (JMSException e) {
        throw new IllegalStateException("Failed to send order result for order " + orderId, e);
    } finally {
        // Release the connection on every path, including failures.
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
public MapinitWareOrderMessage(OrderInfo orderInfo) { HashMap hashMap = new HashMap<>(); hashMap.put("orderId", orderInfo.getId()); hashMap.put("consignee", orderInfo.getConsignee()); hashMap.put("consigneeTel", orderInfo.getConsigneeTel()); hashMap.put("orderComment", orderInfo.getOrderComment()); hashMap.put("orderBody", orderInfo.getOrderSubject()); hashMap.put("deliveryAddress", orderInfo.getDeliveryAddress()); hashMap.put("paymentWay", "2"); // 1 货到付款 2 在线支付 hashMap.put("wareId", orderInfo.getWareId()); List > details = new ArrayList<>(); List orderDetailList = orderInfo.getOrderDetailList(); for (OrderDetail orderDetail : orderDetailList) { HashMap detailMap = new HashMap<>(); detailMap.put("skuId", orderDetail.getSkuId()); detailMap.put("skuNum", "" + orderDetail.getSkuNum()); detailMap.put("skuName", orderDetail.getSkuName()); details.add(detailMap); } hashMap.put("details", details); return hashMap; }
通过以上实践,可以看到消息队列在业务逻辑中的重要作用,能够有效地解耦系统组件,提升系统的可扩展性和可维护性。
转载地址:http://aagfk.baihongyu.com/