业务需求:
- 1、商品上架时:search-service新增商品到elasticsearch
- 2、商品下架时:search-service删除elasticsearch中的商品
需求分析:
数据同步是希望,当我们商品修改了数据库中的商品信息, 索引 库中的信息也会跟着改。在 微服务 中数据库和索引库是在两个不同的服务中。如果,商品的服务,向es的服务中发个消息,通知ES服务就可以实现数据的同步。此时我们利用 MQ 接收商品服务的消息,实现ES服务对消息的监听就可以了。
业务模型:
业务实现:
商品微服务:消息发送
pom.xml
1.引入amqp依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml文件中配置MQ的地址(5大参数)
rabbitmq :
host: 124.223.41.137
port: 5672
virtual -host: /
username: rabbitmq
password: 123321
3.ItemService(商品的service)
@Service
public class ItemService extends ServiceImpl<ItemMapper, Item> implements IItemService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
@Override
public void updateStatus(Long id, Integer status) {
// update tb_item set status = ? where id = ?
this.update().set("status", status).eq("id", id).update();
// 根据上下架判断RoutingKey
String routingKey = status == 1 ? "item.up" : "item.down";
// 发送消息
rabbitTemplate.convertAndSend("item.topic", routingKey, id);
}
}
ES微服务:消息接收
pom.xml
1.引入amqp依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml文件中配置MQ的地址(5大参数)
rabbitmq:
host: 124.223.41.137
port: 5672
virtual-host: /
username: rabbitmq
password: 123321
3.新建ItemLister类监听消息
@Component //注册成一个 Bean
public class ItemListener {
@Autowired
private ISearchService searchService;
//基于注解的方式现实 交换机 和队列的绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "up.queue"),
exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
key = "item.up"
))
//新增业务
public void listenItemUp(Long id){
searchService.saveItemById(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "down.queue"),
exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
key = "item.down"
))
//删除业务
public void listenItemDown(Long id){
searchService.deleteItemById(id);
}
}
4.实现ItemLister中新增和删除的Service层方法:
//消息同步:上架——新增索引库数据
void deleteItemById(Long id);
//消息同步:下架——删除索引库数据
void saveItemById(Long id);
5.实现ItemLister中新增和删除的Service实现类方法:
//删除方法
@Override
public void deleteItemById(Long id) {
try {
// 1.准备 request
DeleteRequest request = new DeleteRequest("item", id.toString());
// 2.发请求
rest HighLevelClient.delete(request, RequestOptions.DEFAULT);
} catch (IO Exception e) {
throw new RuntimeException(e);
}
}
//注入——实现远程调用(Item的服务中有商品的查询方法)
@Autowired
private ItemClient itemClient;
//新增方法
@Override
public void saveItemById(Long id) {
try {
// 1.查询商品数据
Item item = itemClient.queryItemById(id);
// 2.准备Request
IndexRequest request = new IndexRequest("item").id(id.toString());
// 3.准备DSL
request.source(MAPPER.writeValueAsString(new ItemDoc(item)), XContentType.JSON);
// 4.发送请求
restHighLevelClient.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}