1、简单介绍
RabbitMQ是一个消息协调器(Broker),它的主要思路非常简单:接收和传输消息。可以把它看成一个邮局:当你将邮件送到邮箱中后,肯定知道邮差将会把封信送到收件人手中。类似地,RabbitMQ就是邮局、邮箱、邮差,唯一的区别就是,邮局传递的是纸质邮件,而RabbitMQ接收、存储、传递的是二进制流数据。
介绍一下RabbitMQ使用的一些术语:
1.1、生产者(Producing )
生产者仅会发送消息。简称”P“。
1.2、队列(Queue)
队列就相当于邮箱,它存在于RabbitMQ Server中。在RabbitMQ的组件中,所有的消息仅能够存储在队列中。队列的大小理论上是无限制的,本质上它是一个无限大的缓存区。多个生产者可以往同一个队列中发送消息,多个消息者也可以从一个队列中接收消息。
1.3、消费者(Consuming )
消费者就是接收消息。它主要是等待接收消息,简称”C“。
2、HelloWorld
HelloWorld将非常简单,P发送消息,C接收消息并打印它。如下图所示,中间是一个队列:
2.1、JAVA实现
2.1.1、安装配置
下载java客户端包:client library package,解压后,将jar包加入到开发环境的classpath中。若使用maven管理方式,查找groupId为com.rabbitmq,artifactId为amqp-client的归档。
2.1.2、发送端
主要完成发送消息的功能,具体实现代码如下:
package com.zenfery.example.rabbitmq; import java.io.IOException; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //生产者 public class Sender { private static final String QUEUE_NAME = "hello"; // 队列名称 public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //定义队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送消息,接收控制台输入,并将其发送 Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()){ String message = scanner.nextLine(); //退出 if(message != null && "quit".equals(message)){ channel.close(); connection.close(); break; } //发送消息 else{ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" >>>发送:"+message+""); } } } }
使用ConnectionFactory创建连接,实质上,此连接就是一个Socket连接,设置host为localhost来连接本地的RabbitMQ Server,设置Port为5672(默认队列连接端口),未设置用户名和密码,将默认使用guest/guest口令(当然,此口令仅能在localhost本地使用)。
使用channel.queueDeclare()来定义队列,在RabbitMQ中,队列仅能够创建一次,如果发现已经存在此队列,将会忽略此方法。
使用Scanner来从控制台循环输入内容,以行分隔。使用方法channel.basicPublish()循环将键盘输入发送到hello队列,如果输入”quit“时,将会退出当前发送客户端;如果发送至队列中的消息为”ok“时,消费者如果接收到此消息,将会退出消费者客户端。
2.1.3、消费者
主要用于接收指定队列(hello)中的消息。代码实现如下:
package com.zenfery.example.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; //消费者 public class Receiver { private static final String QUEUE_NAME = "hello"; // 队列名称 public static void main(String[] args) throws IOException , ShutdownSignalException, ConsumerCancelledException , InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); //创建连接 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //定义队列,因为消费者有可能比生产者先启动,为了保证消费者能连接 //上已知队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" >>>接收消息:" + message); if(message!=null && "ok".equals(message)){ channel.close(); connection.close(); break; } } } }
同样,启动消费者客户端,首先要建立RabbitMQ的连接。在创建消费者对象之前,同样定义了hello的队列,因为消费者有可能会在生产者之前启动,为了不至于连接不到指定队列,所以也需要定义队列,不必担心,如果队列已经存在了,将不会再重复执行。
QueueingConsumer是队列消费对象,使用channel.basicConsume()方法将队列消费对象和队列进行绑定。使用consumer.nextDelivery()获取一个一个信件。使用delivery.getBody()获取二进制消息体流,然后将消息打印。如果接收到的消息为”ok“的话,将退出消费者客户端。
2.1.4、执行
启动第一个生产者,输入hello world。
hello world >>>发送:hello world
启动第二个生产者,输入hello world 2。
hello world 2 >>>发送:hello world 2
在第二个生产者中输入quit,退出。
此时,可以在Web UI管理界面上,看到hello队列中有两条消息。
此时启动消费者。控制台输出:
>>>接收消息:hello world >>>接收消息:hello world 2
在第一个生产者中输入ok,此时消费者接收到此消息并退出。再输入quit,第一个生产者也退出。
转载请注明:子暃之路 » RabbitMQ指南(5)-HelloWorld