This content originally appeared on DEV Community and was authored by Syahrul Al-Rasyid
Go-like WaitGroup Pattern in Async OCR
Location of WaitGroup Implementation
The Go-like WaitGroup pattern is implemented in:
- File: src/core/async_ocr_integration.py
- Class: AsyncOCRWaitGroup (lines 28-84)
- Usage: _process_concurrent_operations() method
How the WaitGroup Works (Like Go’s sync.WaitGroup)
1. WaitGroup Initialization
# Lines 189-190: Create WaitGroup for coordinating async operations
wait_group = AsyncOCRWaitGroup()
2. Add Operations to WaitGroup
# Line 330: Add N operations to WaitGroup (like Go's wg.Add(n))
await wait_group.add(len(all_operations))
Output: 🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)
3. Start Concurrent Operations
# Lines 332-340: Create async tasks for each detection
tasks = []
for i, operation in enumerate(all_operations):
    task = asyncio.create_task(
        self._process_single_operation_safe(operation, wait_group, updated_detections, i)
    )
    tasks.append(task)
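Passing the index i into each task lets every operation write its result into its own slot of a pre-sized list, so results come back in detection order no matter which call finishes first. The sketch below is illustrative only: MiniWaitGroup (with synchronous add()/done(), not the article's AsyncOCRWaitGroup), fake_ocr, and process_one are hypothetical names, and the real _process_single_operation_safe may work differently.

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical minimal WaitGroup for this sketch (not the article's class)."""

    def __init__(self):
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()  # nothing pending yet

    def add(self, delta=1):
        self._counter += delta
        if self._counter < 0:
            raise ValueError("negative WaitGroup counter")
        if self._counter == 0:
            self._event.set()  # release wait()
        else:
            self._event.clear()  # work is pending again

    def done(self):
        self.add(-1)

    async def wait(self, timeout=None):
        await asyncio.wait_for(self._event.wait(), timeout)


async def fake_ocr(text, delay):
    # Hypothetical stand-in for the gRPC OCR call
    await asyncio.sleep(delay)
    return text.upper()


async def process_one(op, wg, results, i):
    try:
        results[i] = await fake_ocr(*op)  # each task owns slot i
    finally:
        wg.done()


async def main():
    # "bread" finishes first, yet results stay in input order
    ops = [("milk", 0.03), ("bread", 0.01), ("eggs", 0.02)]
    results = [None] * len(ops)
    wg = MiniWaitGroup()
    wg.add(len(ops))
    tasks = [  # keep references so the tasks aren't garbage-collected
        asyncio.create_task(process_one(op, wg, results, i))
        for i, op in enumerate(ops)
    ]
    await wg.wait(timeout=1.0)
    return results


print(asyncio.run(main()))  # ['MILK', 'BREAD', 'EGGS']
```

Because each task writes only to its own index, no lock is needed around the shared list.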
4. Each Operation Signals Completion
# Lines 472-473: Each operation calls done() when finished (like Go's wg.Done())
finally:
    await wait_group.done()  # Decrements counter by 1
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 2
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 1
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 0
5. Wait for All Operations
# Line 199: Wait for all operations to complete (like Go's wg.Wait())
await wait_group.wait(timeout=self.operation_timeout)
Output: 🎉 [AsyncOCRWaitGroup] All 3 operations completed
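Because wait() takes a timeout, it helps to decide what happens to operations still running when it expires. One option, sketched here under the assumption that the timeout surfaces as asyncio.TimeoutError (which is what asyncio.wait_for raises), is to cancel the stragglers. Since done() sits in a finally block, cancellation still decrements the counter. MiniWaitGroup, slow_op, and the delays are hypothetical.

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical minimal WaitGroup for this sketch (not the article's class)."""

    def __init__(self):
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()

    @property
    def pending(self):
        return self._counter

    def add(self, delta=1):
        self._counter += delta
        if self._counter < 0:
            raise ValueError("negative WaitGroup counter")
        if self._counter == 0:
            self._event.set()
        else:
            self._event.clear()

    def done(self):
        self.add(-1)

    async def wait(self, timeout=None):
        # asyncio.wait_for raises asyncio.TimeoutError when the timeout expires
        await asyncio.wait_for(self._event.wait(), timeout)


async def slow_op(wg, delay):
    try:
        await asyncio.sleep(delay)  # stand-in for an OCR call
    finally:
        wg.done()  # runs on success, error, or cancellation


async def main():
    wg = MiniWaitGroup()
    wg.add(2)
    tasks = [asyncio.create_task(slow_op(wg, d)) for d in (0.01, 5.0)]
    try:
        await wg.wait(timeout=0.1)
        outcome = "completed"
    except asyncio.TimeoutError:
        outcome = "timed out"
        for t in tasks:
            t.cancel()  # no-op for tasks that already finished
        await asyncio.gather(*tasks, return_exceptions=True)
    return outcome, wg.pending


print(asyncio.run(main()))  # ('timed out', 0)
```

After the cancelled task's finally block runs, the counter is back at zero, so the WaitGroup is left in a consistent state.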
Detailed Flow Diagram
ASYNC OCR WAITGROUP FLOW

1. INITIALIZATION
   Multiple food_label detections found ──┐
                                          ├──▶ Create WaitGroup
   QR/invoice detections found ───────────┘    wait_group = new()
   │
   ▼
2. ADD OPERATIONS TO WAITGROUP
   await wait_group.add(3)   # 3 detections to process
   🔢 Counter: 3 (delta: +3)
   │
   ▼
3. START CONCURRENT TASKS
   Task 1: food_label ─▶ OCR Processing (gRPC OCR Service) ─▶ await wg.done() → Counter: 2
   Task 2: food_label ─▶ OCR Processing (gRPC OCR Service) ─▶ await wg.done() → Counter: 1
   Task 3: QR         ─▶ QR Processing  (gRPC QR)          ─▶ await wg.done() → Counter: 0
   │
   ▼
4. WAITGROUP COORDINATION
   ✅ Operation 1 complete → Counter: 2
   ✅ Operation 2 complete → Counter: 1
   ✅ Operation 3 complete → Counter: 0
   When Counter = 0:
     🎉 All 3 operations completed!
     📤 Release await wait_group.wait()
   │
   ▼
5. COMPLETION
   ✨ Successfully processed 3 operations concurrently
   📊 Return updated detections with OCR results
   ⏱ Total time: ~200ms (instead of 600ms sequential)
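The ~200ms vs 600ms figure follows from the calls overlapping: when each call spends most of its time waiting on I/O, the batch takes roughly as long as the slowest call rather than the sum of all three. A quick local check, assuming a fixed 200ms asyncio.sleep as a hypothetical stand-in for each gRPC request (MiniWaitGroup and fake_ocr_call are illustrative names):

```python
import asyncio
import time


class MiniWaitGroup:
    """Hypothetical minimal WaitGroup for this sketch (not the article's class)."""

    def __init__(self):
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()

    def add(self, delta=1):
        self._counter += delta
        if self._counter == 0:
            self._event.set()
        else:
            self._event.clear()

    def done(self):
        self.add(-1)

    async def wait(self):
        await self._event.wait()


CALL_SECONDS = 0.2  # assumed per-call latency


async def fake_ocr_call(wg=None):
    try:
        await asyncio.sleep(CALL_SECONDS)
    finally:
        if wg is not None:
            wg.done()


async def run_concurrent(n):
    wg = MiniWaitGroup()
    wg.add(n)
    start = time.perf_counter()
    # keep references so the tasks aren't garbage-collected
    tasks = [asyncio.create_task(fake_ocr_call(wg)) for _ in range(n)]
    await wg.wait()
    return time.perf_counter() - start


async def run_sequential(n):
    start = time.perf_counter()
    for _ in range(n):
        await fake_ocr_call()
    return time.perf_counter() - start


concurrent = asyncio.run(run_concurrent(3))
sequential = asyncio.run(run_sequential(3))
print(f"concurrent: {concurrent * 1000:.0f}ms, sequential: {sequential * 1000:.0f}ms")
```

Real gains depend on the OCR service actually handling requests in parallel; CPU-bound work running inside the event loop would not overlap this way.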
Key Components Explained
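AsyncOCRWaitGroup Class (lines 28-84)
import asyncio

class AsyncOCRWaitGroup:
    def __init__(self):
        self._counter = 0               # Number of pending operations
        self._initial_counter = 0       # Total operations added (used in the final log line)
        self._event = asyncio.Event()   # Set whenever no operations are pending
        self._event.set()               # Counter starts at 0, so wait() returns immediately
        self._lock = asyncio.Lock()     # Serializes counter updates between coroutines (not threads)

    async def add(self, delta: int = 1):
        # Like Go's wg.Add(n) - adds n operations to wait for
        async with self._lock:
            self._counter += delta
            if self._counter < 0:
                raise ValueError("WaitGroup counter went negative")
            if delta > 0:
                self._initial_counter += delta
            if self._counter == 0:
                self._event.set()
            else:
                self._event.clear()     # Re-arm wait() while work is pending
            print(f"🔢 Counter: {self._counter} (delta: {delta})")

    async def done(self):
        # Like Go's wg.Done() - signals one operation completed
        async with self._lock:
            if self._counter == 0:
                raise ValueError("done() called more times than add()")
            self._counter -= 1
            print(f"✅ Operation completed, counter: {self._counter}")
            if self._counter == 0:
                self._event.set()       # Wake up every coroutine blocked in wait()

    async def wait(self, timeout=None):
        # Like Go's wg.Wait() - blocks until counter == 0;
        # raises asyncio.TimeoutError if `timeout` seconds pass first
        await asyncio.wait_for(self._event.wait(), timeout)
        print(f"🎉 All {self._initial_counter} operations completed")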
Where Operations Start (line 330)
# Add all operations to WaitGroup
await wait_group.add(len(all_operations)) # e.g., add(3)
Where Each Operation Finishes (lines 472-473)
# In _process_single_operation_async
finally:
    # Always mark operation as done in WaitGroup
    await wait_group.done()  # Decrements counter
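The finally block is what keeps wait() from hanging when an OCR call fails: the counter is decremented whether the operation returns or raises. In the sketch below (hypothetical MiniWaitGroup and op names; the failure is simulated), detection 1 raises, yet wait() still returns. The exceptions are then collected with asyncio.gather(..., return_exceptions=True) so they are reported rather than silently dropped.

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical minimal WaitGroup for this sketch (not the article's class)."""

    def __init__(self):
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()

    def add(self, delta=1):
        self._counter += delta
        if self._counter == 0:
            self._event.set()
        else:
            self._event.clear()

    def done(self):
        self.add(-1)

    async def wait(self, timeout=None):
        await asyncio.wait_for(self._event.wait(), timeout)


async def op(wg, i):
    try:
        await asyncio.sleep(0.01)
        if i == 1:
            raise RuntimeError("OCR service unavailable")  # simulated failure
        return f"detection {i} ok"
    finally:
        wg.done()  # runs on success *and* on exception


async def main():
    wg = MiniWaitGroup()
    wg.add(3)
    tasks = [asyncio.create_task(op(wg, i)) for i in range(3)]
    await wg.wait(timeout=1.0)  # returns even though one task raised
    # Retrieve results/exceptions so failures are reported, not dropped
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
print(results)
```

Without the finally, the failing task would skip done(), the counter would stay at 1, and wait() would block until its timeout.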
Where We Wait for All (line 199)
# Wait for all operations to complete with timeout
await wait_group.wait(timeout=self.operation_timeout)
Real Log Example
When you have 3 food_label detections:
🚀 [AsyncOCR] Processing 3 detections concurrently
🚀 [AsyncOCR] Starting 3 concurrent operations
🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3) ← WaitGroup.Add(3)
[Concurrent gRPC OCR calls happen]
✅ [AsyncOCR] Completed food_label operation for detection 0
✅ [AsyncOCRWaitGroup] Operation completed, counter: 2 ← WaitGroup.Done()
✅ [AsyncOCR] Completed food_label operation for detection 1
✅ [AsyncOCRWaitGroup] Operation completed, counter: 1 ← WaitGroup.Done()
✅ [AsyncOCR] Completed food_label operation for detection 2
✅ [AsyncOCRWaitGroup] Operation completed, counter: 0 ← WaitGroup.Done()
🎉 [AsyncOCRWaitGroup] All 3 operations completed ← WaitGroup.Wait() unblocks
✅ [AsyncOCR] Completed 3 operations in 221.9ms
✨ [AsyncOCR] Successfully processed 2 operations concurrently
Go vs Python Comparison
Go Version
var wg sync.WaitGroup
wg.Add(3) // 3 operations
// Start goroutines
for _, detection := range detections {
	go func(d Detection) {
		defer wg.Done() // Signal completion
		processOCR(d)
	}(detection)
}
wg.Wait() // Wait for all to complete
Python Version (Our Implementation)
wait_group = AsyncOCRWaitGroup()
await wait_group.add(3)  # 3 operations
# Start async tasks
tasks = []
for detection in detections:
    task = asyncio.create_task(process_ocr_async(detection, wait_group))
    tasks.append(task)  # Keep references so tasks aren't garbage-collected mid-flight
await wait_group.wait()  # Wait for all to complete

# Each task runs process_ocr_async:
async def process_ocr_async(detection, wait_group):
    try:
        # Do OCR processing
        return await ocr_service.process(detection)
    finally:
        await wait_group.done()  # Signal completion (the counterpart of Go's defer wg.Done())
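The pattern is the same in both languages: the counter is incremented before the concurrent work starts, each operation decrements it when it finishes (defer wg.Done() in Go, a finally block in Python), and the waiter resumes once it reaches zero!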
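When the counter itself isn't needed (for logging or progress reporting, say), Python's standard library can express the same wait-for-all coordination directly: asyncio.gather waits for every awaitable and returns results in input order, and asyncio.wait_for bounds the batch with a timeout (on Python 3.11+, asyncio.TaskGroup is another option). A sketch with a hypothetical process_ocr_async stand-in, not the article's implementation:

```python
import asyncio


async def process_ocr_async(detection: str) -> str:
    # Hypothetical stand-in for the real gRPC OCR request
    await asyncio.sleep(0.01)
    return f"text for {detection}"


async def process_all(detections, timeout=None):
    # gather() waits for every coroutine (the role of wg.Wait()) and returns
    # results in input order; wait_for() bounds the whole batch with a timeout
    return await asyncio.wait_for(
        asyncio.gather(*(process_ocr_async(d) for d in detections)),
        timeout,
    )


print(asyncio.run(process_all(["label_0", "label_1", "qr_0"], timeout=1.0)))
# ['text for label_0', 'text for label_1', 'text for qr_0']
```

One design difference to keep in mind: if an awaitable raises, gather (without return_exceptions=True) propagates the first exception but leaves the others running, whereas TaskGroup cancels the remaining tasks.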