Dynamic Routing Lightweight ETL with AWS Lambda, DuckDB, and PyIceberg



This content originally appeared on DEV Community and was authored by Aki

Original Japanese article: AWS Lambda×DuckDB×PyIcebergで実現する動的ルーティング軽量ETLの実装

Introduction

I’m Aki, an AWS Community Builder (@jitepengin).
In recent years, data pipelines increasingly aggregate data from multiple sources into a data lake.

However, when each source requires different processing or writes to different target tables, traditional ETL lacks flexibility and drives up maintenance costs. Even lightweight ETL solutions built on AWS Lambda, like the one I introduced previously, need a separate function per source or target table, which makes runtime and library version upgrades tedious.

In this article, I’ll introduce a dynamic routing ETL solution that solves this problem.
Routing information is stored in DynamoDB, and Lambda automatically determines the target table and SQL to execute.

Dynamic Routing Concept

Traditional ETL often hardcodes target tables and SQL per input source. As new sources are added, code changes are required.

By introducing dynamic routing, Lambda can automatically:

  • Switch the target Iceberg table
  • Execute different SQL per source

This enables handling new sources without changing the code.

Architecture used in this article

Bucket-based Routing

Routing is based on the input bucket name. This is useful when data arrives from multiple source systems, each delivering to its own bucket.

Folder-based Routing

Routing is based on the folder path within a bucket. This is useful when a single system delivers files that belong to different source tables, separated by prefix.
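
As a minimal sketch of both strategies (the helper name and the example key are illustrative, and the prefix depth depends on your S3 layout):

def extract_routing_key(event, mode="bucket"):
    """Derive the routing key from an S3 event notification (illustrative helper)."""
    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']      # e.g. "sales/2024/01/data.parquet"

    if mode == "bucket":
        # Bucket-based routing: one route per source bucket.
        return bucket
    # Folder-based routing: one route per top-level prefix ("sales" in the example key).
    return key.split('/')[0]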

Processing Flow

  1. A file is PUT into S3, triggering Lambda.
  2. Lambda extracts a routing key from the S3 path (bucket or folder).
  3. Lambda fetches target table and SQL from DynamoDB.
  4. Data is loaded into DuckDB from Parquet, and SQL is executed.
  5. Result is appended to the Iceberg table via PyIceberg.

Key Libraries

  • DuckDB — A fast, embedded (in-process) OLAP SQL engine that can run entirely in memory.
  • PyArrow — Efficient data transformation and transfer via the Arrow format.
  • PyIceberg — Enables working with Iceberg tables using AWS Glue Catalog.

About the Libraries

DuckDB

DuckDB is an embedded OLAP database engine designed for analytical queries.

It’s lightweight, can run entirely in memory, and works efficiently even in constrained environments like Lambda.

It’s particularly well-suited for batch analytics and ETL workloads.

https://duckdb.org/
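
As a minimal local sketch (the file name and column are placeholders), an analytical query can run directly over a Parquet file and hand the result to Arrow:

import duckdb

# Aggregate a local Parquet file and fetch the result as an Arrow table.
con = duckdb.connect(database=':memory:')
result = con.execute(
    "SELECT category, COUNT(*) AS cnt FROM read_parquet('sample.parquet') GROUP BY category"
).fetch_arrow_table()
print(result.num_rows, result.schema)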

PyArrow

PyArrow is the Python binding for Apache Arrow, a high-performance columnar memory format optimized for analytics and distributed computing.

It allows fast, zero-copy data transfer between systems and languages.

https://arrow.apache.org/
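
For example (column names are illustrative), an Arrow table built in memory can be handed to DuckDB or PyIceberg without copying the underlying buffers:

import pyarrow as pa

# Build a columnar table in memory; both DuckDB and PyIceberg consume this format.
arrow_table = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "amount": pa.array([10.5, 20.0, 7.25], type=pa.float64()),
})
print(arrow_table.schema)
print(arrow_table.num_rows)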

PyIceberg

PyIceberg is the Python implementation of Apache Iceberg, a table format designed for large, cloud-native data lakes.

It enables Python applications to read, write, and manage Iceberg tables seamlessly.

https://py.iceberg.apache.org/
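
A minimal sketch mirroring the catalog usage in the sample code below (catalog, database, and table names are placeholders, and the Arrow schema must match the target table):

import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog

# Load an Iceberg table registered in the AWS Glue Catalog and append Arrow data.
catalog = GlueCatalog(region_name="ap-northeast-1", name="my_catalog")
iceberg_table = catalog.load_table("my_database.my_table")   # "<glue database>.<table>"

rows = pa.table({"id": pa.array([1], type=pa.int64())})       # schema must match the target table
iceberg_table.append(rows)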

Sample Code

Lambda Function

Switching between bucket-based and folder-based routing can be done by modifying the routing_key extraction.
The append_to_iceberg function includes retry logic for handling conflicts.

import os
import boto3
import duckdb
import time
from pyiceberg.catalog.glue import GlueCatalog

# DynamoDB table storing routing information
ROUTING_TABLE = os.environ.get("ROUTING_TABLE", "routing_table")

# Iceberg catalog configuration
ICEBERG_CATALOG_NAME = os.environ.get('ICEBERG_CATALOG_NAME', 'my_catalog')
APP_REGION = os.environ.get('APP_REGION', 'ap-northeast-1')

# AWS clients
dynamodb_resource = boto3.resource('dynamodb', region_name=APP_REGION)
table = dynamodb_resource.Table(ROUTING_TABLE)


def lambda_handler(event, context):
    try:
        # 1. Extract S3 event information
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

        print(f"s3_input_path: {s3_input_path}")

        # Set routing key based on S3 structure
        # - If using bucket-level routing, use the bucket name:
        routing_key = s3_bucket
        # - If using path-level routing, extract from the object key:
        # routing_key = s3_object_key.split('/')[0]
        #   ※When using path-level routing, adjust the index according to your S3 path structure.

        # 2. Fetch routing config from DynamoDB
        print(f"APP_REGION: {APP_REGION}")
        print(f"table: {table}")
        print(f"PartitionKey: {routing_key}")
        response = table.get_item(Key={'PartitionKey': routing_key})
        if 'Item' not in response:
            raise Exception(f"No routing information found in DynamoDB for key: {routing_key}")

        config = response['Item']
        target_table = config['target_table']
        query_sql = config['query_sql'].format(s3_input_path=s3_input_path)

        print(f"Routing decision: {routing_key} -> {target_table}")
        print(f"Executing SQL: {query_sql}")

        # 3. Load and process data with DuckDB
        con = duckdb.connect(database=':memory:')
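        # DuckDB installs extensions under its home directory, and Lambda only allows writes under /tmp,
        # so point the home directory there before installing the httpfs extension.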
        con.execute("SET home_directory='/tmp'")
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")

        arrow_table = con.execute(query_sql).fetch_arrow_table()

        print(f"Fetched rows: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # 4. Append to Iceberg with retry logic
        append_to_iceberg(arrow_table, target_table)

        return {"status": "success", "rows": arrow_table.num_rows}

    except Exception as e:
        print(f"Error occurred: {e}")
        raise


def append_to_iceberg(arrow_table, target_table, retries=5):
    catalog = GlueCatalog(region_name=APP_REGION, name=ICEBERG_CATALOG_NAME)
    delay = 10

    for attempt in range(retries):
        try:
            table = catalog.load_table(target_table)
            table.refresh()

            current_snapshot = table.current_snapshot()
            snapshot_id = current_snapshot.snapshot_id if current_snapshot else "None"
            print(f"Attempt {attempt + 1}: Using snapshot ID {snapshot_id}")

            table.append(arrow_table)
            print(f"Data successfully appended to Iceberg table {target_table}.")
            return

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if "Cannot commit" in str(e) or "branch main has changed" in str(e):
                if attempt < retries - 1:
                    delay *= 2
                    print(f"Retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    print("Reached max retries. Aborting.")
                    raise
            else:
                raise

The retry delay and limit may need adjustment depending on your environment. Using environment variables for these settings is recommended.
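
For example, a small sketch of reading those settings from environment variables (the variable names are illustrative, not part of the original code):

import os

# Illustrative environment-driven retry settings.
MAX_RETRIES = int(os.environ.get("ICEBERG_MAX_RETRIES", "5"))
INITIAL_RETRY_DELAY = int(os.environ.get("ICEBERG_RETRY_DELAY_SECONDS", "10"))

# append_to_iceberg(arrow_table, target_table, retries=MAX_RETRIES)
# (the initial delay would also need to become a parameter of append_to_iceberg)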

DynamoDB Routing Table
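
For illustration (bucket names, target tables, and SQL are placeholders), each routing item keys the route by PartitionKey and stores the target Iceberg table plus a SQL template containing an {s3_input_path} placeholder:

import boto3

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
routing_table = dynamodb.Table("routing_table")

# One item per route; PartitionKey is the bucket name (or folder prefix).
routing_table.put_item(Item={
    "PartitionKey": "landing-bucket-sales",
    "target_table": "bronze.sales_orders",
    "query_sql": "SELECT * FROM read_parquet('{s3_input_path}')",
})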


Execution Results

Upload to First Bucket

Uploading a file to the first bucket routes the data to that bucket’s configured target table. Successfully written!

Upload to Second Bucket

Uploading a file to the second bucket routes the data to its own target table. Successfully written!

Pros and Cons

Pros

  • Adding new sources or targets only requires updating S3 triggers and DynamoDB entries.
  • Lambda code remains simple and maintainable; no need for multiple functions per route.

Cons

  • DynamoDB can be a single point of failure.
  • Limited by Lambda memory size.
  • Maximum execution time is 15 minutes (Lambda limitation).
  • Complex logic beyond SQL queries is difficult; suitable for simple processing like moving data from landing to bronze zones.

Alternative Approaches

Metadata-based Routing

Routing based on custom metadata or columns inside Parquet files.
Flexible and independent of file names or paths, but requires the metadata to exist.

data_type = pq_file.metadata.metadata[b'type'].decode()
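
A hedged sketch of the full lookup with PyArrow (the local path and the 'type' metadata key are assumptions about how the producer writes the file):

import pyarrow.parquet as pq

# Read the custom key-value metadata embedded in the Parquet footer.
pq_file = pq.ParquetFile("/tmp/input.parquet")
custom_metadata = pq_file.metadata.metadata or {}
data_type = custom_metadata[b"type"].decode()

# Use the embedded type as the routing key instead of the bucket or folder.
routing_key = data_type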

Conclusion

We implemented a dynamic routing lightweight ETL using AWS Lambda.
This approach consolidates processing that would otherwise be scattered across separate functions, reducing maintenance overhead and allowing new routes to be added flexibly.

This example is conceptual, so weigh your own requirements and the pros and cons above before adopting it.
I hope this article serves as a helpful reference for those exploring lightweight ETL solutions.


