This content originally appeared on DEV Community and was authored by Dmitry Romanoff
If you’re working with Amazon DocumentDB and want to evaluate how your cluster performs under load, a stress test is one of the best ways to gain confidence in your setup.
In this post, I'll walk you through a simple, effective way to stress test your DocumentDB cluster using Python and `pymongo`.
Prerequisites
Before running the script, make sure you have:
- An active Amazon DocumentDB cluster
- The global CA certificate from Amazon:

```shell
curl -o global-bundle.pem https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
```

- Python 3.7+ installed
- The `pymongo` driver:

```shell
pip install pymongo
```
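Before launching the full test, it can be worth sanity-checking that your URI and CA bundle actually connect. A minimal sketch (the `ping_cluster` helper is my own, not part of `pymongo`; the URI and CA path in the comment are placeholders):

```python
def ping_cluster(client):
    """Issue the standard 'ping' admin command; return True if the server answers OK."""
    reply = client.admin.command("ping")
    return reply.get("ok") == 1

# Example usage (endpoint, credentials, and CA path are placeholders):
# from pymongo import MongoClient
# client = MongoClient("mongodb://myadmin:********@<cluster-endpoint>:27017/?ssl=true",
#                      tlsCAFile="global-bundle.pem")
# print(ping_cluster(client))
```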
What This Script Does
The script:
- Connects to your DocumentDB cluster
- Spawns multiple threads
- Randomly performs `insert`, `read`, or `update` operations
- Measures how long it takes to complete a set of operations
This lets you simulate a concurrent, mixed workload to see how your database performs.
The Code
```python
import random
import string
import time
from pymongo import MongoClient
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration
MONGO_URI = (
    "mongodb://myadmin:********@"
    "docdb-dima-1.cluster-xxxxxxxxxxxx.us-east-1.docdb.amazonaws.com:27017"
    "/?ssl=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false"
)
CA_CERT_PATH = "global-bundle.pem"
DB_NAME = "stressTestDB"
COLLECTION_NAME = "stressCollection"

# Connect
client = MongoClient(MONGO_URI, tlsCAFile=CA_CERT_PATH)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]

def random_string(length=10):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

def stress_worker(thread_id):
    ops = ["insert", "read", "update"]
    op = random.choice(ops)
    try:
        if op == "insert":
            doc = {"thread": thread_id, "data": random_string(), "value": random.randint(1, 1000)}
            collection.insert_one(doc)
            return f"[Thread {thread_id}] Insert OK"
        elif op == "read":
            result = collection.find_one({}, {"_id": 0})
            return f"[Thread {thread_id}] Read OK: {result}"
        elif op == "update":
            collection.update_one({}, {"$set": {"updated": time.time()}}, upsert=True)
            return f"[Thread {thread_id}] Update OK"
    except Exception as e:
        return f"[Thread {thread_id}] ERROR: {e}"

def run_stress_test(concurrency=50, iterations=1000):
    start = time.time()
    print(f"Starting stress test: {concurrency} threads x {iterations} ops...")
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(stress_worker, i % concurrency) for i in range(iterations)]
        for future in as_completed(futures):
            print(future.result())
    duration = time.time() - start
    print(f"Test completed in {duration:.2f} seconds")

if __name__ == "__main__":
    run_stress_test(concurrency=20, iterations=200)
```
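The script only reports wall-clock time. If you also want a throughput figure, a tiny helper can derive it from the numbers already printed (a sketch; `ops_per_second` is my own name, not part of the script above):

```python
def ops_per_second(iterations, duration):
    """Derive throughput from the totals the stress test already reports."""
    if duration <= 0:
        raise ValueError("duration must be positive")
    return iterations / duration

# Example: 200 ops completed in 11.84 seconds
print(f"{ops_per_second(200, 11.84):.1f} ops/sec")  # -> 16.9 ops/sec
```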
Example Output
```
Starting stress test: 20 threads x 200 ops...
[Thread 3] Insert OK
[Thread 7] Read OK: {'thread': 9, 'data': 'xyz456', 'value': 88}
[Thread 12] Update OK
...
Test completed in 11.84 seconds
```
Customizing the Test
- Change concurrency by modifying the `concurrency` parameter in `run_stress_test()`.
- Adjust workload size via the `iterations` parameter.
- Modify workload types (e.g., remove `update` or bias toward `read`) by changing the `ops` list in `stress_worker()`.
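For example, to bias the mix toward reads without dropping any operation type, `random.choice` can be swapped for a weighted pick (a sketch; the 80/10/10 split is an arbitrary assumption):

```python
import random

def pick_op():
    """Pick an operation with an 80% read / 10% insert / 10% update bias."""
    return random.choices(
        ["read", "insert", "update"],
        weights=[8, 1, 1],
        k=1,
    )[0]

# Inside stress_worker(), replace `op = random.choice(ops)` with:
# op = pick_op()
```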
Important Notes
- This test writes data — you might want to clean up your test database afterward.
- Make sure you’re not violating AWS quotas or triggering alarms.
- Always test in a non-production environment first.
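Cleaning up afterwards takes a single call. A minimal sketch that reuses the `client` and `DB_NAME` from the script above (the `cleanup` wrapper is my own naming):

```python
def cleanup(client, db_name="stressTestDB"):
    """Drop the throwaway database created by the stress test."""
    client.drop_database(db_name)

# Usage with the script's existing connection:
# cleanup(client, DB_NAME)
```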
Conclusion
Stress testing your DocumentDB cluster helps you prepare for production-scale workloads. This lightweight script is a great starting point for load testing, performance tuning, or monitoring how your system reacts under pressure.