This content originally appeared on DEV Community and was authored by Isaac Ojeda
Introducción
En el desarrollo moderno de aplicaciones, es común necesitar ejecutar procesos en segundo plano que se comuniquen con nuestra API de forma eficiente y segura. Tradicionalmente, esto se resolvía con implementaciones complejas usando locks, colas manuales o infraestructura externa como RabbitMQ. Sin embargo, .NET ofrece una solución simple pero elegante: System.Threading.Channels.
En este artículo, exploraremos cómo construir un sistema de control de jobs en tiempo real utilizando:
Channels para comunicación thread-safe entre componentes
Background Services para tareas recurrentes
Minimal APIs para endpoints modernos y limpios
TaskCompletionSource para comunicación bidireccional
Al finalizar, tendrás un proyecto funcional que puedes adaptar para casos de uso reales como procesamiento de emails, análisis de imágenes, generación de reportes, y más.
Nota: El código fuente siempre lo encontrarás en mi github -> DevToPosts/ApiBackgroundChannels at main · isaacOjeda/DevToPosts
¿Qué son los Channels?
Los Channels en .NET son estructuras de datos thread-safe diseñadas para escenarios productor-consumidor. Piensa en ellos como una “tubería” donde un lado escribe datos y el otro los lee, sin preocuparte por locks o sincronización manual.
¿Por qué usarlos?
Thread-safe por diseño
Alta performance con bajo overhead
Backpressure integrado (control de flujo)
Ideal para comunicación entre hilos/tareas
Alternativa simple a colas externas (RabbitMQ, Redis) para escenarios internos
Optimizado para async/await (usa ValueTaskinternamente)
Arquitectura del Proyecto
Este proyecto demuestra cómo controlar un Background Job desde una API usando Channels para comunicación bidireccional:
┌──────────────┐ ┌─────────┐ ┌──────────────────┐
│ API Request │ ──────> │ Channel │ ──────> │ Background Job │
│ (Productor) │ │ (Cola) │ │ (Consumidor) │
└──────────────┘ └─────────┘ └──────────────────┘
↑ │
└────── TaskCompletionSource ─────────────────┘
(Respuesta)
Paso 1: Definir el Modelo de Comunicación
Primero, necesitamos estructuras para enviar comandos y recibir respuestas:
public enum CommandType { Start, Stop, GetStatus }
public class JobCommand
{
public CommandType Type { get; set; }
public TaskCompletionSource<JobStatus>? ResponseTask { get; set; }
}
public class JobStatus
{
public bool IsRunning { get; set; }
public int ExecutionCount { get; set; }
public DateTime? LastExecutionTime { get; set; }
public string Message { get; set; } = string.Empty;
}
Clave: TaskCompletionSource nos permite crear una Task que completaremos manualmente cuando tengamos la respuesta, haciendo posible la comunicación bidireccional.
Paso 2: Crear el Background Service (Consumidor)
public class JobProcessor : BackgroundService
{
private readonly Channel<JobCommand> _channel;
private bool _isJobRunning = false;
private int _executionCount = 0;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("JobProcessor iniciado. Esperando comandos...");
// ✅ Patrón recomendado por Microsoft: WaitToReadAsync + TryRead
// Más eficiente que ReadAllAsync para alta concurrencia
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
while (_channel.Reader.TryRead(out var command))
{
try
{
await ProcessCommandAsync(command, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error procesando comando");
// ✅ Notificar errores al productor
command.ResponseTask?.TrySetException(ex);
}
}
}
}
private async Task ProcessCommandAsync(JobCommand command, CancellationToken token)
{
switch (command.Type)
{
case CommandType.Start:
_isJobRunning = true;
_ = Task.Run(async () => await RunRecurringJobAsync(token));
// Enviar respuesta al productor
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = true,
Message = "Job iniciado"
});
break;
case CommandType.Stop:
_isJobRunning = false;
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = false,
Message = "Job detenido"
});
break;
case CommandType.GetStatus:
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = _isJobRunning,
ExecutionCount = _executionCount
});
break;
}
}
}
Explicación de las mejoras:
-
WaitToReadAsync()+TryRead(): Patrón recomendado por Microsoft para mejor performance -
TrySetException(): Propaga errores al productor de forma segura - Bucle anidado: Procesa múltiples comandos en batch cuando están disponibles
Paso 3: Configurar el Channel y el Servicio
En Program.cs:
// ✅ Bounded Channel con opciones optimizadas (recomendado por Microsoft)
builder.Services.AddSingleton(Channel.CreateBounded<JobCommand>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Backpressure automático
SingleWriter = false, // Múltiples endpoints pueden escribir
SingleReader = true // Solo un BackgroundService consume
}));
// Registrar el Background Service
builder.Services.AddHostedService<JobProcessor>();
¿Bounded vs Unbounded?
| Característica | Unbounded | Bounded |
|---|---|---|
| Capacidad | Ilimitada | Limitada (configurable) |
| Memoria | Puede crecer sin control | Controlada |
| Backpressure | No | Sí (automático) |
| Uso recomendado | Productores lentos | Productores rápidos |
| Performance | Writes síncronos | Writes pueden ser async |
Modos de Bounded Channel (FullMode):
-
Wait(recomendado): Espera hasta que haya espacio (backpressure) -
DropWrite: Descarta el nuevo elemento -
DropOldest: Descarta el elemento más antiguo -
DropNewest: Descarta el elemento más reciente
Opciones de optimización:
-
SingleWriter:true= mejor performance si solo un productor escribe -
SingleReader:true= mejor performance si solo un consumidor lee -
AllowSynchronousContinuations:false(default) para evitar bloqueos
Paso 4: Crear los Endpoints (Productores)
public static IEndpointRouteBuilder MapJobEndpoints(this IEndpointRouteBuilder app)
{
var jobGroup = app.MapGroup("api/job");
jobGroup.MapPost("start", async (Channel<JobCommand> channel) =>
{
// Crear TaskCompletionSource para esperar la respuesta
var tcs = new TaskCompletionSource<JobStatus>();
var command = new JobCommand
{
Type = CommandType.Start,
ResponseTask = tcs
};
// ✅ WriteAsync maneja backpressure automáticamente
await channel.Writer.WriteAsync(command);
// Esperar respuesta del consumidor
var status = await tcs.Task;
return Results.Ok(status);
});
// Endpoints similares para stop y status...
return app;
}
Flujo:
- API recibe request → Crea
TaskCompletionSource - Escribe comando en el Channel con
WriteAsync()(maneja backpressure) - Espera que el consumidor complete la Task
- Retorna la respuesta al cliente
¿Por qué este patrón?
Sin Channels:
// ❌ Locks manuales, propenso a errores
private static readonly object _lock = new();
private static Queue<Command> _queue = new();
public void AddCommand(Command cmd)
{
lock(_lock) { _queue.Enqueue(cmd); }
}
Con Channels:
// ✅ Thread-safe automático, limpio, con backpressure
await channel.Writer.WriteAsync(command);
Casos de Uso Reales con Channels
1. Cola de Emails/Notificaciones
// Microsoft recomienda Bounded para prevenir OutOfMemory
var emailChannel = Channel.CreateBounded<EmailMessage>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// API recibe requests → Encola en Channel → Background envía emails en lotes
2. Procesamiento de Imágenes
Channel<ImageProcessingJob> imageChannel;
// Upload de imágenes → Channel → Worker redimensiona/optimiza en background
3. Logs Centralizados
// DropOldest para logs: si está lleno, descarta los más antiguos
var logChannel = Channel.CreateBounded<LogEntry>(
new BoundedChannelOptions(5000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
4. Rate Limiting / Throttling
var boundedChannel = Channel.CreateBounded<Request>(100);
// Limita a 100 requests concurrentes, el resto espera (backpressure)
5. Event Sourcing Interno
Channel<DomainEvent> eventChannel;
// Eventos de dominio → Channel → Múltiples handlers procesan en paralelo
6. Pipeline de Datos (ejemplo oficial de Microsoft)
// Patrón de processing pipeline con múltiples stages
Channel<RawData> inputChannel;
Channel<ProcessedData> outputChannel;
// Stage 1: Raw → Validated → Stage 2: Validated → Enriched
Ventajas vs Alternativas
| Escenario | Channel | Queue Externo |
|---|---|---|
| Performance | Muy alta |
Red overhead |
| Configuración | Cero |
Infraestructura |
| Backpressure | Integrado |
Manual |
| Async/Await | Nativo (ValueTask) |
Depende |
| Escalabilidad | Single-app |
Multi-app |
| Memoria | Bounded options |
Depende |
Usa Channels cuando:
Comunicación dentro de la misma aplicación
Necesitas alta performance y bajo latency
Quieres simplicidad sin infraestructura externa
Trabajas con async/await
Necesitas backpressure automático
Usa Queue externo (RabbitMQ/Azure Service Bus) cuando:
Necesitas comunicación entre múltiples aplicaciones/servicios
Requieres persistencia de mensajes
Necesitas escalabilidad horizontal
Requieres garantías de entrega (at-least-once, exactly-once)
¿Por qué usar Channels?
A lo largo de este tutorial, hemos visto cómo Channels ofrece ventajas significativas:
- Simplicidad: No necesitas infraestructura externa para empezar
-
Performance: Diseñados desde cero para async/await con
ValueTask - Seguridad: Thread-safe por diseño, sin preocupaciones por race conditions
- Control: Backpressure automático previene sobrecarga del sistema
- Flexibilidad: Configuración granular según tus necesidades específicas
Cuándo SÍ usar Channels
Comunicación intra-proceso: Coordinación entre componentes de la misma aplicación
Alta frecuencia: Miles de mensajes por segundo con mínimo overhead
Backpressure crítico: Necesitas controlar la velocidad de producción/consumo
Simplicidad operacional: Quieres evitar dependencias de infraestructura externa
Desarrollo rápido: Prototipado y desarrollo local sin complicaciones
Cuándo NO usar Channels
Comunicación inter-proceso: Si necesitas comunicar múltiples aplicaciones/servicios
Persistencia requerida: Si los mensajes deben sobrevivir reinicios
Distribución geográfica: Múltiples datacenters o regiones
Garantías de entrega avanzadas: Exactly-once, dead letter queues, retries configurables
Monitoreo centralizado: Necesitas observabilidad empresarial de mensajería
Impacto en tu arquitectura
Este patrón es especialmente valioso cuando:
- Estás construyendo aplicaciones monolíticas modernas que necesitan procesamiento asíncrono
- Quieres reducir costos de infraestructura eliminando dependencias de message brokers
- Necesitas optimizar performance con procesamiento en memoria
- Buscas simplicidad operacional sin sacrificar escalabilidad vertical
Los Channels de .NET demuestran que no siempre necesitas herramientas complejas para resolver problemas complejos. A veces, la solución más elegante es la que viene incorporada en tu framework.
Próximos Pasos
¿Listo para llevar este conocimiento al siguiente nivel? Aquí tienes algunas ideas para expandir este proyecto:
1. Implementar Múltiples Consumidores
// Escalar procesamiento con múltiples workers
builder.Services.AddHostedService<JobProcessor>(); // Worker 1
builder.Services.AddHostedService<JobProcessor>(); // Worker 2
builder.Services.AddHostedService<JobProcessor>(); // Worker 3
Aprenderás: Paralelización, distribución de carga, sincronización entre workers
2. Agregar Sistema de Prioridades
public enum JobPriority { Low, Normal, High, Critical }
// Crear channels separados por prioridad
var highPriorityChannel = Channel.CreateBounded<JobCommand>(50);
var normalPriorityChannel = Channel.CreateBounded<JobCommand>(100);
var lowPriorityChannel = Channel.CreateBounded<JobCommand>(200);
Aprenderás: Gestión de prioridades, routing inteligente, SLA por prioridad
3. Integrar Observabilidad
// Métricas con System.Diagnostics.Metrics
var meter = new Meter("BackgroundJobs");
var jobsProcessed = meter.CreateCounter<long>("jobs_processed");
var queueDepth = meter.CreateObservableGauge("queue_depth",
() => channel.Reader.Count);
Aprenderás: OpenTelemetry, métricas personalizadas, dashboards con Grafana/Prometheus
4. Implementar Persistencia
// Guardar estado en caso de restart
public class PersistentJobProcessor : BackgroundService
{
private readonly IJobStateRepository _repository;
protected override async Task ExecuteAsync(CancellationToken token)
{
// Recuperar jobs pendientes al iniciar
await _repository.RestorePendingJobsAsync(token);
// ... continuar procesamiento normal
}
}
Aprenderás: State management, recovery strategies, durabilidad
5. Agregar Pipeline de Procesamiento
// Pipeline multi-etapa
var rawChannel = Channel.CreateBounded<RawData>(100);
var validatedChannel = Channel.CreateBounded<ValidatedData>(100);
var enrichedChannel = Channel.CreateBounded<EnrichedData>(100);
// Stage 1: Validación
builder.Services.AddHostedService<ValidationProcessor>();
// Stage 2: Enriquecimiento
builder.Services.AddHostedService<EnrichmentProcessor>();
// Stage 3: Persistencia
builder.Services.AddHostedService<PersistenceProcessor>();
Aprenderás: Pipeline pattern, ETL processes, data transformation
6. Implementar Rate Limiting Avanzado
// Rate limiter con ventanas deslizantes
public class RateLimitedJobProcessor : BackgroundService
{
private readonly RateLimiter _rateLimiter;
protected override async Task ExecuteAsync(CancellationToken token)
{
while (await _channel.Reader.WaitToReadAsync(token))
{
using var lease = await _rateLimiter.AcquireAsync(1, token);
if (lease.IsAcquired)
{
// Procesar job
}
}
}
}
Aprenderás: Rate limiting patterns, token bucket, leaky bucket
7. Crear Dashboard de Monitoreo
// SignalR para updates en tiempo real
builder.Services.AddSignalR();
// Notificar estado a clientes conectados
await _hubContext.Clients.All.SendAsync("JobStatusUpdate", new {
QueueDepth = channel.Reader.Count,
ProcessingRate = jobsPerSecond,
ActiveJobs = activeJobCount
});
Aprenderás: Real-time updates, SignalR, live dashboards
8. Añadir Resiliencia
// Polly para retry policies
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
{
await ProcessJobAsync(command);
});
Aprenderás: Retry patterns, circuit breakers, fallback strategies
9. Implementar Health Checks
// Health check para el channel
builder.Services.AddHealthChecks()
.AddCheck<ChannelHealthCheck>("channel_health")
.AddCheck<JobProcessorHealthCheck>("job_processor_health");
public class ChannelHealthCheck : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync()
{
var queueDepth = _channel.Reader.Count;
return queueDepth < 1000
? HealthCheckResult.Healthy()
: HealthCheckResult.Degraded("Queue depth high");
}
}
Aprenderás: Health monitoring, readiness/liveness probes, Kubernetes integration
10. Migrar a Arquitectura Distribuida
// Cuando crezcas más allá de un solo servidor
// Considera migrar a:
// - Azure Service Bus para messaging distribuido
// - Azure Queue Storage para simplicidad y bajo costo
// - RabbitMQ para control total
// - Redis Streams para alta performance
// Mantén la misma interfaz, cambia la implementación
public interface IJobQueue
{
Task EnqueueAsync(JobCommand command);
Task<JobCommand> DequeueAsync(CancellationToken token);
}
// Implementación con Channels (actual)
public class InMemoryJobQueue : IJobQueue { }
// Implementación con Azure Service Bus (futuro)
public class ServiceBusJobQueue : IJobQueue { }
Aprenderás: Estrategias de migración, abstracciones, arquitectura evolutiva
Recursos para Continuar Aprendiendo
Documentación Oficial:
- System.Threading.Channels API Reference
- Background tasks with hosted services in ASP.NET Core | Microsoft Learn
- Channels Library Guide
- Background Services in ASP.NET Core
Artículos Avanzados:
This content originally appeared on DEV Community and was authored by Isaac Ojeda
Red overhead
Manual
Single-app
Multi-app