Migrando para Brighter V10 com AWS SNS/SQS



This content originally appeared on DEV Community and was authored by Rafael Andrade

Em artigos anteriores, abordei a integração do Brighter com AWS SNS/SQS e o Brighter V10 RC1. Este guia foca na migração para o Brighter V10, destacando mudanças de configuração em AWS SNS/SQS e atualizações que causam breaking changes.

Novidades no Brighter V10 para AWS SNS/SQS

O Brighter V10 introduz melhorias significativas para AWS SNS/SQS:

  • Suporte direto ao SQS: Publicar/consumir mensagens do SQS sem exigir SNS
  • Suporte a FIFO: Compatibilidade completa com filas SNS/SQS FIFO
  • Integração com LocalStack: Suporte aprimorado para emulação local da AWS

Requisitos

Recapitulando o Brighter

Antes de falar sobre a configuração do RabbitMQ, vamos revisar conceitos básicos do Brighter.

Solicitação (Comando/Evento)

Defina mensagens usando IRequest:

public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}
  • Comandos: Operações para um único destinatário (ex: SendEmail)
  • Eventos: Notificações broadcast (ex: OrderShipped)

Mapeador de Mensagens (Opcional)

Traduz entre mensagens do Brighter e objetos do app:

public class SqsFifoMapper : IAmAMessageMapperAsync<SqsFifoEvent>
{
    public Task<Message> MapToMessageAsync(SqsFifoEvent request, Publication publication,
        CancellationToken cancellationToken = new CancellationToken())
    {
        return Task.FromResult(new Message(new MessageHeader
            {
                MessageId = request.Id,
                Topic = publication.Topic!,
                PartitionKey = request.PartitionValue, // Requisito FIFO
                MessageType = MessageType.MT_EVENT,
                TimeStamp = DateTimeOffset.UtcNow
            }, 
            new MessageBody(JsonSerializer.SerializeToUtf8Bytes(request, JsonSerialisationOptions.Options))));
    }
    public Task<SqsFifoEvent> MapToRequestAsync(Message message, CancellationToken cancellationToken = new CancellationToken())
    {
        return Task.FromResult(JsonSerializer.Deserialize<SqsFifoEvent>(message.Body.Bytes, JsonSerialisationOptions.Options)!);
    }
    public IRequestContext? Context { get; set; }
}

Mudança no V10: Para pipelines assíncronos, agora é obrigatório usar IAmAMessageMapperAsync. Para SNS/SQS FIFO, é necessário implementar um mapeador personalizado que defina a chave de partição.

Manipulador de Solicitações

Processa mensagens recebidas:

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Olá {Name}", command.Name);
        await processor.PostAsync(new Farewell { Name = command.Name }, cancellationToken: cancellationToken);
        return base.Handle(command);
    }
}

Configurando o Brighter com AWS SNS/SQS

1. Configuração da Conexão

Defina detalhes da conexão com AWS SNS:

 var connection = new AWSMessagingGatewayConnection(new BasicAWSCredentials("test", "test"), 
            RegionEndpoint.USEast1,
            cfg => cfg.ServiceURL = "http://localhost:4566" // LocalStack
); 

2. Assinatura SQS

Assine uma SQS (exemplos: SNS→SQS, SQS→SQS e SNS FIFO→SQS FIFO):

.AddServiceActivator(opt =>
{
    opt.Subscriptions = [
         // SNS → SQS
         new SqsSubscription<Greeting>(
             "greeting-subscription", // Opcional
             "greeting-queue", // Nome da fila SQS
             ChannelType.PubSub,  // Obrigatório para SNS
             "greeting.topic".ToValidSNSTopicName(), // Nome do tópico SNS
             bufferSize: 2, 
             messagePumpType: MessagePumpType.Proactor),
          // SQS → SQS (Ponto-a-Ponto)
         new SqsSubscription<Farewell>(
             new SubscriptionName("farawell-subscription"), // Opcional
             new ChannelName("farewell.queue"), // Nome da fila SQS
             ChannelType.PointToPoint, // SQS direto
             new RoutingKey("farewell.queue".ToValidSQSQueueName()), // Nome do tópico SNS
             bufferSize: 2,
             messagePumpType: MessagePumpType.Proactor),
         // FIFO SNS → SQS
         new SqsSubscription<SnsFifoEvent>(
             new SubscriptionName("sns-sample-fifo-subscription"), // Opcional
             new ChannelName("sns-sample-fifo".ToValidSQSQueueName(true)), // Nome da fila SQS
             ChannelType.PubSub, 
             new RoutingKey("sns-sample-fifo".ToValidSNSTopicName(true)), // Nome do tópico SNS
             bufferSize: 2,
             messagePumpType: MessagePumpType.Proactor,
             topicAttributes: new SnsAttributes { Type = SqsType.Fifo }, // FIFO
             queueAttributes: new SqsAttributes(type: SqsType.Fifo)), // FIFO
    ];
    opt.DefaultChannelFactory = new ChannelFactory(connection);
})

