RabbitMQ — это мощная система обмена сообщениями (message broker), которая обеспечивает надежную передачу данных между приложениями. Она поддерживает множество протоколов и языков программирования, что делает её популярным выбором для микросервисных архитектур и систем реального времени.
Основные компоненты RabbitMQ
Producer — отправитель сообщений
Queue — очередь, в которую попадают сообщения
Consumer — получатель сообщений
Exchange — маршрутизатор сообщений
Почему нужен кластер RabbitMQ?
Для обеспечения отказоустойчивости и масштабируемости, особенно в продакшен-средах, важно настраивать кластер RabbitMQ из нескольких нод. Кластер позволяет:
Распределить нагрузку между нодами
Обеспечить отказоустойчивость при сбоях
Увеличить общую производительность системы
Что такое Qorum и зачем он нужен?
Qorum — это механизм репликации очередей в кластере RabbitMQ, который обеспечивает высокую доступность. Он использует алгоритм Raft для консенсуса, что делает его надежным и подходящим для продакшен-сред.
Основные преимущества Qorum:
Автоматическая репликация данных между нодами
Автоматическое восстановление очередей при сбоях
Обеспечение консистентности данных
Поддержка отказоустойчивости на уровне очередей
Настройка кластера RabbitMQ из трёх нод
Установка RabbitMQ
Установите RabbitMQ на все три сервера:
# Ubuntu/Debian
sudo apt update
sudo apt install rabbitmq-server
# CentOS/RHEL
sudo yum install rabbitmq-server
Запустите сервис:
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
Настройка кластера
На первой ноде:
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl start_app
С первой ноды берем файл /var/lib/rabbitmq/.erlang.cookie и копируем его на остальные ноды
Предположим наши нода называются rabbitmq-node1 rabbitmq-node2 rabbitmq-node3,
На второй и третей ноде выполнить
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbitmq-node1@rabbitmq-node1
sudo rabbitmqctl start_app
Проверьте статус кластера:
sudo rabbitmqctl cluster_status
Включение Qorum
Qorum включается автоматически при использовании новых версий RabbitMQ (3.8+), но можно явно включить:
sudo rabbitmqctl set_cluster_name my-cluster
sudo rabbitmqctl set_policy quorum ".*" '{"ha-mode":"all"}' --apply-to queues
Теперь все очереди будут автоматически реплицироваться на все ноды кластера.
Подключение к RabbitMQ: Примеры на разных языках
Node.js (с использованием amqplib)
const amqp = require('amqplib');
async function connect() {
const connection = await amqp.connect('amqp://user:password@node1');
const channel = await connection.createChannel();
const queue = 'hello';
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from('Hello World!'));
console.log(" [x] Sent 'Hello World!'");
await channel.close();
await connection.close();
}
connect().catch(console.error);
Java (с использованием rabbitmq-client)
import com.rabbitmq.client.*;
public class RabbitMQExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setUsername("user");
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, false, null);
String message = "Hello World!";
channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
PHP используем библиотеку php-amqplib
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('node1', 5672, 'user', 'password');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
Python используем библиотеку pika
import pika
# Подключение к кластеру
connection = pika.BlockingConnection(pika.ConnectionParameters('node1', 5672, '/', pika.PlainCredentials('user', 'password')))
channel = connection.channel()
# Объявление очереди
channel.queue_declare(queue='hello')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
# Получение сообщения
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()