您的位置 首页 java

Java大数据——JMS技术

JMS(Java Messaging Service) Java 平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为 Java消息服务

JMS应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。

消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(Text message )、可序列化的对象 (ObjectMessage)、属性集合 (Map Message )、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

JVM体系架构

MS由以下元素组成。

JMS提供者provider: 如activeMQ,连接面向消息中间件的,JMS接口的一个实现 。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

JMS客户:生产或消费基于消息的Java的应用程序或对象。

JMS生产者:创建并发送消息的JMS客户。

JMS消费者:接收消息的JMS客户。

JMS消息:包括可以在JMS客户之间传递的数据的对象

JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

JMS主题:一种支持发送消息给多个订阅者的机制。

JMS结构支持两种模型

1、点对点或队列模型

在点对点或队列模型下,一个生产者向一个 特定的队列发布消息 ,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

Java大数据——JMS技术

这种模式被概括为:

只有一个消费者将获得消息;

生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态;

每一个成功处理的消息都由接收者签收。

2、发布者/订阅者模型

发布者/订阅者模型支持向一个 特定的消息主题发布消息 。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是 匿名公告板

Java大数据——JMS技术

这种模式被概括为:

多个消费者可以获得消息;

在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

代码实现

注意:相关连接参数放在配置文件中,就不列出来了。

生产者:

  1. import javax.jms.Connection;
  2. import javax.jms.DeliveryMode;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MessageProducer;
  6. import javax.jms. session ;
  7. import javax.jms.TextMessage;
  8. import org.apache.activemq.ActiveMQConnection;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. public class ProducerTool {
  11. private String user = ActiveMQConnection.DEFAULT_USER;
  12. private String password = ActiveMQConnection.DEFAULT_PASSWORD;
  13. private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
  14. private String subject = “myqueue”;
  15. private Destination destination = null;
  16. private Connection connection = null;
  17. private Session session = null;
  18. private MessageProducer producer = null;
  19. // 初始化
  20. private void initialize() throws JMS Exception , Exception {
  21. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
  22. user, password, url);
  23. connection = connectionFactory.createConnection();
  24. session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);
  25. destination = session.createQueue(subject);
  26. producer = session.createProducer(destination);
  27. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  28. }
  29. // 发送消息
  30. public void produceMessage(String message) throws JMSException, Exception {
  31. initialize();
  32. TextMessage msg = session.createTextMessage(message);
  33. connection.start();
  34. System.out.println(“Producer:->Sending message: ” + message);
  35. producer.send(msg);
  36. System.out.println(“Producer:->Message sent complete!”);
  37. }
  38. // 关闭连接
  39. public void close () throws JMSException {
  40. System.out.println(“Producer:->Closing connection”);
  41. if (producer != null)
  42. producer.close();
  43. if (session != null)
  44. session.close();
  45. if (connection != null)
  46. connection.close();
  47. }
  48. }
  49. //生产者测试
  50. public class ProducerTest {
  51. /**
  52. * @param args
  53. * @throws Exception
  54. * @throws JMSException
  55. */
  56. public static void main(String[] args) throws JMSException, Exception{
  57. ProducerTool producer = new ProducerTool();
  58. producer.produceMessage(“Hello, world!”);
  59. producer.close();
  60. }
  61. }

消费者:

  1. public class ConsumerTool implements MessageListener,ExceptionListener {
  2. private String user = ActiveMQConnection.DEFAULT_USER;
  3. private String password = ActiveMQConnection.DEFAULT_PASSWORD;
  4. private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
  5. private String subject = “myqueue”;
  6. private Destination destination = null;
  7. private Connection connection = null;
  8. private Session session = null;
  9. private MessageConsumer consumer = null;
  10. private ActiveMQConnectionFactory connectionFactory=null;
  11. public static Boolean isconnection=false;
  12. // 初始化
  13. private void initialize() throws JMSException {
  14. connectionFactory= new ActiveMQConnectionFactory(
  15. user, password, url);
  16. connection = connectionFactory.createConnection();
  17. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  18. destination = session.createQueue(subject);
  19. consumer = session.createConsumer(destination);
  20. }
  21. // 消费消息
  22. public void consumeMessage() throws JMSException {
  23. initialize();
  24. connection.start();
  25. consumer.setMessageListener(this);
  26. connection.setExceptionListener(this);
  27. System.out.println(“Consumer:->Begin listening…”);
  28. isconnection=true;
  29. // 开始监听
  30. Message message = consumer.receive();
  31. System.out.println(message.getJMSMessageID());
  32. }
  33. // 关闭连接
  34. public void close() throws JMSException {
  35. System.out.println(“Consumer:->Closing connection”);
  36. if (consumer != null)
  37. consumer.close();
  38. if (session != null)
  39. session.close();
  40. if (connection != null)
  41. connection.close();
  42. }
  43. // 消息处理函数
  44. public void onMessage(Message message) {
  45. try {
  46. if (message instanceof TextMessage) {
  47. TextMessage txtMsg = (TextMessage) message;
  48. String msg = txtMsg.getText();
  49. System.out.println(“Consumer:->Received: ” + msg);
  50. } else {
  51. System.out.println(“Consumer:->Received: ” + message);
  52. }
  53. } catch (JMSException e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. public void onException(JMSException arg0){
  58. isconnection=false;
  59. }
  60. }
  61. //消费者测试:
  62. public class ConsumerTest implements Runnable {
  63. static Thread t1 = null;
  64. public static void main(String[] args) throws InterruptedException {
  65. t1 = new Thread(new ConsumerTest());
  66. t1.start();
  67. while (true) {
  68. System.out.println(t1.isAlive());
  69. if (!t1.isAlive()) {
  70. t1 = new Thread(new ConsumerTest());
  71. t1.start();
  72. System.out.println(“重新启动”);
  73. }
  74. Thread.sleep(5000);
  75. }
  76. // 延时500毫秒之后停止接受消息
  77. // Thread.sleep(500);
  78. // consumer.close();
  79. }
  80. public void run() {
  81. try {
  82. ConsumerTool consumer = new ConsumerTool();
  83. consumer.consumeMessage();
  84. while (ConsumerTool.isconnection) {
  85. //System.out.println(123);
  86. }
  87. } catch (Exception e) {
  88. }
  89. }
  90. }

文章来源:智云一二三科技

文章标题:Java大数据——JMS技术

文章地址:https://www.zhihuclub.com/176149.shtml

关于作者: 智云科技

热门文章

网站地图