您的位置 首页 java

SpringBoot项目:RedisTemplate实现轻量级消息队列

背景

公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而 redis 的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

  • excel文件读写–阿里easyexcel sdk
  • 文件上传、下载–腾讯云对象存储
  • 远程服务调用–restTemplate
  • 生产者、消费者–redisTemplate leftPush和rightPop操作
  • 异步处理数据–Executors线程池
  • 读取网络文件流–HttpClient
  • 自定义注解实现用户身份认证–JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口

当然, Java 实现咯

涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习

二、项目目录结构

说明: 数据库DAO层放到另一个模块了, 不是本文重点

三、主要maven依赖

1.easyexcel

 <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>${easyexcel-latestVersion}</version>
        </dependency>2.JWT  

2.JWT

         <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.7.0</version>
        </dependency>  

3.redis

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>  

4.腾讯cos

       <dependency>
            <groupId>com.qcloud</groupId>
            <artifactId>cos_api</artifactId>
            <version>5.4.5</version>
        </dependency>  

四、流程

  • 用户上传文件
  • 将文件存储到腾讯cos
  • 将上传后的文件id及上传记录保存到数据库
  • redis生产一条导入消息, 即保存文件id到redis
  • 请求结束, 返回”处理中”状态
  • redis消费消息
  • 读取cos文件, 异步处理数据
  • 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为”处理完成”
  • 客户端轮询查询处理状态, 并可以下载错误文件
  • 结束

五、实现效果

1.上传文件

2.数据库导入记录

3.导入的数据

4.下载错误文件

5.错误数据提示

6.查询导入记录

六、代码实现

1、导入excel控制层

     @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam(" file ") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }  

2、service层

     @ Override 
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("文件不能为空");
        }

        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("当前仅支持xlsx格式的excel");
        }

        // 存储文件
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("文件上传失败, 请稍后重试");
        }

        // 保存记录到数据库
        saveRecordToDB(userId, fileId, filename);

        // 生产一条订单导入消息
        redisProducer.produce(RedisKey.orderImportKey, fileId);

        return JsonResponse.ok("导入成功, 处理中...");
    }

    /**
     * 校验文件格式
     * @param fileName
     * @return
     */    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }

        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
        if (".xlsx".equals(suffix)) {
            return true;
        }

        return false;
    }

   /**
     * 将文件存储到腾讯OSS
     * @param file
     * @return
     */    private String saveToOss(MultipartFile file) {
        InputStream ins = null;
        try {
            ins = file.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String fileId;
        try {
            String originalFilename = file.getOriginalFilename();
            File f = new File(originalFilename);
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String,  object > param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }

        return fileId;
    }  

3、redis生产者

 @Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }

}  

4、redis消费者

 @Service
public class RedisConsumer {

    @Autowired
    public RedisTemplate redisTemplate;

    @Value("${txOssFileUrl}")
    private String txOssFileUrl;

    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;

    @PostConstruct
    public void init() {
        processOrderImport();
    }

    /**
     * 处理订单导入
     */    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                if (null == object) {
                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }

}  

5、处理任务线程类

SpringBoot项目:RedisTemplate实现轻量级消息队列

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

SpringBoot项目:RedisTemplate实现轻量级消息队列

7、下载文件

8、读取网络文件流

9、ExcelUtil

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@LoginRequired注解

 /**
 * 在需要登录验证的Controller的方法上使用此注解
 */@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}  

2、MyControllerAdvice

3、AuthenticationInterceptor

4、JwtUtil

文章来源:智云一二三科技

文章标题:SpringBoot项目:RedisTemplate实现轻量级消息队列

文章地址:https://www.zhihuclub.com/193155.shtml

关于作者: 智云科技

热门文章

网站地图