This content originally appeared on DEV Community and was authored by Danilo Maia Florenzano
“TPL Dataflow is an interesting mix of asynchronous and parallel technologies. It’s useful when you have a sequence of processes that need to be applied to your data.”
— Stephen Cleary, Concurrency in C# Cookbook
I recently needed to traverse a recursive category tree — starting with ~20 root nodes, each with an unknown number of subcategories (and sub-subcategories, and so on).
This kind of dynamically expanding workload doesn’t play well with static parallelization tools like Parallel.ForEach.
The Problem: Static Work Distribution
If we run Parallel.ForEach on the 20 root categories:
- Thread 1 might get a small branch and finish in 1 second.
- Thread 2 might get a massive branch and take 10 minutes.
Even with MaxDegreeOfParallelism = 50, most threads finish early and go idle — while a few get stuck processing deep, heavy trees. Load imbalance and wasted resources.
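To make the imbalance concrete, here is a minimal, self-contained sketch (synthetic branch sizes, simulated per-node work — none of these names come from the article). Because Parallel.ForEach treats each root branch as a single indivisible work item, the thread that draws the heavy branch must walk it end to end with no rebalancing:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class StaticPartitionDemo
{
    public static int Run()
    {
        // 19 tiny branches and 1 huge one -- sizes are synthetic.
        var branchSizes = Enumerable.Repeat(10, 19).Append(10_000).ToArray();
        int maxHandledByOneCall = 0;

        Parallel.ForEach(branchSizes,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            size =>
            {
                // The whole branch is one work item: the thread that
                // draws it must process every node itself.
                for (int node = 0; node < size; node++)
                    Thread.SpinWait(100); // simulate per-node work

                InterlockedMax(ref maxHandledByOneCall, size);
            });

        return maxHandledByOneCall;
    }

    static void InterlockedMax(ref int target, int value)
    {
        int snapshot;
        while (value > (snapshot = Volatile.Read(ref target)))
            Interlocked.CompareExchange(ref target, value, snapshot);
    }

    static void Main() => Console.WriteLine(Run()); // one invocation handled 10000 nodes
}
```

One delegate invocation ends up processing 10,000 simulated nodes while every other invocation processes 10 — exactly the imbalance described above.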
The Solution: ActionBlock<T> as a Dynamic Work Queue
ActionBlock<T> from TPL Dataflow provides a centralized, thread-safe queue with dynamic task generation and controlled concurrency.
Here’s how it works:
- Create one ActionBlock<CategoryNode> with a fixed concurrency limit.
- “Prime” it with the initial root categories.
- Each worker processes its node, finds subcategories, and posts them back into the same ActionBlock.
- The block keeps running until all items (current + pending) are done.
This pattern is effectively a recursive, self-balancing queue — all threads stay busy until the entire tree is processed.
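Before the real implementation, here is the pattern in miniature — a self-contained, runnable sketch over a synthetic integer tree (node n has children 2n and 2n + 1, capped below 100, so the tree contains exactly the nodes 1–99). All names here are invented for the demo:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class SelfFeedingDemo
{
    public static async Task<int> CountAsync()
    {
        int pending = 0;    // in-flight item count
        int processed = 0;  // total nodes visited
        ActionBlock<int> block = null!;

        block = new ActionBlock<int>(async n =>
        {
            try
            {
                Interlocked.Increment(ref processed);

                // Synthetic "subcategories": heap-style children below 100.
                foreach (var child in new[] { 2 * n, 2 * n + 1 })
                {
                    if (child >= 100) continue;
                    Interlocked.Increment(ref pending); // count before posting
                    await block.SendAsync(child);       // post back into the same block
                }
            }
            finally
            {
                // The last item to finish completes the block.
                if (Interlocked.Decrement(ref pending) == 0)
                    block.Complete();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        Interlocked.Increment(ref pending);
        await block.SendAsync(1); // prime with the root
        await block.Completion;
        return processed;
    }

    static async Task Main() => Console.WriteLine(await CountAsync()); // prints 99
}
```

Note that the in-flight counter is incremented before each SendAsync: otherwise a fast worker could drive the counter to zero and complete the block while children are still being queued.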
Example Implementation
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// CategoryNode, GetDepartmentsRootNodeAsync and ProcessCategoryAsync
// are defined elsewhere in the application.
public class MultithreadTreeParser
{
    private int _activeItems;
    private ActionBlock<CategoryNode> _actionBlock = null!;

    public async Task<int> StartAsync()
    {
        var rootNode = await GetDepartmentsRootNodeAsync()
            ?? throw new Exception("Failed to get root node");

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5
        };

        _actionBlock = new ActionBlock<CategoryNode>(async node =>
        {
            try
            {
                await ProcessCategoryAsync(node);

                // Feed discovered subcategories back into the same block.
                var subs = node.SubCategories;
                if (subs.Count > 0)
                {
                    // Count them as in-flight *before* posting, so the
                    // block can't complete while they are still queued.
                    Interlocked.Add(ref _activeItems, subs.Count);
                    foreach (var sub in subs)
                        await _actionBlock.SendAsync(sub);
                }
            }
            finally
            {
                // The last item to finish signals completion.
                var remaining = Interlocked.Decrement(ref _activeItems);
                if (remaining == 0)
                    _actionBlock.Complete();
            }
        }, options);

        // Prime the block with the root's immediate children.
        var rootSubs = rootNode.SubCategories;
        if (rootSubs.Count == 0)
            return 0; // nothing to process; avoid awaiting a block that never completes

        Interlocked.Add(ref _activeItems, rootSubs.Count);
        foreach (var sub in rootSubs)
            await _actionBlock.SendAsync(sub);

        await _actionBlock.Completion;
        return 0;
    }
}
Why This Works
- Dynamic load balancing: Workers pull from a shared queue. As soon as they finish, they grab the next available node.
- Recursive workload expansion: Each task can add new tasks safely to the same queue.
- Controlled parallelism: MaxDegreeOfParallelism keeps resource usage in check.
- Clean completion tracking: _activeItems counts in-flight work — when it hits zero, the pipeline gracefully completes.
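The concurrency cap is easy to verify empirically. This self-contained sketch (all names invented for the demo) tracks the peak number of simultaneously running delegates and confirms it never exceeds MaxDegreeOfParallelism:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ConcurrencyCapDemo
{
    public static async Task<int> MaxObservedAsync()
    {
        int current = 0, peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            var now = Interlocked.Increment(ref current);
            InterlockedMax(ref peak, now); // record the high-water mark
            await Task.Delay(10);          // hold the slot briefly
            Interlocked.Decrement(ref current);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        for (int i = 0; i < 50; i++)
            block.Post(i);

        block.Complete();
        await block.Completion;
        return peak;
    }

    static void InterlockedMax(ref int target, int value)
    {
        int snapshot;
        while (value > (snapshot = Volatile.Read(ref target)))
            Interlocked.CompareExchange(ref target, value, snapshot);
    }

    static async Task Main() =>
        Console.WriteLine(await MaxObservedAsync() <= 5); // never exceeds the cap
}
```

Fifty queued items, yet never more than five delegates in flight — the rest wait in the block's input queue.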