您的位置 首页 java

项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步

业务需求:

  • 1、商品上架时:search-service新增商品到elasticsearch
  • 2、商品下架时:search-service删除elasticsearch中的商品

需求分析:

数据同步是希望,当我们商品修改了数据库中的商品信息, 索引 库中的信息也会跟着改。在 微服务 中数据库和索引库是在两个不同的服务中。如果,商品的服务,向es的服务中发个消息,通知ES服务就可以实现数据的同步。此时我们利用 MQ 接收商品服务的消息,实现ES服务对消息的监听就可以了。

业务模型:

​​​​​​

业务实现:

商品微服务:消息发送

pom.xml

1.引入amqp依赖

 <!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>  

2.application.yml文件中配置MQ的地址(5大参数)

  rabbitmq :
    host: 124.223.41.137
    port: 5672
     virtual -host: /
    username: rabbitmq
    password: 123321  

3.ItemService(商品的service)

 @Service
public class ItemService  extends  ServiceImpl<ItemMapper, Item> implements IItemService {

    @Autowired
     private  RabbitTemplate rabbitTemplate;

    @Transactional
    @Override
    public  void  updateStatus(Long id,  Integer  status) {
        // update tb_item set status = ? where id = ?
        this.update().set("status", status).eq("id", id).update();

        // 根据上下架判断RoutingKey
         String  routingKey = status == 1 ? "item.up" : "item.down";
        // 发送消息
        rabbitTemplate.convertAndSend("item.topic", routingKey, id);
    }

   }  

ES微服务:消息接收

pom.xml

1.引入amqp依赖

 <!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>  

2.application.yml文件中配置MQ的地址(5大参数)

 rabbitmq:
    host: 124.223.41.137
    port: 5672
    virtual-host: /
    username: rabbitmq
    password: 123321  

3.新建ItemLister类监听消息

 @Component  //注册成一个 Bean 
public class ItemListener {


    @Autowired
    private ISearchService searchService;
//基于注解的方式现实 交换机 和队列的绑定

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "up.queue"),
            exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
            key = "item.up"
    ))

//新增业务
    public void listenItemUp(Long id){
        searchService.saveItemById(id);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "down.queue"),
            exchange = @Exchange(name = "item.topic", type = ExchangeTypes.TOPIC),
            key = "item.down"
    ))
//删除业务
    public void listenItemDown(Long id){
        searchService.deleteItemById(id);
    }

}  

4.实现ItemLister中新增和删除的Service层方法:

 //消息同步:上架——新增索引库数据
    void deleteItemById(Long id);


    //消息同步:下架——删除索引库数据
    void saveItemById(Long id);  

5.实现ItemLister中新增和删除的Service实现类方法:

 //删除方法
@Override
    public void deleteItemById(Long id) {
        try {
            // 1.准备 request 
            DeleteRequest request = new DeleteRequest("item", id.toString());
            // 2.发请求
             rest HighLevelClient.delete(request, RequestOptions.DEFAULT);
        } catch (IO Exception  e) {
            throw new RuntimeException(e);
        }
    }

//注入——实现远程调用(Item的服务中有商品的查询方法)
@Autowired
    private ItemClient itemClient;

//新增方法
    @Override
    public void saveItemById(Long id) {
        try {
            // 1.查询商品数据
            Item item = itemClient.queryItemById(id);
            // 2.准备Request
            IndexRequest request = new IndexRequest("item").id(id.toString());
            // 3.准备DSL
            request.source(MAPPER.writeValueAsString(new ItemDoc(item)), XContentType.JSON);
            // 4.发送请求
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }  

文章来源:智云一二三科技

文章标题:项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步

文章地址:https://www.zhihuclub.com/193427.shtml

关于作者: 智云科技

热门文章

网站地图