using Tiobon.Core.Common.Extensions;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;

namespace Tiobon.Core.EventBus
{
    /// <summary>
    /// Kafka-based event bus.
    /// </summary>
    public class EventBusKafka : IEventBus
    {
        private readonly ILogger<EventBusKafka> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly IKafkaConnectionPool _connectionPool;
        private readonly KafkaOptions _options;

        /// <summary>
        /// Creates the event bus from its collaborators.
        /// </summary>
        /// <param name="logger">Logger used for subscription and publish diagnostics.</param>
        /// <param name="subsManager">Registry that tracks event/handler subscriptions.</param>
        /// <param name="connectionPool">Pool that producers are rented from and returned to.</param>
        /// <param name="options">Kafka options; <c>Topic</c> is the topic events are published to.</param>
        public EventBusKafka(ILogger<EventBusKafka> logger,
            IEventBusSubscriptionsManager subsManager,
            IKafkaConnectionPool connectionPool,
            IOptions<KafkaOptions> options)
        {
            _logger = logger;
            _subsManager = subsManager;
            _connectionPool = connectionPool;
            _options = options.Value;
        }

        /// <summary>
        /// Publishes an integration event to the configured topic.
        /// The message key is the event's runtime type name and the value is the
        /// JSON form of the event passed through <c>Protobuf.Serialize</c>.
        /// Failures are logged as warnings and not rethrown (best-effort publish).
        /// </summary>
        /// <param name="event">The event to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
        public void Publish(IntegrationEvent @event)
        {
            // Fail fast before renting a producer; otherwise the catch block below
            // would itself throw while dereferencing the null event.
            if (@event == null)
            {
                throw new ArgumentNullException(nameof(@event));
            }

            var producer = _connectionPool.Producer();
            // Serialized once and reused for both the payload and the failure log.
            string json = null;
            try
            {
                var eventName = @event.GetType().Name;
                json = JsonConvert.SerializeObject(@event);
                var body = Protobuf.Serialize(json);

                // NOTE(review): the type arguments assume the pooled producer is
                // IProducer<string, byte[]> (string key, Protobuf.Serialize returning
                // byte[]) — confirm against IKafkaConnectionPool.
                // The interface is synchronous, so the produce call is blocked on here.
                producer.ProduceAsync(_options.Topic, new Message<string, byte[]>
                {
                    Key = eventName,
                    Value = body
                }).ConfigureAwait(false).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                // A constant template with arguments keeps the JSON braces out of the
                // template parser; passing ex preserves the stack trace.
                _logger.LogWarning(ex,
                    "Could not publish event: {EventId} ({ExceptionMessage}); Message:{Message}",
                    @event.Id.ToString("N"), ex.Message, json);
            }
            finally
            {
                // Return the producer to the connection pool.
                _connectionPool.Return(producer);
            }
        }

        /// <summary>
        /// Subscribes a dynamic handler to an event identified by name.
        /// </summary>
        /// <typeparam name="TH">The event handler type.</typeparam>
        /// <param name="eventName">The event name.</param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Subscribes a typed handler to an event type.
        /// </summary>
        /// <typeparam name="T">The event model type.</typeparam>
        /// <typeparam name="TH">The event handler type for <typeparamref name="T"/>.</typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();

            _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Removes a typed handler's subscription to an event type.
        /// </summary>
        /// <typeparam name="T">The event model type.</typeparam>
        /// <typeparam name="TH">The event handler type for <typeparamref name="T"/>.</typeparam>
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();

            _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// Removes a dynamic handler's subscription to an event identified by name.
        /// </summary>
        /// <typeparam name="TH">The event handler type.</typeparam>
        /// <param name="eventName">The event name.</param>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Disposes the connection pool (when present) and clears all subscriptions.
        /// </summary>
        public void Dispose()
        {
            _connectionPool?.Dispose();

            _subsManager.Clear();
        }
    }
}