P:生产者
C:消费者
红色的图标为队列,代码实现:
一、工厂工具类
import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException{
//连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置服务地址 connectionFactory.setHost("127.0.0.1"); //设置端口 AMQP协议 connectionFactory.setPort(5672); //设置虚拟主机vhost connectionFactory.setVirtualHost("/vhost_mmr"); //设置用户名,密码 connectionFactory.setUsername("user_mmr"); connectionFactory.setPassword("123"); return connectionFactory.newConnection(); }}二、 消息生产者
import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import com.zy.rabbitmq.util.ConnectionUtil;/**
* 消息生产者 * * @author zy * */public class Send { private static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接 Connection connection = ConnectionUtil.getConnection();// 从连接中获取一个通道
Channel channel = connection.createChannel(); // 创建队列声明 // String queue, boolean durable, boolean exclusive, boolean // autoDelete,Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello simple !";
// 发送
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("--send msg:" + msg);
channel.close();
connection.close(); }}
三、消息消费者
/**
* 消费者消费消息 * * @author zy * */public class Receive {private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接 Connection connection = ConnectionUtil.getConnection(); // 从连接中获取一个通道 Channel channel = connection.createChannel(); // 队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//基于事件
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" receive msg:" + message); } };// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer); }}
生产者将消息放入名为test_simple_queue的队列中,消费者从该队列中消费消息
参考:http://www.rabbitmq.com/getstarted.html