安装部署RocketMQ
下载介质:rocketmq-all-4.9.2-bin-release.zip,解压到目录。
系统环境变量配置(windows)
ROCKETMQ_HOME:设置为上一步解压得到的 RocketMQ 根目录
启动NameServer
cd %ROCKETMQ_HOME%\bin
start mqnamesrv.cmd
启动Broker
cd %ROCKETMQ_HOME%\bin
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Spring Boot 集成 RocketMQ
pom.xml
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq-spring-boot-starter-version>2.1.1</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件application.yml
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: rocketmq-group
启动类
package com.what21.rocketmq01.demo01;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the RocketMQ demo application.
 */
@SpringBootApplication
public class RocketMQ01Demo01Application {

    /**
     * Hands this class and the command-line arguments to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        Class<?> primarySource = RocketMQ01Demo01Application.class;
        SpringApplication.run(primarySource, args);
    }
}
消息包装类
package com.what21.rocketmq01.demo01.message;
import lombok.Data;
import java.io. Serializable ;
/**
 * Generic message envelope carrying an identifier and a typed payload.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 *
 * @param <T> type of the payload held in {@code content}
 */
@Data
public class Message<T> implements Serializable {

    /**
     * Explicit serialization version so the serialized form does not depend on a
     * compiler-computed default.
     */
    private static final long serialVersionUID = 1L;

    /** Message identifier. */
    private String id;

    /** Payload carried by this message. */
    private T content;
}
消费者端(@RocketMQMessageListener):
package com.what21.rocketmq01.demo01.consumer;
import com.what21.rocketmq01.demo01.message.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring. annotation .RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service("rocketMQConsumer")
public class RocketMQ consumer {
@Service("consumerOne")
@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")
public class ConsumerOne implements RocketMQListener<Message> {
@ Override
public void onMessage(Message message) {
log.info("consumer-one received message: {}", message);
}
}
@Service("consumerTwo")
@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")
public class ConsumerTwo implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("consumer-two received message: {}", message);
}
}
}
生产者端:
package com.what21.rocketmq01.demo01.producer;
import com.what21.rocketmq01.demo01.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Demo producer: when {@link #run(String...)} is invoked it sends one message to each of the
 * two demo topics through the injected {@code RocketMQTemplate}.
 */
@Component("rocketMQProducer")
public class RocketMQProducer implements CommandLineRunner {

    /** Template used to send messages; assigned once in the constructor. */
    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the producer.
     *
     * @param rocketMQTemplate template used for sending messages
     */
    @Autowired
    public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a wrapped {@code Message<String>} to {@code topic-queue-one} and a plain string to
     * {@code topic-queue-two}.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared for subclasses; nothing in this body throws a checked exception
     */
    @Override
    public void run(String... args) throws Exception {
        // Both topics receive the same text; only the wrapping differs.
        final String text = "Hello, SpringBoot RocketMQ !";

        Message<String> message = new Message<>();
        message.setId(UUID.randomUUID().toString());
        message.setContent(text);

        rocketMQTemplate.convertAndSend("topic-queue-one", message);
        rocketMQTemplate.convertAndSend("topic-queue-two", text);
    }
}
运行输出:
consumer-two received message: Hello, SpringBoot RocketMQ !
consumer-one received message: Message(id=7a8f9eee-d242-407f-8bd3-f622950c078d, content=Hello, SpringBoot RocketMQ !)