This content originally appeared on DEV Community and was authored by Aki
Original Japanese article: AWS Lambda×DuckDB×PyIcebergで実現する動的ルーティング軽量ETLの実装 (Implementing a Lightweight ETL with Dynamic Routing Using AWS Lambda × DuckDB × PyIceberg)
Introduction
I’m Aki, an AWS Community Builder (@jitepengin).
In recent years, data pipelines increasingly aggregate data from multiple sources into a data lake.
However, when each source requires different processing or writes to different target tables, traditional ETL lacks flexibility and increases maintenance costs. Even lightweight ETL solutions using AWS Lambda, as I introduced previously, require creating separate functions per source or target table, which makes runtime or library version upgrades tedious.
In this article, I’ll introduce a dynamic routing ETL solution that solves this problem.
Routing information is stored in DynamoDB, and Lambda automatically determines the target table and SQL to execute.
Dynamic Routing Concept
Traditional ETL often hardcodes target tables and SQL per input source. As new sources are added, code changes are required.
By introducing dynamic routing, Lambda can automatically:
- Switch the target Iceberg table
- Execute different SQL per source
This enables handling new sources without changing the code.
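Conceptually, the routing information is just a mapping from a routing key to a target table and a SQL statement. The in-memory dictionary below is only an illustration (the bucket names, table identifiers, and SQL are hypothetical); the actual implementation stores this mapping in DynamoDB.

# Conceptual illustration only; the real implementation keeps this mapping in DynamoDB.
# Bucket names, table identifiers, and SQL statements are hypothetical examples.
ROUTING_CONFIG = {
    "sales-landing-bucket": {
        "target_table": "bronze.sales",
        "query_sql": "SELECT * FROM read_parquet('{s3_input_path}')",
    },
    "orders-landing-bucket": {
        "target_table": "bronze.orders",
        "query_sql": "SELECT order_id, amount FROM read_parquet('{s3_input_path}')",
    },
}

def resolve_route(routing_key: str) -> dict:
    """Return the target table and SQL template for a routing key."""
    return ROUTING_CONFIG[routing_key]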
Architecture used in this article
Bucket-based Routing
Routing is based on the input bucket name. This is useful when data arrives from multiple source systems, each delivering to its own bucket.
Folder-based Routing
Routing is based on the folder (prefix) within the object key. This is useful when files from the same system correspond to different source tables and are delivered to different folders.
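In both styles only the derivation of the routing key changes. A minimal sketch, based on the S3 event structure used in the sample code later in this article:

def extract_routing_key(event: dict, use_folder_routing: bool = False) -> str:
    """Derive the routing key from an S3 PUT event record."""
    record = event["Records"][0]["s3"]
    if use_folder_routing:
        # Folder-based routing: the top-level prefix of the object key selects the route.
        # Adjust the index to match your S3 path structure.
        return record["object"]["key"].split("/")[0]
    # Bucket-based routing: the bucket name selects the route.
    return record["bucket"]["name"]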
Processing Flow
- A file is PUT into S3, triggering Lambda.
- Lambda extracts a routing key from the S3 path (bucket or folder).
- Lambda fetches target table and SQL from DynamoDB.
- DuckDB reads the Parquet file and executes the configured SQL.
- The result is appended to the Iceberg table via PyIceberg.
Key Libraries
- DuckDB — A fast, embedded (in-process) SQL OLAP engine.
- PyArrow — Efficient data transformation and transfer via the Arrow format.
- PyIceberg — Enables working with Iceberg tables using AWS Glue Catalog.
About the Libraries
DuckDB
DuckDB is an embedded OLAP database engine designed for analytical queries.
It’s lightweight, runs in memory, and works efficiently even in restricted environments like Lambda.
It’s particularly well-suited for batch analytics and ETL workloads.
https://duckdb.org/
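A minimal sketch of how DuckDB is used in this pipeline: an in-memory connection queries a Parquet file and returns the result as an Arrow table. The S3 path is a placeholder, and depending on your DuckDB version and environment you may also need to configure S3 credentials explicitly.

import duckdb

# In-memory database; Lambda only allows writes under /tmp, so point DuckDB's home there.
con = duckdb.connect(database=":memory:")
con.execute("SET home_directory='/tmp'")
con.execute("INSTALL httpfs;")  # extension for reading from HTTP/S3
con.execute("LOAD httpfs;")

# Placeholder path; replace with your own object.
arrow_table = con.execute(
    "SELECT * FROM read_parquet('s3://your-bucket/path/to/file.parquet')"
).fetch_arrow_table()
print(arrow_table.num_rows, arrow_table.schema)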
PyArrow
PyArrow is the Python binding for Apache Arrow, a high-performance columnar memory format optimized for analytics and distributed computing.
It allows fast, zero-copy data transfer between systems and languages.
https://arrow.apache.org/
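In this pipeline, PyArrow is the interchange format between DuckDB and PyIceberg: DuckDB's fetch_arrow_table() returns a pyarrow.Table, and PyIceberg's append() accepts one. A minimal sketch of constructing and inspecting such a table:

import pyarrow as pa

# A small in-memory Arrow table; in the pipeline this comes from DuckDB's fetch_arrow_table().
arrow_table = pa.table({"id": [1, 2, 3], "amount": [10.5, 20.0, 7.25]})
print(arrow_table.schema)
print(arrow_table.num_rows)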
PyIceberg
PyIceberg is the Python implementation of Apache Iceberg, a table format designed for large, cloud-native data lakes.
It enables Python applications to read, write, and manage Iceberg tables seamlessly.
https://py.iceberg.apache.org/
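A minimal sketch of appending an Arrow table to an Iceberg table through the Glue catalog. The catalog name, region, and table identifier are placeholders, and the target table is assumed to already exist with a matching schema. (The sample code later uses GlueCatalog directly; load_catalog is an alternative entry point that resolves to the same Glue-backed catalog.)

import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names; the Iceberg table must already exist in Glue.
catalog = load_catalog("my_catalog", **{"type": "glue", "glue.region": "ap-northeast-1"})
table = catalog.load_table("my_database.my_table")

arrow_table = pa.table({"id": [1], "amount": [10.5]})
table.append(arrow_table)  # commits a new snapshot containing the appended rows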
Sample Code
Lambda Function
Switching between bucket-based and folder-based routing can be done by modifying the routing_key extraction. The append_to_iceberg function includes retry logic for handling conflicts.
import os
import boto3
import duckdb
import time
from pyiceberg.catalog.glue import GlueCatalog

# DynamoDB table storing routing information
ROUTING_TABLE = os.environ.get("ROUTING_TABLE", "routing_table")

# Iceberg catalog configuration
ICEBERG_CATALOG_NAME = os.environ.get('ICEBERG_CATALOG_NAME', 'my_catalog')
APP_REGION = os.environ.get('APP_REGION', 'ap-northeast-1')

# AWS clients
dynamodb_resource = boto3.resource('dynamodb', region_name=APP_REGION)
table = dynamodb_resource.Table(ROUTING_TABLE)


def lambda_handler(event, context):
    try:
        # 1. Extract S3 event information
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        print(f"s3_input_path: {s3_input_path}")

        # Set routing key based on S3 structure
        # - If using bucket-level routing, use the bucket name:
        routing_key = s3_bucket
        # - If using path-level routing, extract from the object key:
        # routing_key = s3_object_key.split('/')[0]
        # Note: when using path-level routing, adjust the index according to your S3 path structure.

        # 2. Fetch routing config from DynamoDB
        print(f"APP_REGION: {APP_REGION}")
        print(f"table: {table}")
        print(f"PartitionKey: {routing_key}")
        response = table.get_item(Key={'PartitionKey': routing_key})
        if 'Item' not in response:
            raise Exception(f"No routing information found in DynamoDB for key: {routing_key}")

        config = response['Item']
        target_table = config['target_table']
        query_sql = config['query_sql'].format(s3_input_path=s3_input_path)

        print(f"Routing decision: {routing_key} -> {target_table}")
        print(f"Executing SQL: {query_sql}")

        # 3. Load and process data with DuckDB
        con = duckdb.connect(database=':memory:')
        con.execute("SET home_directory='/tmp'")
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")

        arrow_table = con.execute(query_sql).fetch_arrow_table()
        print(f"Fetched rows: {arrow_table.num_rows}")
        print(f"Schema: {arrow_table.schema}")

        # 4. Append to Iceberg with retry logic
        append_to_iceberg(arrow_table, target_table)

        return {"status": "success", "rows": arrow_table.num_rows}

    except Exception as e:
        print(f"Error occurred: {e}")
        raise


def append_to_iceberg(arrow_table, target_table, retries=5):
    catalog = GlueCatalog(region_name=APP_REGION, name=ICEBERG_CATALOG_NAME)
    delay = 10

    for attempt in range(retries):
        try:
            table = catalog.load_table(target_table)
            table.refresh()

            current_snapshot = table.current_snapshot()
            snapshot_id = current_snapshot.snapshot_id if current_snapshot else "None"
            print(f"Attempt {attempt + 1}: Using snapshot ID {snapshot_id}")

            table.append(arrow_table)
            print(f"Data successfully appended to Iceberg table {target_table}.")
            return
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if "Cannot commit" in str(e) or "branch main has changed" in str(e):
                if attempt < retries - 1:
                    delay *= 2
                    print(f"Retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    print("Reached max retries. Aborting.")
                    raise
            else:
                raise
The retry delay and limit may need adjustment depending on your environment. Using environment variables for these settings is recommended.
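For example, the retry settings could be read from environment variables rather than hard-coded; the variable names below are arbitrary.

import os

# Hypothetical environment variable names; adjust to your own conventions.
MAX_RETRIES = int(os.environ.get("ICEBERG_APPEND_MAX_RETRIES", "5"))
INITIAL_RETRY_DELAY_SECONDS = int(os.environ.get("ICEBERG_APPEND_RETRY_DELAY", "10"))

# Usage: append_to_iceberg(arrow_table, target_table, retries=MAX_RETRIES);
# the initial delay can be passed in the same way if you parameterize it.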
DynamoDB Routing Table
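Each item in the routing table only needs the attributes the Lambda function reads: the partition key, the target Iceberg table, and the SQL template. The entry below is a hypothetical example registered with boto3; note that the query_sql value contains the {s3_input_path} placeholder that the function fills in at runtime.

import boto3

dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-1")
routing_table = dynamodb.Table("routing_table")

# Hypothetical routing entry: objects arriving in this bucket are appended to bronze.sales.
routing_table.put_item(
    Item={
        "PartitionKey": "sales-landing-bucket",
        "target_table": "bronze.sales",
        "query_sql": "SELECT * FROM read_parquet('{s3_input_path}')",
    }
)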
Execution Results
Upload to First Bucket
Upload to Second Bucket
Pros and Cons
Pros
- Adding new sources or targets only requires updating S3 triggers and DynamoDB entries.
- Lambda code remains simple and maintainable; no need for multiple functions per route.
Cons
- DynamoDB can be a single point of failure.
- Limited by Lambda memory size.
- Maximum execution time is 15 minutes (Lambda limitation).
- Logic beyond a single SQL query is hard to express; this approach is best suited to simple processing such as moving data from the landing zone to the bronze zone.
Alternative Approaches
Metadata-based Routing
Routing based on custom metadata or columns inside Parquet files.
Flexible and independent of file names or paths, but requires the metadata to exist.
data_type = pq_file.metadata.metadata[b'type'].decode()
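Expanding that line into a minimal, self-contained sketch, assuming the producer wrote a type key into the Parquet file's key-value metadata (the file path and metadata key are hypothetical):

import pyarrow.parquet as pq

# Hypothetical local path; in Lambda you would first download or stream the S3 object.
pq_file = pq.ParquetFile("/tmp/input.parquet")

# Key-value metadata is stored as bytes; the 'type' key is assumed to be set by the producer.
file_metadata = pq_file.metadata.metadata or {}
data_type = file_metadata.get(b"type", b"unknown").decode()

routing_key = data_type  # used in place of the bucket or folder name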
Conclusion
We implemented a dynamic routing lightweight ETL using AWS Lambda.
This approach consolidates processing that would otherwise be scattered across multiple functions, reducing maintenance overhead and allowing new routes to be added flexibly.
This example is conceptual, so consider your requirements and pros/cons before using it.
I hope this article serves as a helpful reference for those exploring lightweight ETL solutions.