using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Tiobon.Core.Extensions;
namespace Tiobon.Core.EventBus
{
/// <summary>
/// Kafka-based event bus.
/// </summary>
public class EventBusKafka : IEventBus
{
    // Scope note: this class publishes events and records subscriptions in the
    // subscription manager. No Kafka consumer loop is started from this class.

    private readonly ILogger<EventBusKafka> _logger;
    private readonly IEventBusSubscriptionsManager _subsManager;
    private readonly IKafkaConnectionPool _connectionPool;
    private readonly KafkaOptions _options;

    /// <summary>
    /// Creates the event bus from its collaborators.
    /// </summary>
    /// <param name="logger">Logger used for subscription and publish diagnostics.</param>
    /// <param name="subsManager">Registry that stores event/handler subscriptions.</param>
    /// <param name="connectionPool">Pool that producers are borrowed from and returned to.</param>
    /// <param name="options">Kafka options; <c>Topic</c> is the topic events are published to.</param>
    public EventBusKafka(ILogger<EventBusKafka> logger,
        IEventBusSubscriptionsManager subsManager,
        IKafkaConnectionPool connectionPool,
        IOptions<KafkaOptions> options)
    {
        _logger = logger;
        _subsManager = subsManager;
        _connectionPool = connectionPool;
        _options = options.Value;
    }

    /// <summary>
    /// Publishes an integration event to the configured topic.
    /// </summary>
    /// <remarks>
    /// The event is serialized to JSON, passed through <c>Protobuf.Serialize</c>, and sent
    /// with the event's type name as the message key. The call blocks until the produce
    /// request completes. Failures are logged as warnings and are not rethrown.
    /// </remarks>
    /// <param name="event">The event to publish.</param>
    public void Publish(IntegrationEvent @event)
    {
        var producer = _connectionPool.Producer();

        // Kept outside the try so the catch block can log the payload without
        // serializing a second time (which could throw again and escape the handler).
        var json = string.Empty;
        try
        {
            var eventName = @event.GetType().Name;
            json = JsonConvert.SerializeObject(@event);
            var body = Protobuf.Serialize(json);

            // NOTE(review): assumes the pooled producer is IProducer<string, byte[]> and that
            // Protobuf.Serialize returns byte[] - confirm against IKafkaConnectionPool.
            var message = new Message<string, byte[]>
            {
                Key = eventName,
                Value = body
            };

            // IEventBus.Publish is synchronous, so the async produce call is awaited by blocking.
            producer.ProduceAsync(_options.Topic, message)
                .ConfigureAwait(false)
                .GetAwaiter()
                .GetResult();
        }
        catch (Exception ex)
        {
            // Best-effort publish: record the failure (with the exception) and continue.
            _logger.LogWarning(
                ex,
                "Could not publish event: {EventId} ({ExceptionMessage}); Message:{Message}",
                @event.Id.ToString("N"),
                ex.Message,
                json);
        }
        finally
        {
            // Always hand the producer back to the pool, whether or not the send succeeded.
            _connectionPool.Return(producer);
        }
    }

    /// <summary>
    /// Registers a dynamic subscription for the named event.
    /// </summary>
    /// <typeparam name="TH">Event handler type.</typeparam>
    /// <param name="eventName">Event name.</param>
    public void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
        _subsManager.AddDynamicSubscription<TH>(eventName);
    }

    /// <summary>
    /// Registers a typed subscription.
    /// </summary>
    /// <typeparam name="T">Event model type.</typeparam>
    /// <typeparam name="TH">Event handler type for <typeparamref name="T"/>.</typeparam>
    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();
        _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
        _subsManager.AddSubscription<T, TH>();
    }

    /// <summary>
    /// Removes a typed subscription.
    /// </summary>
    /// <typeparam name="T">Event model type.</typeparam>
    /// <typeparam name="TH">Event handler type for <typeparamref name="T"/>.</typeparam>
    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();
        _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
        _subsManager.RemoveSubscription<T, TH>();
    }

    /// <summary>
    /// Removes a dynamic subscription for the named event.
    /// </summary>
    /// <typeparam name="TH">Event handler type.</typeparam>
    /// <param name="eventName">Event name.</param>
    public void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        _subsManager.RemoveDynamicSubscription<TH>(eventName);
    }

    /// <summary>
    /// Disposes the connection pool (when present) and clears all subscriptions.
    /// </summary>
    public void Dispose()
    {
        _connectionPool?.Dispose();
        _subsManager.Clear();
    }
}
}