This content originally appeared on DEV Community and was authored by Rafael Andrade
Em artigos anteriores, abordei a integração do Brighter com AWS SNS/SQS e o Brighter V10 RC1. Este guia foca na migração para o Brighter V10, destacando mudanças de configuração em AWS SNS/SQS e atualizações que causam breaking changes.
Novidades no Brighter V10 para AWS SNS/SQS
O Brighter V10 introduz melhorias significativas para AWS SNS/SQS:
- Suporte direto ao SQS: Publicar/consumir mensagens do SQS sem exigir SNS
- Suporte a FIFO: Compatibilidade completa com filas SNS/SQS FIFO
- Integração com LocalStack: Suporte aprimorado para emulação local da AWS
Requisitos
- .NET 8 ou superior
- Projeto .NET com os seguintes pacotes NuGet:
- Paramore.Brighter.MessagingGateway.AWSSQS: Ativa integração com AWS SNS/SQS.
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Registra o Brighter com DI do Microsoft.
- Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hospeda o Brighter como serviço em segundo plano.
- Serilog.AspNetCore: Para logs estruturados (opcional, mas recomendado).
Recapitulando o Brighter
Antes de falar sobre a configuração do RabbitMQ, vamos revisar conceitos básicos do Brighter.
Solicitação (Comando/Evento)
Defina mensagens usando IRequest
:
public class Greeting() : Event(Guid.NewGuid())
{
public string Name { get; set; } = string.Empty;
}
-
Comandos: Operações para um único destinatário (ex:
SendEmail
) -
Eventos: Notificações broadcast (ex:
OrderShipped
)
Mapeador de Mensagens (Opcional)
Traduz entre mensagens do Brighter e objetos do app:
public class SqsFifoMapper : IAmAMessageMapperAsync<SqsFifoEvent>
{
public Task<Message> MapToMessageAsync(SqsFifoEvent request, Publication publication,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(new Message(new MessageHeader
{
MessageId = request.Id,
Topic = publication.Topic!,
PartitionKey = request.PartitionValue, // Requisito FIFO
MessageType = MessageType.MT_EVENT,
TimeStamp = DateTimeOffset.UtcNow
},
new MessageBody(JsonSerializer.SerializeToUtf8Bytes(request, JsonSerialisationOptions.Options))));
}
public Task<SqsFifoEvent> MapToRequestAsync(Message message, CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(JsonSerializer.Deserialize<SqsFifoEvent>(message.Body.Bytes, JsonSerialisationOptions.Options)!);
}
public IRequestContext? Context { get; set; }
}
Mudança no V10: Para pipelines assíncronos, agora é obrigatório usar IAmAMessageMapperAsync
. Para SNS/SQS FIFO, é necessário implementar um mapeador personalizado que defina a chave de partição.
Manipulador de Solicitações
Processa mensagens recebidas:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting command)
{
logger.LogInformation("Olá {Name}", command.Name);
await processor.PostAsync(new Farewell { Name = command.Name }, cancellationToken: cancellationToken);
return base.Handle(command);
}
}
Configurando o Brighter com AWS SNS/SQS
1. Configuração da Conexão
Defina detalhes da conexão com AWS SNS:
var connection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"),
RegionEndpoint.USEast1,
cfg => cfg.ServiceURL = "http://localhost:4566" // LocalStack
);
2. Assinatura SQS
Assine uma SQS (exemplos: SNS→SQS, SQS→SQS e SNS FIFO→SQS FIFO):
.AddServiceActivator(opt =>
{
opt.Subscriptions = [
// SNS → SQS
new SqsSubscription<Greeting>(
"greeting-subscription", // Opcional
"greeting-queue", // Nome da fila SQS
ChannelType.PubSub, // Obrigatório para SNS
"greeting.topic".ToValidSNSTopicName(), // Nome do tópico SNS
bufferSize: 2,
messagePumpType: MessagePumpType.Proactor),
// SQS → SQS (Ponto-a-Ponto)
new SqsSubscription<Farewell>(
new SubscriptionName("farawell-subscription"), // Opcional
new ChannelName("farewell.queue"), // Nome da fila SQS
ChannelType.PointToPoint, // SQS direto
new RoutingKey("farewell.queue".ToValidSQSQueueName()), // Nome do tópico SNS
bufferSize: 2,
messagePumpType: MessagePumpType.Proactor),
// FIFO SNS → SQS
new SqsSubscription<SnsFifoEvent>(
new SubscriptionName("sns-sample-fifo-subscription"), // Opcional
new ChannelName("sns-sample-fifo".ToValidSQSQueueName(true)), // Nome da fila SQS
ChannelType.PubSub,
new RoutingKey("sns-sample-fifo".ToValidSNSTopicName(true)), // Nome do tópico SNS
bufferSize: 2,
messagePumpType: MessagePumpType.Proactor,
topicAttributes: new SnsAttributes { Type = SqsType.Fifo }, // FIFO
queueAttributes: new SqsAttributes(type: SqsType.Fifo)), // FIFO
];
opt.DefaultChannelFactory = new ChannelFactory(connection);
})
3. Configuração do Produtor SNS/SQS
Publique eventos em um tópico usando CombinedProducerRegistryFactory
:
.UseExternalBus(opt =>
{
opt.ProducerRegistry = new CombinedProducerRegistryFactory(
// Produtores SNS
new SnsMessageProducerFactory(connection, [
new SnsPublication<Greeting>
{
Topic = "greeting.topic".ToValidSNSTopicName(),
MakeChannels = OnMissingChannel.Create
},
new SnsPublication<SnsFifoEvent>
{
Topic = "sns-sample-fifo".ToValidSNSTopicName(true),
MakeChannels = OnMissingChannel.Create,
TopicAttributes = new SnsAttributes
{
Type = SqsType.Fifo
}
}
]),
// Produtores SQS
new SqsMessageProducerFactory(connection, [
new SqsPublication<Farewell>
{
ChannelName = "farewell.queue".ToValidSQSQueueName(),
Topic = "farewell.queue".ToValidSQSQueueName(),
MakeChannels = OnMissingChannel.Create
}
])
).Create();
});
Alterações que quebram no Brighter V10
Reestruturação do Mapeador de Mensagens
Serialização JSON Padrão:
No V9, mapeadores eram obrigatórios. No V10, a serialização JSON é embutida, exceto para lógica personalizada.
Divisão de Interfaces
-
IAmAMessageMapper
(sincronização/reactor): Para fluxos síncronos -
IAmAMessageMapperAsync
(assíncrono/proactor): Para fluxos assíncronos
V10: Para definir uma chave de partição, implemente um mapeador personalizado (espera-se ajuste via RequestContext
na versão final)
Mudanças no IAmAMessageMapper
// V10
IRequestContext? Context { get; set; }
Message MapToMessage(Greeting request, Publication publication);
// V9
Message MapToMessage(Greeting request);
Assinaturas
Tipos Explícitos de Message Pump
Substituído runAsync
/isAsync
por messagePumpType
(Reactor
, Proactor
, Unknown
)
Renomeação de Propriedades
ChannelFactory
foi renomeado para DefaultChannelFactory
em AddServiceActivator
Tipo de Canal no SQS
Agora é obrigatório definir ChannelType
em SqsSubscription
:
-
PubSub
(SNS→SQS) -
PointToPoint
(SQS→SQS)
Atributos de Fila e Tópico
Propriedades movidas para SqsAttributes
e SnsAttributes
. Para FIFO:
new SqsSubscription<SnsFifoEvent>(
...
topicAttributes: new SnsAttributes { Type = SqsType.Fifo },
queueAttributes: new SqsAttributes(type: SqsType.Fifo))
Publicações
Reestruturação da Configuração
Removido IAmAProducerRegistry
. Use ExternalBusConfiguration
:
// V10
.UseExternalBus(opt => { ... })
// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Especificação do Tipo de Solicitação
Defina RequestType
ou altere o IAmAPublicationFinder
:
new SqsSubscription<Greeting>
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("greeting.topic"),
}
// ou
new SqsSubscription
{
MakeChannels = OnMissingChannel.Create,
Topic = new RoutingKey("greeting.topic"),
RequestType = typeof(Greeting)
}
SqsPublication
Agora existe SqsPublication
específico para SQS
Conclusão
O Brighter V10 simplifica integrações com AWS adicionando recursos críticos como FIFO. Migre atualizando:
- Interfaces de mapeadores de mensagens
- Tipo de canal nas assinaturas
- Modelo de publicação
- Use serialização embutida quando possível
Código completo: Repositório GitHub
This content originally appeared on DEV Community and was authored by Rafael Andrade