博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQJava系列1-简单队列
阅读量:5284 次
发布时间:2019-06-14

本文共 2437 字,大约阅读时间需要 8 分钟。

P:生产者

C:消费者

红色的图标为队列,代码实现:

一、工厂工具类

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

public static Connection getConnection() throws IOException, TimeoutException{

//连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务地址
connectionFactory.setHost("127.0.0.1");
//设置端口 AMQP协议
connectionFactory.setPort(5672);
//设置虚拟主机vhost
connectionFactory.setVirtualHost("/vhost_mmr");
//设置用户名,密码
connectionFactory.setUsername("user_mmr");
connectionFactory.setPassword("123");
return connectionFactory.newConnection();
}
}

二、 消息生产者

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;
import com.zy.rabbitmq.util.ConnectionUtil;

/**

* 消息生产者
*
* @author zy
*
*/
public class Send {
private static final String QUEUE_NAME = "test_simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {

// 获取连接
Connection connection = ConnectionUtil.getConnection();

// 从连接中获取一个通道

Channel channel = connection.createChannel();
// 创建队列声明
// String queue, boolean durable, boolean exclusive, boolean
// autoDelete,Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String msg = "hello simple !";

// 发送

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

System.out.println("--send msg:" + msg);

channel.close();

connection.close();
}
}

 

三、消息消费者

/**

* 消费者消费消息
*
* @author zy
*
*/
public class Receive {

private static final String QUEUE_NAME = "test_simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {

// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//基于事件

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" receive msg:" + message);
}
};

// 监听队列

channel.basicConsume(QUEUE_NAME, true, consumer);
}

}

 

生产者将消息放入名为test_simple_queue的队列中,消费者从该队列中消费消息

参考:http://www.rabbitmq.com/getstarted.html

 

转载于:https://www.cnblogs.com/zytiger/p/9369766.html

你可能感兴趣的文章
smarty模板自定义变量
查看>>
研究称90%的癌症由非健康生活习惯导致
查看>>
命令行启动Win7系统操作部分功能
查看>>
排序sort (一)
查看>>
Teamcenter10 step-by-step installation in Linux env-Oracle Server Patch
查看>>
Struts2学习(三)
查看>>
Callable和Runnable和FutureTask
查看>>
GitHub 多人协作开发 三种方式:
查看>>
文本域添加编辑器
查看>>
Yum安装MySQL以及相关目录路径和修改目录
查看>>
java获取hostIp和hostName
查看>>
关于web服务器和数据库的各种说法(搜集到的)
查看>>
C# Stream 和 byte[] 之间的转换
查看>>
OMG: daily scrum nine
查看>>
redis与spring结合错误情况
查看>>
第六章 字节码执行方式--解释执行和JIT
查看>>
字符串方法title()、istitle()
查看>>
yield语句
查看>>
查看linux系统中占用cpu最高的语句
查看>>
[洛谷P1738]洛谷的文件夹
查看>>