RabbitMQ基础及其集群搭建

1. 对比

image



2. 安装

2.1. 环境


2.1.1. 虚拟机

3台Ubuntu机器做集群, ip地址: 192.168.211.111~113


2.1.2. Erlang

1
2
3
4
root@clustertest01:/home/gang/soft# apt-get update
root@clustertest01:/home/gang/soft# apt-get install erlang
root@clustertest01:/home/gang/soft# erl -version
Erlang (SMP,ASYNC_THREADS) (BEAM) emulator version 12.2.1

2.1.3. RabbitMQ

1
2
3
4
5
6
7
root@clustertest01:/home/gang/soft# apt-get install rabbitmq-server
Not creating home directory `/var/lib/rabbitmq'.
Created symlink /etc/systemd/system/multi-user.target.wants/rabbitmq-server.service → /lib/systemd/system/rabbitmq-server.service.



root@clustertest01:/home/gang/soft# service rabbitmq-server status

开启后台界面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
root@clustertest01:/home/gang/soft# rabbitmq-plugins enable rabbitmq_management 
Enabling plugins on node rabbit@clustertest01:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@clustertest01...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

新增用户并赋予admin权限

1
2
3
4
5
root@clustertest01:/home/gang/soft# rabbitmqctl add_user gang 123456
Adding user "gang" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
root@clustertest01:/home/gang/soft# rabbitmqctl set_user_tags gang administrator
Setting tags for user "gang" to [administrator] ...

进入地址: http://192.168.211.111:15672/#/, 即可登录管理界面

image-20240517123214785

2.2. 集群模式


2.2.1. 普通集群特点

  • 默认模式

  • 集群中个节点之间有相同的元数据, 即队列结构; 而消息不会冗余, 只会存在一个节点中

  • 宕机重启后可能出现消息重复消费

  • 不支持高可用, 挂了需要手动重启?


2.2.2. 镜像集群特点

  • 是普通集群的增强
  • 主动同步镜像节点消息, 每个节点上存放着全量消息
  • 有master和slave
  • 镜像同步会占用大量的带宽, 所以队列数量最好不要太多

2.3. 普通集群搭建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 同步.erlang.cookie, 注意同步过来的文件权限
cd /var/lib/rabbitmq
chown rabbitmq:rabbitmq .erlang.cookie


# clustertest02 以内存的方式加入集群
root@clustertest02:/home/gang# rabbitmqctl join_cluster --ram rabbit@clustertest01

# 加入之前需要先停止当前节点
root@clustertest02:/home/gang# rabbitmqctl join_cluster --ram rabbit@clustertest01
Error: this command requires the 'rabbit' app to be stopped on the target node. Stop it with 'rabbitmqctl stop_app'.
Arguments given:
	join_cluster --ram rabbit@clustertest01
	
	
# clustertest03 以磁盘的方式加入集群
root@clustertest03:/home/gang# rabbitmqctl join_cluster rabbit@clustertest01


# 打开两个节点的 rabbitmq_management, 要不然在cluster01上无法看到
rabbitmq-plugins enable rabbitmq_management

2.4. 镜像集群搭建

1
2
3
4
# 创建一个虚拟机, 然后添加镜像策略
rabbitmqctl add_vhost /mirror

rabbitmqctl set_policy ha-all --vhost "/mirror" "^" '{"ha-mode":"all"}'

2.4.1. HA Mode

  • all

    • 队列镜像同步到集群中的所有节点, 新加入的集群也会被同步到
  • exactly

    • 需要搭配一个数字类型的参数
    • 队列镜像到集群中指定数量的节点
      • 如果集群内节点数少于这个数字,则队列镜像到集群内的所有节点
      • 如果集群内节点少于这个数,当一个包含镜像的节点停止服务后,新的镜像就不会去另外找节点进行镜像备份
  • nodes

    • 需要搭配一个字符串类型的参数
    • 将队列镜像到指定节点上
image-20240517161232266

3. 一些概念

image


3.1. 虚拟主机(Virtual Host)

  • 一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有AMQP的全套基础组件,并且可以针对每个虚拟主机进行权限以及数据分配
  • 不同虚拟主机之间是完全隔离的

3.2. 连接(Connection)

  • 客户端与RabbitM建立一个TCP连接,就是Connection

3.3. 信道(Channel)

  • 客户端与RabbitMQ建立连接之后, 就会创建一个AMQP的Channel, 每个Channel都会有一个唯一的ID
  • 一个Connection中可以有多个Channel
  • Connection中的Channel需要根据情况分配

3.4. 交换机(Exchange)

  • 消息发到RabbitMQ之后, 首先会进入交换机, 再由交换机转发给不同的队列
  • 不同的交换机有不同的路由策略
  • 多与生产者打交道

3.5. 队列(Queue)

  • 保存数据的最小单位
  • 有如下类型: Classic, Quiorum,. Stream

3.5.1. Classic

image-20240517162550442

参数:

  • Durability: 是否持久化
    • Durable: 消息落盘, io增加
    • Transient
  • Auto delete: 是否自动删除
    • yes: 如果没有消费者连接自己了, 就会自动删除, pulsar也有这个选项
    • no

3.5.2. Quorum 仲裁队列

关于Quorum的详细介绍见 https://www.rabbitmq.com/quorum-queues.html

仲裁队列是RabbitMQ 3.8版中引入的一种队列类型。与传统的镜像队列(Mirrored Queues)相比,仲裁队列使用Raft一致性算法来确保数据的高可用性和一致性。它们在设计上解决了镜像队列的一些缺陷,如网络分区处理和一致性问题。

特点:

  1. 高可用性:使用Raft一致性算法,确保即使在节点故障的情况下,数据也能保持一致性和可用性。
  2. 强一致性:所有写操作(消息发布)必须经过多数节点(即多数派)的确认,这保证了数据的一致性。
  3. 容错性:支持节点失效和恢复,能在网络分区和节点宕机的情况下继续运行。
  4. 动态扩展:可以动态添加和删除节点,而不会中断队列的正常运行

原理:

仲裁队列使用Raft一致性算法来管理队列中的消息, 确保在多个节点间的一致性。以下是其关键组件和工作流程:

  1. 领导者(Leader):仲裁队列中的一个节点会被选举为领导者,负责处理所有写请求和协调其他节点的状态。
  2. 追随者(Follower):其余节点作为追随者,复制领导者的状态和日志条目。
  3. 日志复制:当客户端发布消息时,领导者将消息记录到其日志中,并将日志条目复制到大多数追随者。只有在大多数追随者确认日志条目后,消息才会被认为是已提交并对客户端可见。
  4. 节点故障处理:如果领导者节点发生故障,会自动选举新的领导者,继续处理未完成的操作。

3.5.3. Stream队列

  • 消息持久化到磁盘, 分布式备份, 官方推荐的, 适合消费者多, 读消息频繁的场景。

  • Stream队列的核心是以append-only的方式记录日志消息,

  • 通过调整消费者的消费进度offset来实现消息的多次分发

特点:

  • large fan-outs 大规模分发

    Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。

  • Replay/Time-travelling 消息回溯

    RabbitMQ的其他队列在消费者确认处理完消息之后, 就会将消息从队列中删除。而Stream队列允许消费者在日志的任何一个位置开始重新读取

  • Throughput Performance 高吞吐性能

  • Large logs 大日志



4. 基础操作

4.1. 原生API


4.1.1. 添加依赖

1
2
3
4
5
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
  1. 创建队列

    1
    2
    3
    4
    5
    
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    
    connection = factory.newConnection();
    channel = connection.createChannel();
  2. 声明队列

    1
    
    channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);

    参数说明:

    • queue:队列的名字
    • durable true if we are declaring a durable queue (the queue will survive a server restart)
    • exclusive true if we are declaring an exclusive queue (restricted to this connection)
    • autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    • arguments other properties (construction arguments) for the queue

    如果要声明一个Quorum队列,则只需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum

    1
    2
    3
    4
    
    Map<String,Object> params = new HashMap<>();
    params.put("x-queue-type","quorum");
    //声明Quorum队列的方式就是添加一个x-queue-type参数,指定为quorum。默认是classic
    channel.queueDeclare(QUEUE_NAME, true, false, false, params);

    注意:1、对于Quorum类型,durable参数就必须是true了,设置成false的话,会报错。同样,exclusive参数必须设置为false

    如果要声明一个Stream队列,则 x-queue-type参数要设置为 stream .

    1
    2
    3
    4
    5
    
      Map<String,Object> params = new HashMap<>();
      params.put("x-queue-type","stream");
      params.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB
      params.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
      channel.queueDeclare(QUEUE_NAME, true, false, false, params);

    注意:

    1. 同样,durable参数必须是true,exclusive必须是false。

    2. x-max-length-bytes 表示日志文件的最大字节数。x-stream-max-segment-size-bytes 每一个日志文件的最大大小

    声明的队列,如果服务端没有,那么会自动创建。但是如果服务端有了这个队列,那么声明的队列属性必须和服务端的队列属性一致才行。

  3. Producer发送消息到queue

    1
    
    channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8"));

    参数说明:

    • exchange the exchange to publish the message to
    • routingKey the routing key
    • props other properties for the message - routing headers etc
    • body the message body
  4. Consumer消费消息

    消费方式:

    1. 被动消费: 等待mq主动把消息推送过来

      1
      
      channel.basicConsume(String queue, boolean autoAck, Consumer callback);

      参数说明:

      • queue the name of the queue
      • autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
      • callback an interface to the consumer object
    2. 主动消费:主动去mq上获取指定的msg消费

      1
      
      GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);

      参数说明:

      • queue the name of the queue
      • autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
    3. Stream队列消费

      1. channel必须设置basicQos属性。
      2. 正确声明Stream队列。
      3. 消费时需要指定offset。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      
      //3、消费时,必须指定offset。 可选的值:
      // first: 从日志队列中第一个可消费的消息开始消费
      // last: 消费消息日志中最后一个消息
      // next: 相当于不指定offset,消费不到消息。
      // Offset: 一个数字型的偏移量
      // Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
      Map<String,Object> consumeParam = new HashMap<>();
      consumeParam.put("x-stream-offset","next");
      channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);
  5. 关闭Channel

    1
    
     channel.close(); 

4.2. 发送消息

不分代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args) throws Exception {
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = connection.createChannel();
    //声明队列会在服务端自动创建。
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    String message = "Hello World!333";

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
    builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
    //携带消息ID
    builder.messageId(""+channel.getNextPublishSeqNo());
    Map<String, Object> headers = new HashMap<>();
    //携带订单号
    headers.put("order", "123");
    builder.headers(headers);

    channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
}


public class RabbitMQUtil {

	private static Connection connection;
	private static final String HOST_NAME="192.168.211.111";
	private static final int HOST_PORT=5672;

	public static final String QUEUE_HELLO="hello";
	public static final String QUEUE_WORK="work";
	public static final String QUEUE_PUBLISH="publish";
	
	private RabbitMQUtil() {}
	
	public static Connection getConnection() throws Exception {
		if(null == connection) {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			factory.setPort(HOST_PORT);
			factory.setUsername("gang");
			factory.setPassword("123456");
			factory.setVirtualHost("/mirror");
			connection = factory.newConnection();
		}
		return connection;
	}
}

需要再后台设置好用户对应虚拟机(vhost)的权限, 要不然会报错

image-20240517171234869


4.3. 消息模型


4.3.1. 简单队列模型(Simple Queue)

这是最基本的消息模型,包含一个生产者、一个队列和一个消费者。生产者将消息发送到队列,消费者从队列中接收并处理消息。

示例

  • 生产者发送消息到队列 simple_queue
  • 消费者从队列 simple_queue 中接收消息。

4.3.2. 工作队列模型(Work Queue)

image

工作队列模型用来分发耗时任务。一个生产者将任务分发到队列,多个消费者从队列中获取任务,分担工作负载。这种模型帮助我们实现负载均衡。

示例

  • 生产者发送任务消息到队列 work_queue
  • 多个消费者从队列 work_queue 中接收任务并处理。
1
2
3
4
5
6
7
8
// Producer
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //任务一般是不能因为消息中间件的服务而被耽误的,所以durable设置成了true,这样,即使rabbitMQ服务断了,这个消息也不会消失
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

// Consumer
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

这个模式应该是最常用的模式,也是官网讨论比较详细的一种模式,所以官网上也对这种模式做了重点讲述。

注意:

  • Consumer端的autoAck为false, 这样即使Consumer在执行message过程中出问题了, 也不会导致消息丢失,因为没有ack的message会被服务器重新进行投递。
  • 消息的持久性;queue一旦被定义了,就无法再次修改了
  • 分发给Consumer消息的方式
    1. 默认是fair dispatch, 也就是round robin
    2. Consumer会向服务器上报一个prefetchCount, 表示自己预处理能力值, 若下发的时候超过了这个值,则不会向该Consumer再次下发

4.3.3. 发布/订阅模型(Publish/Subscribe)

image

在发布/订阅模型中,消息被发送到交换机(Exchange),然后交换机将消息分发到绑定到它的多个队列中。这种模型适用于广播消息的场景。

示例

  • 生产者将消息发送到交换机 logs_exchange
  • 交换机将消息分发到所有绑定到它的队列 queue1queue2 等。
1
2
3
4
5
6
7
8
// Producer 只负责向Exchange中发送消息,后面的事情不管
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

// Receiver 将消费的目标队列绑定到Exchange上
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

关键处就是type为”fanout” 的Exchange,这种类型的Exchange只负责往所有已绑定的队列上发送消息


4.3.4. 路由模型(Routing)

image

路由模型是发布/订阅模型的扩展,消息根据路由键(Routing Key)被发送到不同的队列。交换机类型为 direct

示例

  • 生产者将消息发送到交换机 direct_logs_exchange,并指定路由键 infowarningerror
  • 交换机会将消息分发到绑定了相应路由键的队列。
1
2
3
4
5
6
7
8
9
// Producer
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

// Consumer
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);

4.3.5. 主题模型(Topics)

image

主题模型也是发布/订阅模型的扩展,允许使用模式匹配的路由键,将消息分发到绑定了匹配模式的队列。交换机类型为 topic

示例

  • 生产者将消息发送到交换机 topic_logs_exchange,并指定路由键 kern.criticalapp.info 等。
  • 交换机会根据绑定的模式(如 kern.**.info)将消息分发到匹配的队列。
1
2
3
4
5
6
7
8
9
// Producer
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

// Receiver
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);

4.3.6. RPC(Remote Procedure Call)

RPC 模型允许在分布式系统中实现同步的远程过程调用。客户端发送请求消息到队列,服务器从队列中接收并处理请求,然后将响应消息发送回客户端。

示例

  • 客户端将请求消息发送到队列 rpc_queue,并指定一个回调队列 reply_queue
  • 服务器从 rpc_queue 接收请求消息,处理后将结果发送到 reply_queue

用的比较少。


4.3.7. Publisher Confirms

Publisher Confirms 是一种用于确认消息已经被 RabbitMQ 服务器成功接收和处理的机制。在生产者发送消息之后,RabbitMQ 会异步地返回一个确认(ack)或未确认(nack)信号,以通知生产者消息的处理状态。这种机制非常适合需要高可靠性和确保消息不丢失的场景。

  • 该模式默认不开启,需要手动声明

    1
    
    channel.confirmSelect();
    1. 发布单条消息

      1
      2
      3
      4
      5
      6
      
      for (int i = 0; i < MESSAGE_COUNT; i++) {
          String body = String.valueOf(i);
          channel.basicPublish("", queue, null, body.getBytes());
          // 该方法会阻塞channel
          channel.waitForConfirmsOrDie(5_000);
      }
    2. 发送批量消息

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      
      int batchSize = 100;
      int outstandingMessageCount = 0;
      
      long start = System.nanoTime();
      for (int i = 0; i < MESSAGE_COUNT; i++) {
          String body = String.valueOf(i);
          ch.basicPublish("", queue, null, body.getBytes());
          outstandingMessageCount++;
      
          if (outstandingMessageCount == batchSize) {
              ch.waitForConfirmsOrDie(5_000);
              outstandingMessageCount = 0;
          }
      }
      
      if (outstandingMessageCount > 0) {
          ch.waitForConfirmsOrDie(5_000);
      }
    3. 异步确认

      1
      
      channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
      • 发送者在发送万消息后就会调用第一个callback
      • 等服务端反馈后,在执行第二个callback

相关内容

0%