在分布式系统设计中,消息中间件是实现系统解耦、异步通信、流量削峰和最终一致性的重要技术手段。Spring Boot 作为当前主流的 Java 微服务开发框架,提供了对多种消息中间件的良好支持。下面我们将从 JMS 标准(以 ActiveMQ 为例) 和 AMQP 标准(以 RabbitMQ 和 Kafka 为例) 两个方向,详细讲解它们与 Spring Boot 的整合方式。
一、JMS 标准与 ActiveMQ 整合(基于 ActiveMQ)
1. 什么是 JMS?
JMS(Java Message Service)是 Java 平台中关于面向消息中间件的 API,它提供标准的接口来与消息中间件进行交互。JMS 支持两种消息模型:
点对点(Queue):消息被一个消费者消费。
发布/订阅(Topic):消息被多个订阅者接收。
2. 使用 ActiveMQ 实现 JMS
ActiveMQ 是 Apache 提供的开源消息中间件,完全实现 JMS(Java Message Service)1.1 规范。它支持点对点(Queue)和发布/订阅(Topic)两种模式,适合 Java 生态系统内部的异步通信。
(1)添加依赖(pom.xml)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果使用 ActiveMQ 5.x 嵌入式 Broker,可选 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
(2)配置文件(application.yml)
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false=Queue模式,true=Topic模式
(3)配置队列(src/main/java/com/example/activemq/config/ActiveMQConfig.java)
package com.example.activemq.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
/**
* ActiveMQ 队列配置
*/
@Configuration
public class ActiveMQConfig {
@Bean
public Queue myQueue() {
return new ActiveMQQueue("activemq.queue");
}
}
(4)创建生产者(src/main/java/com/example/activemq/producer/ActiveMQProducer.java)
package com.example.activemq.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* ActiveMQ 消息生产者
*/
@Component
public class ActiveMQProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage(String message) {
jmsTemplate.convertAndSend("activemq.queue", message);
System.out.println("[ActiveMQ] 发送消息: " + message);
}
}
(5)创建消费者(src/main/java/com/example/activemq/consumer/ActiveMQConsumer.java)
package com.example.activemq.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* ActiveMQ 消息消费者
*/
@Component
public class ActiveMQConsumer {
@JmsListener(destination = "activemq.queue")
public void receiveMessage(String message) {
System.out.println("[ActiveMQ] 接收到: " + message);
}
}
(6)创建测试接口(src/main/java/com/example/activemq/web/ActiveMQController.java)
package com.example.activemq.web;
import com.example.activemq.producer.ActiveMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试 ActiveMQ 发送消息
*/
@RestController
public class ActiveMQController {
@Autowired
private ActiveMQProducer producer;
@GetMapping("/activemq/send")
public String send(@RequestParam String msg) {
producer.sendMessage(msg);
return "✅ ActiveMQ 消息已发送: " + msg;
}
}
(7)启动类(src/main/java/com/example/activemq/Application.java
)
package com.example.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
二、RabbitMQ(基于 AMQP 标准)
RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol) 的开源消息中间件,以灵活的路由机制著称,支持多种交换机类型(Direct、Fanout、Topic、Headers)。
(1)添加依赖(pom.xml)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置文件(application.yml)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
(3)配置交换机、队列、绑定(src/main/java/com/example/rabbitmq/config/RabbitConfig.java)
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置:定义交换机、队列、绑定
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE = "rabbit.queue";
public static final String EXCHANGE = "rabbit.exchange";
public static final String ROUTING_KEY = "rabbit.key";
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public DirectExchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
(4)创建生产者(src/main/java/com/example/rabbitmq/producer/RabbitProducer.java)
package com.example.rabbitmq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.example.rabbitmq.config.RabbitConfig.*;
/**
* RabbitMQ 消息生产者
*/
@Component
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);
System.out.println("[RabbitMQ] 发送消息: " + message);
}
}
(5)创建消费者(src/main/java/com/example/rabbitmq/consumer/RabbitConsumer.java)
package com.example.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static com.example.rabbitmq.config.RabbitConfig.QUEUE;
/**
* RabbitMQ 消息消费者
*/
@Component
public class RabbitConsumer {
@RabbitListener(queues = QUEUE)
public void receiveMessage(String message) {
System.out.println("[RabbitMQ] 接收到: " + message);
}
}
(6)创建测试接口(src/main/java/com/example/rabbitmq/web/RabbitController.java)
package com.example.rabbitmq.web;
import com.example.rabbitmq.producer.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitController {
@Autowired
private RabbitProducer producer;
@GetMapping("/rabbit/send")
public String send(@RequestParam String msg) {
producer.sendMessage(msg);
return "✅ RabbitMQ 消息已发送: " + msg;
}
}
(7)启动类(src/main/java/com/example/rabbitmq/Application.java)
package com.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
三、Kafka(分布式流平台)
Apache Kafka 是一个高吞吐、分布式、基于日志的消息系统,常用于大数据、日志收集、事件流处理。
(1)添加依赖(pom.xml)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
(2)配置文件(application.yml)
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: kafka-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
(3)创建生产者(src/main/java/com/example/kafka/producer/KafkaProducer.java)
package com.example.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* Kafka 消息生产者
*/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("kafka.topic", message);
System.out.println("[Kafka] 发送消息: " + message);
}
}
(4)创建消费者(src/main/java/com/example/kafka/consumer/KafkaConsumer.java
)
package com.example.kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* Kafka 消息消费者
*/
@Component
public class KafkaConsumer {
@KafkaListener(topics = "kafka.topic", groupId = "kafka-group")
public void consume(String message) {
System.out.println("[Kafka] 接收到: " + message);
}
}
(5)创建测试接口(src/main/java/com/example/kafka/web/KafkaController.java)
package com.example.kafka.web;
import com.example.kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer producer;
@GetMapping("/kafka/send")
public String send(@RequestParam String msg) {
producer.sendMessage(msg);
return "✅ Kafka 消息已发送: " + msg;
}
}
(6)启动类(src/main/java/com/example/kafka/Application.java
)
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
评论