🔄 Go-like WaitGroup Pattern in Async OCR



This content originally appeared on DEV Community and was authored by Syahrul Al-Rasyid


📍 Location of WaitGroup Implementation

The Go-like WaitGroup pattern is implemented in:

  • File: src/core/async_ocr_integration.py
  • Class: AsyncOCRWaitGroup (lines 28-84)
  • Usage: _process_concurrent_operations() method

🔄 How the WaitGroup Works (Like Go's sync.WaitGroup)

1. WaitGroup Initialization

# Lines 189-190: Create a WaitGroup to coordinate the async operations
wait_group = AsyncOCRWaitGroup()

2. Add Operations to WaitGroup

# Line 330: Add N operations to WaitGroup (like Go's wg.Add(n))
await wait_group.add(len(all_operations))

Output: 🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)
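Calling add() once, before any task is created, matters for the same reason it does in Go: if each task incremented the counter itself, wait() could see a zero counter before any task had run and return early. A minimal sketch of that race, assuming a hypothetical stripped-down MiniWaitGroup (not the article's class; its methods are synchronous for brevity) that, like Go's, treats a zero counter as "nothing to wait for":

```python
import asyncio


class MiniWaitGroup:
    """Hypothetical stripped-down wait group: wait() returns once the counter is 0."""

    def __init__(self) -> None:
        self._counter = 0
        self._event = asyncio.Event()
        self._event.set()  # zero counter means nothing to wait for (as in Go)

    def add(self, delta: int = 1) -> None:
        self._counter += delta
        if self._counter > 0:
            self._event.clear()

    def done(self) -> None:
        self._counter -= 1
        if self._counter == 0:
            self._event.set()

    async def wait(self) -> None:
        await self._event.wait()


async def run(add_inside_task: bool) -> int:
    wg = MiniWaitGroup()
    finished = []

    async def work(i: int) -> None:
        if add_inside_task:
            wg.add(1)  # too late: wait() may already have returned
        await asyncio.sleep(0.01)
        finished.append(i)
        wg.done()

    if not add_inside_task:
        wg.add(3)  # register all work before any task can run
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    await wg.wait()
    count_at_wake = len(finished)  # operations finished when wait() returned
    await asyncio.gather(*tasks)  # let stragglers finish so nothing is left pending
    return count_at_wake


correct = asyncio.run(run(add_inside_task=False))
racy = asyncio.run(run(add_inside_task=True))
print(correct, racy)  # 3 0
```

In the racy variant, wait() sees the initial zero counter and returns before a single task has run, which is why the article adds len(all_operations) up front.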

3. Start Concurrent Operations

# Lines 332-340: Create async tasks for each detection
tasks = []
for i, operation in enumerate(all_operations):
    task = asyncio.create_task(
        self._process_single_operation_safe(operation, wait_group, updated_detections, i)
    )
    tasks.append(task)
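asyncio.create_task schedules each coroutine on the event loop immediately, so I/O-bound calls such as the gRPC requests overlap instead of running back to back. A sketch of the effect, using asyncio.sleep as a hypothetical stand-in for one ~100 ms OCR call (fake_ocr_call is not part of the article's code):

```python
import asyncio
import time


async def fake_ocr_call(i: int) -> str:
    # Hypothetical stand-in for one gRPC OCR request (~100 ms of I/O wait)
    await asyncio.sleep(0.1)
    return f"text-{i}"


async def sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await fake_ocr_call(i)  # each call starts only after the previous one ends
    return time.perf_counter() - start


async def concurrent(n: int) -> float:
    start = time.perf_counter()
    tasks = [asyncio.create_task(fake_ocr_call(i)) for i in range(n)]
    await asyncio.gather(*tasks)  # all n calls wait on I/O at the same time
    return time.perf_counter() - start


seq_s = asyncio.run(sequential(3))
conc_s = asyncio.run(concurrent(3))
print(f"sequential: {seq_s:.2f}s, concurrent: {conc_s:.2f}s")
```

With three calls, the sequential version should take roughly three times as long as the concurrent one. This only helps for I/O-bound work; CPU-bound OCR running inside the event loop would still execute one call at a time.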

4. Each Operation Signals Completion

# Lines 472-473: Each operation calls done() when finished (like Go's wg.Done())
try:
    ...  # the OCR/QR work for this operation
finally:
    await wait_group.done()  # Decrements the counter by 1, even if the work raised

Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 2
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 1
Output: ✅ [AsyncOCRWaitGroup] Operation completed, counter: 0
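Putting done() in a finally block is what keeps wait() from hanging when an operation fails; it plays the role of Go's defer wg.Done(). A self-contained sketch using a bare counter and asyncio.Event (hypothetical names, not the article's class), where one simulated OCR call raises:

```python
import asyncio


async def main() -> tuple:
    pending = 3
    all_done = asyncio.Event()

    def done() -> None:
        nonlocal pending
        pending -= 1
        if pending == 0:
            all_done.set()

    async def operation(i: int) -> str:
        try:
            await asyncio.sleep(0.01)
            if i == 1:
                raise RuntimeError("simulated OCR failure")  # hypothetical error
            return f"ok-{i}"
        finally:
            done()  # runs on success *and* on failure

    tasks = [asyncio.create_task(operation(i)) for i in range(3)]
    await asyncio.wait_for(all_done.wait(), timeout=1.0)  # would time out without finally
    # return_exceptions=True collects the error instead of raising it here
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return pending, results


pending, results = asyncio.run(main())
print(pending, [type(r).__name__ for r in results])  # 0 ['str', 'RuntimeError', 'str']
```

If done() were called only on the success path, the counter would stop at 1 and the wait would run into its timeout.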

5. Wait for All Operations

# Line 199: Wait for all operations to complete (like Go's wg.Wait())
await wait_group.wait(timeout=self.operation_timeout)

Output: 🎉 [AsyncOCRWaitGroup] All 3 operations completed
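Go's wg.Wait() has no timeout, so the timeout argument is a Python-side addition. A minimal sketch of how it can be honoured, assuming wait() is built on asyncio.Event as AsyncOCRWaitGroup is: wrap Event.wait() in asyncio.wait_for, which raises asyncio.TimeoutError when time runs out. The helper wait_with_timeout is hypothetical.

```python
import asyncio
from typing import Optional


async def wait_with_timeout(event: asyncio.Event, timeout: Optional[float]) -> bool:
    """Return True if the event was set in time, False on timeout."""
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout)  # timeout=None waits forever
        return True
    except asyncio.TimeoutError:
        return False


async def main() -> tuple:
    never_set = asyncio.Event()
    set_soon = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, set_soon.set)  # simulate the last done()
    never_ok = await wait_with_timeout(never_set, timeout=0.05)
    soon_ok = await wait_with_timeout(set_soon, timeout=1.0)
    return never_ok, soon_ok


never_ok, soon_ok = asyncio.run(main())
print(never_ok, soon_ok)  # False True
```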

🎯 Detailed Flow Diagram

┌─────────────────────────────────────────────────────────────┐
│                   ASYNC OCR WAITGROUP FLOW                  │
└─────────────────────────────────────────────────────────────┘

1. INITIALIZATION
   ┌──────────────────┐
   │ Multiple         │
   │ food_label       │──┐
   │ detections found │  │
   └──────────────────┘  │     ┌─────────────────────┐
                         ├────▶│ Create WaitGroup    │
   ┌──────────────────┐  │     │ wait_group = new()  │
   │ QR/invoice       │  │     └─────────────────────┘
   │ detections found │──┘                │
   └──────────────────┘                   │
                                          ▼

2. ADD OPERATIONS TO WAITGROUP
   ┌─────────────────────────────────────────────────────┐
   │ await wait_group.add(3)  # 3 detections to process  │
   │ 🔢 Counter: 3 (delta: +3)                           │
   └─────────────────────────────────────────────────────┘
                               │
                               ▼

3. START CONCURRENT TASKS
   ┌─────────────────────────────────────────────────────────┐
   │                  CONCURRENT EXECUTION                   │
   │                                                         │
   │ Task 1: food_label   Task 2: food_label   Task 3: QR    │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌─────────┐   │
   │ │ OCR Processing  │  │ OCR Processing  │  │ QR Proc │   │
   │ │ ┌─────────────┐ │  │ ┌─────────────┐ │  │ ┌─────┐ │   │
   │ │ │   gRPC      │ │  │ │   gRPC      │ │  │ │gRPC │ │   │
   │ │ │ OCR Service │ │  │ │ OCR Service │ │  │ │ QR  │ │   │
   │ │ └─────────────┘ │  │ └─────────────┘ │  │ └─────┘ │   │
   │ └─────────────────┘  └─────────────────┘  └─────────┘   │
   │          │                    │                │        │
   │          ▼                    ▼                ▼        │
   │ ┌─────────────────┐  ┌─────────────────┐  ┌─────────┐   │
   │ │ await wg.done() │  │ await wg.done() │  │await    │   │
   │ │ Counter: 2      │  │ Counter: 1      │  │wg.done()│   │
   │ └─────────────────┘  └─────────────────┘  │Counter:0│   │
   │                                           └─────────┘   │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼

4. WAITGROUP COORDINATION
   ┌─────────────────────────────────────────────────────────┐
   │                WAITGROUP COORDINATION                   │
   │                                                         │
   │ ✅ Operation 1 complete → Counter: 2                    │
   │ ✅ Operation 2 complete → Counter: 1                    │
   │ ✅ Operation 3 complete → Counter: 0                    │
   │                                                         │
   │ When Counter = 0:                                       │
   │ 🎉 All 3 operations completed!                          │
   │ 📤 Release await wait_group.wait()                      │
   └─────────────────────────────────────────────────────────┘
                               │
                               ▼

5. COMPLETION
   ┌─────────────────────────────────────────────────────────┐
   │ ✨ Successfully processed 3 operations concurrently     │
   │ 📊 Return updated detections with OCR results           │
   │ ⏱ Total time: ~200ms (instead of 600ms sequential)     │
   └─────────────────────────────────────────────────────────┘
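In the diagram, the counter values follow completion order rather than task number: whichever call returns first takes the counter from 3 to 2. A small sketch with made-up latencies (hypothetical values; a plain integer stands in for the WaitGroup counter) shows the order in which the decrements would happen:

```python
import asyncio


async def main() -> list:
    # Hypothetical latencies (seconds) for three detections; task 0 is the slowest
    latencies = {0: 0.06, 1: 0.02, 2: 0.04}
    counter = len(latencies)
    log = []

    async def operation(i: int) -> None:
        nonlocal counter
        try:
            await asyncio.sleep(latencies[i])
        finally:
            counter -= 1  # same effect as wait_group.done()
            log.append((i, counter))

    await asyncio.gather(*(operation(i) for i in latencies))
    return log


log = asyncio.run(main())
print(log)  # [(1, 2), (2, 1), (0, 0)]
```

Each entry is (task index, counter after its decrement): task 1 finishes first, and the slowest task is the one that brings the counter to 0 and releases wait().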

🔍 Key Components Explained

AsyncOCRWaitGroup Class (lines 28-84)

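import asyncio
from typing import Optional


class AsyncOCRWaitGroup:
    def __init__(self):
        self._counter = 0              # Number of pending operations
        self._total = 0                # Operations added so far (for the final log line)
        self._event = asyncio.Event()  # Set whenever the counter is 0
        self._event.set()              # Nothing pending yet, so wait() returns at once (as in Go)
        self._lock = asyncio.Lock()    # Serializes updates between coroutines (not threads)

    async def add(self, delta: int = 1):
        # Like Go's wg.Add(n) - adds n operations to wait for
        async with self._lock:
            if self._counter + delta < 0:
                raise ValueError("negative WaitGroup counter")  # Go panics here
            self._counter += delta
            self._total += max(delta, 0)
            if self._counter > 0:
                self._event.clear()
            else:
                self._event.set()
            print(f"🔢 [AsyncOCRWaitGroup] Counter: {self._counter} (delta: {delta})")

    async def done(self):
        # Like Go's wg.Done() - signals one operation completed
        async with self._lock:
            if self._counter == 0:
                raise ValueError("done() called more times than add()")
            self._counter -= 1
            print(f"✅ [AsyncOCRWaitGroup] Operation completed, counter: {self._counter}")
            if self._counter == 0:
                self._event.set()  # Wake up wait()

    async def wait(self, timeout: Optional[float] = None):
        # Like Go's wg.Wait() - blocks until counter = 0;
        # raises asyncio.TimeoutError if `timeout` seconds pass first
        await asyncio.wait_for(self._event.wait(), timeout=timeout)
        print(f"🎉 [AsyncOCRWaitGroup] All {self._total} operations completed")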

Where Operations Start (line 330)

# Add all operations to WaitGroup
await wait_group.add(len(all_operations))  # e.g., add(3)

Where Each Operation Finishes (lines 472-473)

# In _process_single_operation_async
try:
    ...  # the OCR/QR work for this operation
finally:
    # Always mark the operation as done in the WaitGroup, even on error
    await wait_group.done()  # Decrements the counter

Where We Wait for All (line 199)

# Wait for all operations to complete with timeout
await wait_group.wait(timeout=self.operation_timeout)
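A timeout on wait() only stops the waiting; tasks that were already created keep running unless they are cancelled. One possible way to handle that, assuming the caller still holds the tasks list from step 3, is sketched below; run_with_deadline and its parameters are hypothetical, and asyncio.sleep stands in for the OCR calls.

```python
import asyncio


async def run_with_deadline(durations: list, timeout: float) -> tuple:
    """Run one task per duration; cancel whatever is unfinished at the deadline."""
    all_done = asyncio.Event()
    remaining = len(durations)

    async def operation(seconds: float) -> None:
        nonlocal remaining
        try:
            await asyncio.sleep(seconds)  # hypothetical stand-in for an OCR call
        finally:
            remaining -= 1
            if remaining == 0:
                all_done.set()

    tasks = [asyncio.create_task(operation(s)) for s in durations]
    try:
        await asyncio.wait_for(all_done.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        # Let the cancellations be processed so no task outlives this call
        await asyncio.gather(*tasks, return_exceptions=True)
    finished = sum(1 for t in tasks if not t.cancelled())
    return finished, len(tasks) - finished


finished, cancelled = asyncio.run(run_with_deadline([0.01, 0.02, 5.0], timeout=0.1))
print(finished, cancelled)  # 2 1
```

Whether to cancel, keep partial results, or let slow calls finish in the background is a design decision for the caller; this sketch picks cancellation.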

📊 Real Log Example

When you have 3 food_label detections:

🚀 [AsyncOCR] Processing 3 detections concurrently
🚀 [AsyncOCR] Starting 3 concurrent operations
🔢 [AsyncOCRWaitGroup] Counter: 3 (delta: 3)           ← WaitGroup.Add(3)

[Concurrent gRPC OCR calls happen]

✅ [AsyncOCR] Completed food_label operation for detection 0
✅ [AsyncOCRWaitGroup] Operation completed, counter: 2  ← WaitGroup.Done()

✅ [AsyncOCR] Completed food_label operation for detection 1  
✅ [AsyncOCRWaitGroup] Operation completed, counter: 1  ← WaitGroup.Done()

✅ [AsyncOCR] Completed food_label operation for detection 2
✅ [AsyncOCRWaitGroup] Operation completed, counter: 0  ← WaitGroup.Done()

🎉 [AsyncOCRWaitGroup] All 3 operations completed      ← WaitGroup.Wait() unblocks
✅ [AsyncOCR] Completed 3 operations in 221.9ms
✨ [AsyncOCR] Successfully processed 2 operations concurrently

🎯 Go vs Python Comparison

Go Version

var wg sync.WaitGroup
wg.Add(len(detections))  // one per detection (3 here)

// Start goroutines
for _, detection := range detections {
    go func(d Detection) {
        defer wg.Done()  // Signal completion
        processOCR(d)
    }(detection)
}

wg.Wait()  // Wait for all to complete

Python Version (Our Implementation)

import asyncio


async def process_ocr_async(detection, wait_group):
    try:
        # Do OCR processing
        return await ocr_service.process(detection)
    finally:
        await wait_group.done()  # Signal completion, even if OCR raised


async def process_all(detections):
    wait_group = AsyncOCRWaitGroup()
    await wait_group.add(len(detections))  # One per detection, before any task starts

    # Start async tasks
    tasks = []
    for detection in detections:
        task = asyncio.create_task(process_ocr_async(detection, wait_group))
        tasks.append(task)

    await wait_group.wait()  # Wait for all to complete

The pattern is the same in both languages: register the number of operations up front, have each concurrent operation signal completion (defer in Go, finally in Python), and block until the counter reaches zero. 🎉
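For comparison, the standard library can express the same "wait for all" step without a hand-written counter: asyncio.gather (or asyncio.TaskGroup on Python 3.11+) resumes only after every task has finished, and asyncio.wait_for adds the timeout. This is an alternative design, not what the article's code uses; process_ocr_async below is a hypothetical stand-in for the gRPC call.

```python
import asyncio


async def process_ocr_async(detection: str) -> str:
    # Hypothetical stand-in for a gRPC OCR call
    await asyncio.sleep(0.01)
    return detection.upper()


async def process_all(detections: list, timeout: float) -> list:
    tasks = [asyncio.create_task(process_ocr_async(d)) for d in detections]
    # gather() resumes only after every task has finished, like wg.Wait();
    # results come back in input order regardless of completion order
    return await asyncio.wait_for(asyncio.gather(*tasks), timeout=timeout)


results = asyncio.run(process_all(["label-a", "label-b", "qr-c"], timeout=1.0))
print(results)  # ['LABEL-A', 'LABEL-B', 'QR-C']
```

A side effect of this version: if wait_for times out, it cancels the gather() future, which in turn cancels any tasks that have not finished yet.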

