文章插图
重启后160重新漂移回153上, 高可用测试ok
队列功能代码测试直接使用最简单的hello world程序测试, IP使用虚拟的VIP 160
消费者
package com.dance.redis.mq.rabbit.helloworld;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class Receiver {private static final String QUEUE_NAME = "queue-test";private static final String IP_ADDRESS = "192.168.247.160";private static final int PORT = 5672;public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {Address[] address = new Address[]{new Address(IP_ADDRESS, PORT)};ConnectionFactory factory = new ConnectionFactory();factory.setUsername("toor");factory.setPassword("123456");// 这里的连接方式与生产者的demo略有不同,注意区别 。Connection connection = factory.newConnection(address); //创建连接final Channel channel = connection.createChannel();//创建信道channel.basicQos(64);//设置客户端最多接收未被ack的消息个数Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {System.out.println("recvive message:" + new String(body));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, consumer);//等待回调函数执行完毕之后 , 关闭资源 。TimeUnit.SECONDS.sleep(50);channel.close();connection.close();}}生产者
package com.dance.redis.mq.rabbit.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RabbitProducer {private static final String EXCHANGE_NAME = "exchange-test";private static final String ROUTING_KEY = "text.*";private static final String QUEUE_NAME = "queue-test";private static final String IP_ADDRESS = "192.168.247.160";private static final int PORT = 5672; //RabbitMQ服务默认端口为5672public static void main(String[] args) throws IOException,TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(IP_ADDRESS);factory.setPort(PORT);factory.setVirtualHost("/");factory.setUsername("toor");factory.setPassword("123456");Connection connection = factory.newConnection();//创建连接Channel channel = connection.createChannel();//创建信道//创建一个type="topic"、持久化的、非自动删除的交换器 。channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);//创建一个持久化、非排他的、非自动删除的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//将交换机与队列通过路由键绑定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//发送一条持久化消息:Hello World!String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());//关闭资源channel.close();connection.close();}}我的账户和密码是toor和123456, 请改为自己的
启动消费者
文章插图
启动生产者
文章插图
查看消费者
文章插图
消费成功, ip也是160
查看控制台 exchange
文章插图
多了exchange-test 功能是ha-all
文章插图
队列也是, 到此镜像队列+高可用已经实现
文章插图
也可以在HA的控制台上查看统计
集群架构回顾我们实现的是RabbitMQ Cluster + Mirror Queue + Haproxy + Keepalived
RabbitMQ Cluster 3台
Mirror Queue RabbitMQ集群方式
Haproxy 反向代理
Keepalived Haproxy集群检测, 虚拟VIP, 实现统一IP对外提供
架构图手绘
文章插图
emm, 就是这样一个架构, 我应该李姐的挺到位的
推荐阅读
- centos7中配置java + mysql +jdk+使用jar部署项目
- 云原生下基于K8S声明式GitOps持续部署工具ArgoCD实战-上
- opencvcv.line
- 基于 Apache Hudi 极致查询优化的探索实践
- 14 基于SqlSugar的开发框架循序渐进介绍-- 基于Vue3+TypeScript的全局对象的注入和使用
- 【Python+C#】手把手搭建基于Hugging Face模型的离线翻译系统,并通过C#代码进行访问
- 基于Qt Designer和PyQt5的桌面软件开发--环境搭建和入门例子
- 最高温度25度穿什么衣服合适 18-25度穿什么衣服合适
- pr基于当前字幕新建字幕怎么用 PR如何新建视频字幕
- 图片如何水印化 图片如何使用水印