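In distributed system design, message middleware is a key technique for decoupling services, communicating asynchronously, smoothing traffic peaks, and achieving eventual consistency. Spring Boot, a mainstream Java framework for building microservices, integrates well with several message brokers. This article walks through integrating three of them with Spring Boot: ActiveMQ (JMS standard), RabbitMQ (AMQP standard), and Kafka, which uses its own protocol rather than AMQP.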

I. Integrating ActiveMQ (JMS standard)

1. What is JMS?

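JMS (Java Message Service) is the Java platform's API for message-oriented middleware. It defines standard interfaces for interacting with a message broker. JMS supports two messaging models:

  • Point-to-point (Queue): each message is consumed by exactly one consumer.

  • Publish/subscribe (Topic): each message is delivered to every subscriber.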
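
The difference between the two models can be sketched in plain Java without a broker. This is a simplified in-memory model, not JMS API code: the names DeliveryModels, sendToQueue, and publishToTopic are hypothetical, and picking queue consumers in round-robin order is an assumption made for illustration (a real broker applies its own dispatch policy).

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory sketch of the two JMS delivery models; not real JMS API code. */
class DeliveryModels {

    /** A consumer that simply records the messages it receives. */
    static class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();

        Consumer(String name) {
            this.name = name;
        }
    }

    private final List<Consumer> consumers = new ArrayList<>();
    private int next = 0; // next queue consumer; round-robin is an assumption

    void register(Consumer consumer) {
        consumers.add(consumer);
    }

    /** Point-to-point: exactly one registered consumer gets the message. */
    void sendToQueue(String message) {
        if (consumers.isEmpty()) {
            // A real queue would hold the message until a consumer connects.
            throw new IllegalStateException("no consumer registered");
        }
        Consumer target = consumers.get(next);
        next = (next + 1) % consumers.size();
        target.received.add(message);
    }

    /** Publish/subscribe: every registered subscriber gets its own copy. */
    void publishToTopic(String message) {
        for (Consumer subscriber : consumers) {
            subscriber.received.add(message);
        }
    }

    public static void main(String[] args) {
        DeliveryModels broker = new DeliveryModels();
        Consumer a = new Consumer("A");
        Consumer b = new Consumer("B");
        broker.register(a);
        broker.register(b);

        broker.sendToQueue("m1");       // goes to A only
        broker.sendToQueue("m2");       // goes to B only
        broker.publishToTopic("news");  // goes to both A and B

        System.out.println(a.name + " " + a.received); // A [m1, news]
        System.out.println(b.name + " " + b.received); // B [m2, news]
    }
}
```

Each queue message lands in one consumer's list, while the topic message appears in both, which is the behavioral difference the two bullet points describe.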

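2. Implementing JMS with ActiveMQ

ActiveMQ is an open-source message broker from Apache that fully implements the JMS (Java Message Service) 1.1 specification. It supports both the point-to-point (Queue) and publish/subscribe (Topic) models and is well suited to asynchronous communication within the Java ecosystem.

(1) Add the dependencies (pom.xml)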

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Optional: only needed when running an embedded ActiveMQ 5.x broker -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
</dependency>

(2) Configuration file (application.yml)

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: false  # false = Queue mode, true = Topic mode

(3) Configure the queue (src/main/java/com/example/activemq/config/ActiveMQConfig.java)

package com.example.activemq.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue; // on Spring Boot 3.x this becomes jakarta.jms.Queue

/**
 * ActiveMQ queue configuration
 */
@Configuration
public class ActiveMQConfig {

    @Bean
    public Queue myQueue() {
        return new ActiveMQQueue("activemq.queue");
    }
}

(4) Create the producer (src/main/java/com/example/activemq/producer/ActiveMQProducer.java)

package com.example.activemq.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ message producer
 */
@Component
public class ActiveMQProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(String message) {
        jmsTemplate.convertAndSend("activemq.queue", message);
        System.out.println("[ActiveMQ] Sent message: " + message);
    }
}

(5) Create the consumer (src/main/java/com/example/activemq/consumer/ActiveMQConsumer.java)

package com.example.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ message consumer
 */
@Component
public class ActiveMQConsumer {

    @JmsListener(destination = "activemq.queue")
    public void receiveMessage(String message) {
        System.out.println("[ActiveMQ] Received: " + message);
    }
}

(6) Create a test endpoint (src/main/java/com/example/activemq/web/ActiveMQController.java)

package com.example.activemq.web;

import com.example.activemq.producer.ActiveMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test endpoint for sending ActiveMQ messages
 */
@RestController
public class ActiveMQController {

    @Autowired
    private ActiveMQProducer producer;

    @GetMapping("/activemq/send")
    public String send(@RequestParam String msg) {
        producer.sendMessage(msg);
        return "✅ ActiveMQ message sent: " + msg;
    }
}

(7) Application entry class (src/main/java/com/example/activemq/Application.java)

package com.example.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

II. RabbitMQ (AMQP standard)

RabbitMQ is an open-source message broker based on AMQP (Advanced Message Queuing Protocol). It is known for its flexible routing and supports several exchange types (Direct, Fanout, Topic, Headers).
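
To make the exchange types concrete, here is a small plain-Java sketch of how a binding key is compared with a message's routing key for three of them. It is an illustrative model only, not RabbitMQ client code: the class and method names are hypothetical, and the Headers exchange (which matches on message headers rather than the routing key) is left out. For topic exchanges, `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
/** Sketch of routing-key matching for three exchange types; not RabbitMQ API code. */
class ExchangeRouting {

    /** Direct exchange: the binding key must equal the routing key. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Fanout exchange: the routing key is ignored, every bound queue matches. */
    static boolean fanoutMatches(String bindingKey, String routingKey) {
        return true;
    }

    /** Topic exchange: '*' matches exactly one word, '#' matches zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words, or absorbs one word and is tried again.
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(directMatches("rabbit.key", "rabbit.key"));     // true
        System.out.println(directMatches("rabbit.key", "rabbit.other"));   // false
        System.out.println(fanoutMatches("", "anything"));                 // true
        System.out.println(topicMatches("order.*", "order.created"));      // true
        System.out.println(topicMatches("order.*", "order.created.eu"));   // false
        System.out.println(topicMatches("order.#", "order.created.eu"));   // true
    }
}
```

A direct exchange therefore behaves like an exact-match lookup, which is why it is the simplest choice when each message should go to one known queue.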

(1) Add the dependency (pom.xml)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2) Configuration file (application.yml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

(3) Configure the exchange, queue, and binding (src/main/java/com/example/rabbitmq/config/RabbitConfig.java)

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration: declares the exchange, the queue, and the binding
 */
@Configuration
public class RabbitConfig {

    public static final String QUEUE = "rabbit.queue";
    public static final String EXCHANGE = "rabbit.exchange";
    public static final String ROUTING_KEY = "rabbit.key";

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE).build();
    }

    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

(4) Create the producer (src/main/java/com/example/rabbitmq/producer/RabbitProducer.java)

package com.example.rabbitmq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static com.example.rabbitmq.config.RabbitConfig.*;

/**
 * RabbitMQ message producer
 */
@Component
public class RabbitProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);
        System.out.println("[RabbitMQ] Sent message: " + message);
    }
}

(5) Create the consumer (src/main/java/com/example/rabbitmq/consumer/RabbitConsumer.java)

package com.example.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static com.example.rabbitmq.config.RabbitConfig.QUEUE;

/**
 * RabbitMQ message consumer
 */
@Component
public class RabbitConsumer {

    @RabbitListener(queues = QUEUE)
    public void receiveMessage(String message) {
        System.out.println("[RabbitMQ] Received: " + message);
    }
}

(6) Create a test endpoint (src/main/java/com/example/rabbitmq/web/RabbitController.java)

package com.example.rabbitmq.web;

import com.example.rabbitmq.producer.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitController {

    @Autowired
    private RabbitProducer producer;

    @GetMapping("/rabbit/send")
    public String send(@RequestParam String msg) {
        producer.sendMessage(msg);
        return "✅ RabbitMQ message sent: " + msg;
    }
}

(7) Application entry class (src/main/java/com/example/rabbitmq/Application.java)

package com.example.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

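III. Kafka (distributed streaming platform)

Apache Kafka is a high-throughput, distributed, log-based messaging system, commonly used for big data pipelines, log collection, and event stream processing.

(1) Add the dependency (pom.xml)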

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

(2) Configuration file (application.yml)

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: kafka-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
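
The `group-id` and `auto-offset-reset` settings are easier to reason about with a small model. The sketch below is a single-partition, in-memory simplification in plain Java, not Kafka client code: the names OffsetSketch, Reset, and poll are hypothetical, and committing the offset immediately after every poll is an assumption made to keep the example short. It shows that each consumer group tracks its own position in the log, and that the reset policy only matters when a group has no committed offset yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Single-partition, in-memory sketch of Kafka offsets; not Kafka client code. */
class OffsetSketch {

    enum Reset { EARLIEST, LATEST }

    private final List<String> log = new ArrayList<>();             // append-only record log
    private final Map<String, Integer> committed = new HashMap<>(); // group id -> next offset

    void append(String record) {
        log.add(record);
    }

    /** Returns the records the group has not read yet and commits the new position. */
    List<String> poll(String groupId, Reset policy) {
        Integer position = committed.get(groupId);
        if (position == null) {
            // No committed offset yet: the reset policy decides where to start.
            position = (policy == Reset.EARLIEST) ? 0 : log.size();
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        OffsetSketch topic = new OffsetSketch();
        topic.append("a");
        topic.append("b");

        // A new group with "earliest" reads the log from the beginning.
        System.out.println(topic.poll("kafka-group", Reset.EARLIEST)); // [a, b]
        // A new group with "latest" skips what was already in the log.
        System.out.println(topic.poll("other-group", Reset.LATEST));   // []

        topic.append("c");
        // Both groups now have a committed offset, so the policy no longer matters.
        System.out.println(topic.poll("kafka-group", Reset.EARLIEST)); // [c]
        System.out.println(topic.poll("other-group", Reset.LATEST));   // [c]
    }
}
```

With `earliest`, as configured above, a consumer group that starts for the first time also receives messages that were produced before it came online.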

(3) Create the producer (src/main/java/com/example/kafka/producer/KafkaProducer.java)

package com.example.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka message producer
 */
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("kafka.topic", message);
        System.out.println("[Kafka] Sent message: " + message);
    }
}

(4) Create the consumer (src/main/java/com/example/kafka/consumer/KafkaConsumer.java)

package com.example.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka message consumer
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "kafka.topic", groupId = "kafka-group")
    public void consume(String message) {
        System.out.println("[Kafka] Received: " + message);
    }
}

(5) Create a test endpoint (src/main/java/com/example/kafka/web/KafkaController.java)

package com.example.kafka.web;

import com.example.kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer producer;

    @GetMapping("/kafka/send")
    public String send(@RequestParam String msg) {
        producer.sendMessage(msg);
        return "✅ Kafka message sent: " + msg;
    }
}

(6) Application entry class (src/main/java/com/example/kafka/Application.java)

package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}