using Microsoft.Extensions.Logging; using Polly; using Polly.Retry; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System.Net.Sockets; using System.Text; namespace Tiobon.Core.EventBus; /// /// RabbitMQ持久连接 /// public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection { private readonly IConnectionFactory _connectionFactory; private readonly ILogger _logger; private readonly int _retryCount; IConnection _connection; bool _disposed; object sync_root = new object(); public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger logger, int retryCount = 5) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _retryCount = retryCount; } /// /// 是否已连接 /// public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } /// /// 创建Model /// /// public IModel CreateModel() { if (!IsConnected) { throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); } return _connection.CreateModel(); } /// /// 释放 /// public void Dispose() { if (_disposed) return; _disposed = true; try { _connection.Dispose(); } catch (IOException ex) { _logger.LogCritical(ex.ToString()); } } /// /// 连接 /// /// public bool TryConnect() { _logger.LogInformation("RabbitMQ Client is trying to connect"); lock (sync_root) { var policy = RetryPolicy.Handle() .Or() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); } ); policy.Execute(() => { _connection = _connectionFactory .CreateConnection(); }); if (IsConnected) { _connection.ConnectionShutdown += OnConnectionShutdown; _connection.CallbackException += OnCallbackException; _connection.ConnectionBlocked += OnConnectionBlocked; _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName); return true; } else { _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); return false; } } } /// /// 连接被阻断 /// /// /// private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); TryConnect(); } /// /// 连接出现异常 /// /// /// void OnCallbackException(object sender, CallbackExceptionEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); TryConnect(); } /// /// 连接被关闭 /// /// /// void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); TryConnect(); } /// /// 发布消息 /// /// /// /// public void PublishMessage(string message, string exchangeName, string routingKey) { using var channel = CreateModel(); channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body); } /// /// 订阅消息 /// /// public void StartConsuming(string queueName) { using var channel = CreateModel(); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += new AsyncEventHandler( async (a, b) => { var Headers = b.BasicProperties.Headers; var msgBody = b.Body.ToArray(); var message = Encoding.UTF8.GetString(msgBody); await Task.CompletedTask; Console.WriteLine("Received message: {0}", message); //bool Dealresult = await Dealer(b.Exchange, b.RoutingKey, msgBody, Headers); //if (Dealresult) channel.BasicAck(b.DeliveryTag, false); //else channel.BasicNack(b.DeliveryTag, false, true); } ); channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine("Consuming messages..."); } }