博客
关于我
2024年最新java 编程技术异步通信_java异步消息通知机制,面试官问的那些大数据开发原理你都懂吗
阅读量:798 次
发布时间:2023-04-17

本文共 8682 字,大约阅读时间需要 28 分钟。

ActiveMQ 与 Spring Boot 整合实践指南

消息队列在现代应用中扮演着重要角色,尤其是在处理高并发场景时,能够有效地解耦系统,提升整体性能。ActiveMQ 作为一个成熟的开源消息队列系统,常与 Spring Boot 应用进行集成。本文将从安装部署、开发实践到实际应用场景,详细介绍 ActiveMQ 与 Spring Boot 的整合方法。


1. ActiveMQ 安装与启动

1.1 安装 ActiveMQ

ln -s /opt/activemq/bin/activemq /etc/init.d/activemqchkconfig --add activemq

1.2 启动服务

service activemq start

1.3 关闭服务

service activemq stop

1.4 通过 netstat 查看端口

netstat -tuln | grep 61616

ActiveMQ 提供两个重要端口:

  • 消息队列服务端口:61616
  • 控制台管理端口:8161

2. 在 Spring Boot 项目中整合 ActiveMQ

2.1 导入依赖

org.springframework.boot
spring-boot-starter-activemq
org.slf4j
slf4j-log4j12
org.apache.activemq
activemq-pool
5.15.2
org.slf4j
slf4j-log4j12

2.2 依赖配置

@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
private String brokerURL;
@Value("${activemq.listener.enable:disabled}")
private String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if (brokerURL.equals("disabled")) {
return null;
}
ActiveMQUtil activeMQUtil = new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if (!listenerEnable.equals("true")) {
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
factory.setConcurrency("5");
factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
return new ActiveMQConnectionFactory(brokerURL);
}
}

2.3 工具类实现

public class ActiveMQUtil {
private PooledConnectionFactory pooledConnectionFactory;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
pooledConnectionFactory = new PooledConnectionFactory(factory);
pooledConnectionFactory.setReconnectOnException(true);
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory() {
return pooledConnectionFactory;
}
}

3. 消息生产与消费示例

3.1 产生消息

public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue testqueue = session.createQueue("TEST1");
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("今天天气真好!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

3.2 消费消息

public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://192.168.67.163:61616");
try {
Connection connection = connect.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("TEST1");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
// session.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}

4. 事务控制与持久化

4.1 事务控制

通过 Session 的事务管理实现消息生产和消费的原子性。

4.2 持久化设置

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

持久化消息在 ActiveMQ 宕机时不会丢失,但会增加系统资源消耗。


5. 在支付业务模块中的应用

5.1 系统设计

支付模块通过消息队列通知订单系统,实现异步状态更新。

5.2 配置

application.properties 中配置:

spring.activemq.broker-url=tcp://mq.server.com:61616
activemq.listener.enable=true

5.3 代码实现

public void sendPaymentResult(String orderId, String result) {
ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue paymentResultQueue = session.createQueue("PAYMENT_RESULT_QUEUE");
MapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("orderId", orderId);
mapMessage.setString("result", result);
MessageProducer producer = session.createProducer(paymentResultQueue);
producer.send(mapMessage);
session.commit();
producer.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

6. 订单模块的消息消费

6.1 配置

spring.activemq.broker-url=tcp://mq.server.com:61616
activemq.listener.enable=true

6.2 消息消费逻辑

@JmsListener(destination = "PAYMENT_RESULT_QUEUE", containerFactory = "jmsQueueListener")
public void consumePaymentResult(MapMessage mapMessage) throws JMSException {
String orderId = mapMessage.getString("orderId");
String result = mapMessage.getString("result");
if (!"success".equals(result)) {
orderService.updateProcessStatus(orderId, ProcessStatus.PAY_FAIL);
} else {
orderService.updateProcessStatus(orderId, ProcessStatus.PAID);
}
orderService.sendOrderResult(orderId);
}

7. 减库存通知

7.1 消息发送逻辑

@Transactional
public void sendOrderResult(String orderId) {
OrderInfo orderInfo = getOrderInfo(orderId);
Map
messageMap = initWareOrderMessage(orderInfo);
String wareOrderJson = JSON.toJSONString(messageMap);
Session session = null;
try {
Connection conn = activeMQUtil.getConnection();
session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("ORDER_RESULT_QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage message = new ActiveMQTextMessage();
message.setText(wareOrderJson);
producer.send(message);
updateProcessStatus(orderInfo.getId(), ProcessStatus.NOTIFIED_WARE);
session.commit();
producer.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

7.2 消息数据组织

public Map
initWareOrderMessage(OrderInfo orderInfo) {
HashMap
hashMap = new HashMap<>();
hashMap.put("orderId", orderInfo.getId());
hashMap.put("consignee", orderInfo.getConsignee());
hashMap.put("consigneeTel", orderInfo.getConsigneeTel());
hashMap.put("orderComment", orderInfo.getOrderComment());
hashMap.put("orderBody", orderInfo.getOrderSubject());
hashMap.put("deliveryAddress", orderInfo.getDeliveryAddress());
hashMap.put("paymentWay", "2"); // 1 货到付款 2 在线支付
hashMap.put("wareId", orderInfo.getWareId());
List
> details = new ArrayList<>();
List
orderDetailList = orderInfo.getOrderDetailList();
for (OrderDetail orderDetail : orderDetailList) {
HashMap
detailMap = new HashMap<>(); detailMap.put("skuId", orderDetail.getSkuId()); detailMap.put("skuNum", "" + orderDetail.getSkuNum()); detailMap.put("skuName", orderDetail.getSkuName()); details.add(detailMap); } hashMap.put("details", details); return hashMap; }

通过以上实践,可以看到消息队列在业务逻辑中的重要作用,能够有效地解耦系统组件,提升系统的可扩展性和可维护性。

转载地址:http://aagfk.baihongyu.com/

你可能感兴趣的文章
Mac OS 12.0.1 如何安装柯美287打印机驱动,刷卡打印
查看>>
MangoDB4.0版本的安装与配置
查看>>
Manjaro 24.1 “Xahea” 发布!具有 KDE Plasma 6.1.5、GNOME 46 和最新的内核增强功能
查看>>
mapping文件目录生成修改
查看>>
MapReduce程序依赖的jar包
查看>>
mariadb multi-source replication(mariadb多主复制)
查看>>
MariaDB的简单使用
查看>>
MaterialForm对tab页进行隐藏
查看>>
Member var and Static var.
查看>>
memcached高速缓存学习笔记001---memcached介绍和安装以及基本使用
查看>>
memcached高速缓存学习笔记003---利用JAVA程序操作memcached crud操作
查看>>
Memcached:Node.js 高性能缓存解决方案
查看>>
memcache、redis原理对比
查看>>
memset初始化高维数组为-1/0
查看>>
Metasploit CGI网关接口渗透测试实战
查看>>
Metasploit Web服务器渗透测试实战
查看>>
MFC模态对话框和非模态对话框
查看>>
Moment.js常见用法总结
查看>>
MongoDB出现Error parsing command line: unrecognised option ‘--fork‘ 的解决方法
查看>>
mxGraph改变图形大小重置overlay位置
查看>>