Validate Records in OpenSearch from a CSV (with Logging and Health Checks)



This content originally appeared on DEV Community and was authored by Vaisakh

📌 Overview

When working with data pipelines or audit tasks, we often need to verify that certain records exist in our OpenSearch cluster.

This post walks through a Python script that:

  • Reads (recordId, recordDate) pairs from a CSV file
  • Checks if each record exists in OpenSearch
  • Logs results with a timestamped log file
  • Outputs a results CSV for reporting
  • Performs a fast _cluster/health check before running queries
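
On the last point: the script proceeds only if GET /_cluster/health reports green or yellow. A typical response looks like the sketch below (illustrative values; the script reads only the status field):

health = {
    "cluster_name": "my-cluster",  # hypothetical name
    "status": "green",             # one of: green, yellow, red
    "timed_out": False,
    "number_of_nodes": 3,
}
# The pre-check accepts green/yellow and aborts the run on anything else.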

🗂 Folder Structure

opensearch-checks/
├─ .env
├─ inputs/
│ └─ records.csv
├─ logs/ # auto-created
├─ outputs/ # auto-created
└─ check_records.py

Key features:

  • Strict config from .env — no defaults in code
  • Pre-check via GET /_cluster/health before any queries run
  • Exact term match on recordId.keyword
  • Date range match for the full day (see the query sketch below)
  • Logs + a results CSV for easy tracking
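
To make the match criteria concrete, here is the query body the script builds for one (recordId, recordDate) pair, using the sample values from the input CSV below; "size": 0 means only the hit count comes back:

query = {
    "query": {
        "bool": {
            "must": [
                {"term": {"recordId.keyword": "00000000-0000-0000-0000-000000000001"}},
                {"range": {"recordDate": {
                    "gte": "2025-07-31 00:00:00.000",
                    "lte": "2025-07-31 23:59:59.999",
                }}},
            ]
        }
    },
    "size": 0,  # count hits only; don't return documents
}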

🚀 How to Run
pip install python-dotenv   # the only external dependency; HTTP uses the stdlib urllib
python check_records.py

inputs/records.csv (example)

recordId,recordDate
00000000-0000-0000-0000-000000000001,2025-07-31
00000000-0000-0000-0000-000000000002,2025-07-31
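
After a run, the results CSV written to outputs/ mirrors the input with an added status column. The PASS/FAIL values here are purely illustrative:

recordId,recordDate,status
00000000-0000-0000-0000-000000000001,2025-07-31,PASS
00000000-0000-0000-0000-000000000002,2025-07-31,FAIL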

.env (required)

OPENSEARCH_URL=https://your-opensearch-host:9200
OPENSEARCH_USER=your-username
OPENSEARCH_PASS=your-password
INDEX_PATTERN=my_index_pattern_2025*/_search
INPUT_CSV=./inputs/records.csv
LOG_DIR=./logs
OUTPUT_DIR=./outputs

The script: check_records.py

"""
OpenSearch existence checks for (recordId, recordDate) pairs with a fast _cluster/health pre-check.

What it does
------------
1) Loads configuration strictly from a .env file (no hard-coded defaults).
2) Pre-checks OpenSearch availability via GET /_cluster/health (fails fast if unreachable or red).
3) Reads an input CSV with headers: recordId, recordDate (YYYY-MM-DD).
4) For each row, runs an existence query:
      - exact term match on recordId.keyword
      - range match on recordDate for the full day (00:00:00.000 -> 23:59:59.999)
5) Produces:
      - timestamped log file (PASS/FAIL per row)
      - timestamped results CSV (recordId, recordDate, status)

Security note
-------------
SSL verification is disabled here for convenience in internal/test setups.
For production, enable certificate verification and use a trusted CA/cert chain.
"""

from __future__ import annotations

import csv
import json
import logging
import os
import ssl
from base64 import b64encode
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from dotenv import load_dotenv


# ---------------- Configuration ----------------

@dataclass(frozen=True)
class Config:
    """Strongly-typed configuration loaded from environment."""
    url: str
    user: str
    password: str
    index_pattern: str           # e.g., "my_index_pattern_2025*/_search"
    input_csv: str               # e.g., "./inputs/records.csv"
    log_dir: str                 # e.g., "./logs"
    output_dir: str              # e.g., "./outputs"

    @staticmethod
    def from_env() -> "Config":
        load_dotenv()  # only reads .env / process env; no defaults
        url        = os.getenv("OPENSEARCH_URL")
        user       = os.getenv("OPENSEARCH_USER")
        password   = os.getenv("OPENSEARCH_PASS")
        index      = os.getenv("INDEX_PATTERN")
        input_csv  = os.getenv("INPUT_CSV")
        log_dir    = os.getenv("LOG_DIR")
        output_dir = os.getenv("OUTPUT_DIR")

        missing = [k for k, v in {
            "OPENSEARCH_URL": url,
            "OPENSEARCH_USER": user,
            "OPENSEARCH_PASS": password,
            "INDEX_PATTERN": index,
            "INPUT_CSV": input_csv,
            "LOG_DIR": log_dir,
            "OUTPUT_DIR": output_dir,
        }.items() if not v]
        if missing:
            raise ValueError(f"Missing required variables in .env: {', '.join(missing)}")

        return Config(url, user, password, index, input_csv, log_dir, output_dir)


# ---------------- Minimal OpenSearch client ----------------

class OpenSearchClient:
    """Tiny OpenSearch client using urllib (no external HTTP deps)."""
    def __init__(self, base_url: str, user: str, password: str, verify_ssl: bool = False) -> None:
        self.base_url = base_url.rstrip("/")
        self.auth_header = {
            "Authorization": "Basic " + b64encode(f"{user}:{password}".encode()).decode(),
            "Content-Type": "application/json",
        }
        self.ctx = ssl.create_default_context()
        if not verify_ssl:
            self.ctx.check_hostname = False
            self.ctx.verify_mode = ssl.CERT_NONE

    def get_json(self, endpoint: str) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        req = Request(url, method="GET")
        # Only Authorization header on GET (no need for Content-Type)
        req.add_header("Authorization", self.auth_header["Authorization"])
        with urlopen(req, context=self.ctx) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
            logging.info("GET %s -> %s", endpoint, resp.status)
            return payload

    def post_json(self, endpoint: str, body: Dict[str, Any]) -> Dict[str, Any]:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        req = Request(url, data=json.dumps(body).encode("utf-8"), method="POST")
        for k, v in self.auth_header.items():
            req.add_header(k, v)
        with urlopen(req, context=self.ctx) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
            logging.info("POST %s -> %s", endpoint, resp.status)
            return payload


# ---------------- Health pre-check ----------------

def assert_cluster_healthy(client: OpenSearchClient) -> None:
    """
    Fail fast if OpenSearch is not reachable or reports 'red' status.
    Accepts 'green' and 'yellow' as pass for read-only checks.
    """
    try:
        health = client.get_json("_cluster/health")
        status = (health.get("status") or "").lower()
        if status not in {"green", "yellow"}:
            raise RuntimeError(f"Cluster health is '{status}', expected green/yellow.")
        logging.info("Cluster health OK: %s", status)
    except (HTTPError, URLError) as e:
        raise RuntimeError(f"Cluster health check failed: {e}") from e


# ---------------- Existence query ----------------

def record_exists(client: OpenSearchClient, index_pattern: str, record_id: str, record_date: str) -> bool:
    """
    Return True if any document exists for:
      - recordId.keyword == record_id
      - recordDate between 'record_date 00:00:00.000' and 'record_date 23:59:59.999'
    """
    start = f"{record_date} 00:00:00.000"
    end   = f"{record_date} 23:59:59.999"
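    # NOTE: these timestamp strings assume the recordDate field's mapping
    # accepts a "yyyy-MM-dd HH:mm:ss.SSS" format; adjust if your index differs.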
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"recordId.keyword": record_id}},
                    {"range": {"recordDate": {"gte": start, "lte": end}}},
                ]
            }
        },
        "size": 0
    }
    try:
        res = client.post_json(index_pattern, query)
        total = res.get("hits", {}).get("total", 0)
        # hits.total is an object ({"value": N, ...}) on OpenSearch/ES 7+,
        # but a bare integer on older versions; handle both shapes.
        value = total.get("value", 0) if isinstance(total, dict) else int(total)
        return value > 0
    except Exception as e:
        logging.error("Query failed for recordId=%s date=%s: %s", record_id, record_date, e)
        return False


# ---------------- Main ----------------

def main() -> None:
    cfg = Config.from_env()

    # Prepare paths + logging
    os.makedirs(cfg.log_dir, exist_ok=True)
    os.makedirs(cfg.output_dir, exist_ok=True)
    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_file = os.path.join(cfg.log_dir,    f"record_check_{ts}.log")
    out_file = os.path.join(cfg.output_dir, f"record_results_{ts}.csv")

    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s")

    # Initialize client
    client = OpenSearchClient(cfg.url, cfg.user, cfg.password, verify_ssl=False)

    # Pre-flight: cluster health
    assert_cluster_healthy(client)

    # Process CSV -> Results CSV
    with open(cfg.input_csv, newline="", encoding="utf-8") as fin, \
         open(out_file, "w", newline="", encoding="utf-8") as fout:

        reader = csv.DictReader(fin)
        required_headers = {"recordId", "recordDate"}
        if not reader.fieldnames or required_headers - set(reader.fieldnames):
            raise ValueError(f"CSV must have headers: recordId,recordDate (got {reader.fieldnames})")

        writer = csv.writer(fout)
        writer.writerow(["recordId", "recordDate", "status"])

        for row in reader:
            rec_id = (row.get("recordId") or "").strip()
            date   = (row.get("recordDate") or "").strip()
            if not rec_id or not date:
                logging.warning("Skipping row with missing values: %s", row)
                continue

            ok = record_exists(client, cfg.index_pattern, rec_id, date)
            status = "PASS" if ok else "FAIL"
            writer.writerow([rec_id, date, status])
            (logging.info if ok else logging.warning)("%s: recordId=%s date=%s", status, rec_id, date)

    print(f"Done.\nLog: {log_file}\nResults: {out_file}")


if __name__ == "__main__":
    main()
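
🔒 Enabling SSL verification for production

As the security note in the script says, certificate verification is disabled for convenience. A minimal sketch of the production variant (the CA bundle path is an assumption; point it at your real chain) is to keep verify_ssl=True and swap in a CA-pinned context, since ssl is already imported:

client = OpenSearchClient(cfg.url, cfg.user, cfg.password, verify_ssl=True)
client.ctx = ssl.create_default_context(cafile="./certs/ca-bundle.pem")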