3. Configuração do Produtor SNS/SQS

Publique eventos em um tópico usando CombinedProducerRegistryFactory:

.UseExternalBus(opt =>
{
    opt.ProducerRegistry = new CombinedProducerRegistryFactory(
        // Produtores SNS
        new SnsMessageProducerFactory(connection, [
            new SnsPublication<Greeting>
            {
                Topic = "greeting.topic".ToValidSNSTopicName(), 
                MakeChannels = OnMissingChannel.Create
            },
            new SnsPublication<SnsFifoEvent>
            {
                Topic = "sns-sample-fifo".ToValidSNSTopicName(true),
                MakeChannels = OnMissingChannel.Create,
                TopicAttributes = new SnsAttributes
                {
                    Type = SqsType.Fifo
                }
            }
       ]),
       // Produtores SQS
       new SqsMessageProducerFactory(connection, [
           new SqsPublication<Farewell>
           {
               ChannelName = "farewell.queue".ToValidSQSQueueName(), 
               Topic = "farewell.queue".ToValidSQSQueueName(), 
               MakeChannels = OnMissingChannel.Create
            }
        ])
      ).Create();
});

Alterações que quebram no Brighter V10

Reestruturação do Mapeador de Mensagens

Serialização JSON Padrão:

No V9, mapeadores eram obrigatórios. No V10, a serialização JSON é embutida, exceto para lógica personalizada.

Divisão de Interfaces

  • IAmAMessageMapper (sincronização/reactor): Para fluxos síncronos
  • IAmAMessageMapperAsync (assíncrono/proactor): Para fluxos assíncronos

V10: Para definir uma chave de partição, implemente um mapeador personalizado (espera-se ajuste via RequestContext na versão final)

Mudanças no IAmAMessageMapper

// V10
IRequestContext? Context { get; set; }
Message MapToMessage(Greeting request, Publication publication);
// V9
Message MapToMessage(Greeting request);

Assinaturas

Tipos Explícitos de Message Pump

Substituído runAsync/isAsync por messagePumpType (Reactor, Proactor, Unknown)

Renomeação de Propriedades

ChannelFactory foi renomeado para DefaultChannelFactory em AddServiceActivator

Tipo de Canal no SQS

Agora é obrigatório definir ChannelType em SqsSubscription:

  • PubSub (SNS→SQS)
  • PointToPoint (SQS→SQS)

Atributos de Fila e Tópico

Propriedades movidas para SqsAttributes e SnsAttributes. Para FIFO:

new SqsSubscription<SnsFifoEvent>(
   ...
   topicAttributes: new SnsAttributes { Type = SqsType.Fifo },
   queueAttributes: new SqsAttributes(type: SqsType.Fifo))

Publicações

Reestruturação da Configuração

Removido IAmAProducerRegistry. Use ExternalBusConfiguration:

// V10
.UseExternalBus(opt => { ... })
// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))

Especificação do Tipo de Solicitação

Defina RequestType ou altere o IAmAPublicationFinder:

new SqsSubscription<Greeting>
{
    MakeChannels = OnMissingChannel.Create,
    Topic = new RoutingKey("greeting.topic"),
}
// ou
new SqsSubscription
{
    MakeChannels = OnMissingChannel.Create,
    Topic = new RoutingKey("greeting.topic"),
    RequestType = typeof(Greeting)
}

SqsPublication

Agora existe SqsPublication específico para SQS

Conclusão

O Brighter V10 simplifica integrações com AWS adicionando recursos críticos como FIFO. Migre atualizando:

  1. Interfaces de mapeadores de mensagens
  2. Tipo de canal nas assinaturas
  3. Modelo de publicação
  4. Use serialização embutida quando possível

Código completo: Repositório GitHub


This content originally appeared on DEV Community and was authored by Rafael Andrade