您的位置 首页 java

使用Java客户端发送消息和消费的应用

实验简介

本教程将Demo演示使用java客户端发送消息和消费的应用场景

实验实操

第1节 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。本教程将简单演示如何使用纯java client发送和消费消息。

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

再执行命令, 可以看到正常生产和消费输出

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime  

3. Demo代码说明

Demo代码可以查看 github 。并发消息,意思是生产者可以并发地向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者demo代码如下:

最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?

第2节 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?分区有序表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本教程将简单演示如何使用纯java client发送和消费顺序消息。

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

再执行命令, 可以看到正常生产和消费输出。 消费输出注意看相同queue id的消息输出内容中的数字,按照从小到大就是正确的。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime  

3. Demo代码说明

Demo代码可以查看 github 。

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序地发送。

  • 消费者说明

消费者使用起来相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样就保证消费的顺序性

第3节 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。本教程将简单演示如何使用纯java client发送和消费延迟消息。

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

执行命令, 可以看到正常生产和消费输出。 目前 RocketMQ支持多种延迟级别 , 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime  

3. Demo代码说明

Demo代码可以查看 github 。

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别, RocketMQ支持多种延迟级别 。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。 RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可

第4节 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。一个事物消息大致的生命周期如下图

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本教程将简单演示如何使用纯java client发送和消费事物消息。

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

执行命令, 可以看到事物消息的全部过程。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime  

3. Demo代码说明

Demo代码可以查看 github 。在实物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

  1. 初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产种类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

  1. 发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。

第5节 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景,本教程将简单演示如何使用该模式。

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

执行命令, 可以看到正常生产和消费输出。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime  

通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。

个人觉得:需要同步调用就用RPC, 不要走MQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

3. Demo代码说明

Demo代码可以查看 github 。

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?

第6节 如何有选择性的消费消息

有时候我们只想消费部分消息, 当然全部消费,在代码中过滤。 假如消息海量时, 会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费。

  • 进入broker目录
 cd /usr/local/services/5-rocketmq/broker-01  
  • 编辑配置文件,修改broker配置项2个
 vim conf/broker.conf  

配置项值:

 // 是否支持重试消息也过滤
filterSupportRetry=true

// 支持属性过滤
enablePropertyFilter=true  

修改后:

  • 重启broker
 ./restart.sh  

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行tag过滤代码demo

执行命令, 可以看到正常生产和消费输出。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime  

3. 执行sql过滤代码demo

执行命令, 可以看到正常生产和消费输出。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime  

4. Demo代码说明

Demo代码可以查看 github 。以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL92过滤时,添加上SQL过滤订阅。至于SQL92除了等号,还是支持什么,大家可以自行自行查看或者到群里问。

第7节 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多文档参见github wiki:

0. 启动一个集群

  • 进入broker目录
 cd /usr/local/services/5-rocketmq/broker-01  
  • 编辑配置文件,修改broker配置项1个
 vim conf/broker.conf  

配置项值:

 aclEnable=true  

修改后:

  • 重启broker
 ./restart.sh  

1. 下载java代码demo(已下载则忽略操作)

 cd /data/demos

git clone   

2. 打包,执行代码demo

执行命令, 可以看到正常生产和消费输出。 demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。

 // 进入demo代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime  

3. Demo代码说明

Demo代码可以查看 github 。带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

 static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}  

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

文章来源:智云一二三科技

文章标题:使用Java客户端发送消息和消费的应用

文章地址:https://www.zhihuclub.com/174637.shtml

关于作者: 智云科技

热门文章

评论已关闭

6条评论

  1. The AP histochemistry is compatible with electron microscopic visualization Gustincich et al Received February 24, 2015

  2. The main objectives of this research study are to see whether using these drugs will shrink down tumours before surgery and to see if any shrinkage in the tumour affects the extent of surgery that is required

  3. Use of drama sessions in school communities helped to raise awareness and reduce prejudice about epilepsy in the population

  4. Jamil, Effect of black seed on dextromethorphan o and n demethylation in human liver microsomes and healthy human subjects, Drug Metabolism Letters, vol

网站地图