Agentic ETL: Automating Data Pipeline Orchestration with Multi-Agent Systems
Traditional ETL pipelines break silently, require constant maintenance, and cannot adapt to schema changes without redeployment. Agentic ETL — LLM-orchestrated pipelines that reason about data quality and self-heal — changes the operational model.
Agentic ETL replaces brittle hand-coded pipeline logic with four cooperative agents: a Schema Discovery Agent that reverse-engineers source schemas, a Data Quality Agent that validates data and detects anomalies, a Transformation Agent that writes dbt models from schema and business rules, and a Pipeline Orchestration Agent that monitors runs and diagnoses failures. Together they reduce the mean time to recover from schema change failures from hours to minutes and eliminate the category of 'nobody knew the schema changed' data incidents.
Why Traditional ETL Pipelines Are an Operational Liability
The median enterprise data team maintains dozens to hundreds of ETL pipelines. Each one is a brittle artifact: a YAML file in Airflow, a dbt model with hard-coded column names, a Glue job that assumes a specific S3 schema. The pipeline works perfectly until the source system changes. A Salesforce admin adds a field, a backend team renames a column in the orders table, an upstream vendor changes their API response format. The pipeline silently fails, or worse, continues running and produces wrong data that flows downstream into dashboards and ML features before anyone notices.
Schema drift is not an edge case — it is the median condition of production data systems. In a survey of enterprise data teams, schema-related failures account for 40-60% of all pipeline incidents. The engineering response has been more monitoring: data contracts, Great Expectations checks, dbt tests. These are valuable, but they are reactive — they detect failures after the fact and still require an engineer to diagnose root cause and write a fix.
Agentic ETL takes a different approach: instead of writing pipelines that break and alerting engineers to fix them, build agents that observe the data environment, understand the intent of each transformation, and adapt when the environment changes. The agent does not replace the engineer — it handles the 80% of incidents that are routine schema drift and data quality failures, freeing engineers for the 20% that require genuine architectural judgment.
The Four-Agent Architecture for Agentic ETL
Schema Discovery Agent
The Schema Discovery Agent continuously monitors source systems — databases, APIs, file drops, event streams — and maintains a current schema registry. On schedule or triggered by change events, it connects to source systems, introspects schemas, compares against the registry, and flags changes: new columns, dropped columns, type changes, new tables. For undocumented source systems (a common enterprise reality), it generates semantic descriptions of discovered columns using LLM-powered column name and sample value analysis. The output is a versioned schema registry that downstream agents consume. Tools: database introspection queries (information_schema), API spec parsing (OpenAPI), column profiling with pandas/Polars.
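One way to keep the registry versioned is to fingerprint each schema snapshot and append a new version only when the fingerprint changes. The sketch below is a minimal in-memory illustration of that idea. `SchemaRegistry`, `fingerprint`, and the column dict shape (`name`, `type`, `nullable`) are assumptions made for this example, not the API of any particular registry product.

```python
import hashlib
import json
from dataclasses import dataclass, field


def fingerprint(columns: list[dict]) -> str:
    """Stable hash of a schema: column order and dict key order do not matter."""
    canonical = json.dumps(sorted(columns, key=lambda c: c["name"]), sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


@dataclass
class SchemaRegistry:
    """Hypothetical in-memory registry: table name -> list of schema versions."""
    versions: dict[str, list[dict]] = field(default_factory=dict)

    def register(self, table: str, columns: list[dict]) -> bool:
        """Append a new version if the schema changed. Returns True on change."""
        history = self.versions.setdefault(table, [])
        digest = fingerprint(columns)
        if history and history[-1]["fingerprint"] == digest:
            return False  # unchanged since the last snapshot
        history.append(
            {"version": len(history) + 1, "fingerprint": digest, "columns": columns}
        )
        return True


registry = SchemaRegistry()
v1 = [{"name": "id", "type": "integer", "nullable": "NO"}]
v2 = v1 + [{"name": "customer_tier", "type": "character varying", "nullable": "YES"}]

first_seen = registry.register("orders", v1)       # first snapshot counts as a change
unchanged = registry.register("orders", list(v1))  # same columns, no new version
drifted = registry.register("orders", v2)          # new column, new version
```

A production registry would persist versions in a database table or object store rather than in memory, but the change test stays the same: compare fingerprints, then store the full column list only when they differ.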
Data Quality Agent
The Data Quality Agent runs after every pipeline execution to validate data against expected patterns. Beyond threshold checks (row counts, null rates), it performs LLM-powered anomaly reasoning: given the column description, business context, and sample values, does this distribution look correct? It generates Great Expectations suites automatically from column profiles and updates them as data distributions evolve. When it detects anomalies, it classifies them: source issue (bad data from upstream), transformation issue (pipeline logic error), or expected variation (seasonal pattern, business event). This classification determines the automated response — halt the pipeline, alert the team, or log and continue.
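The mapping from classification to response can be kept outside the LLM as a small, auditable policy table. The sketch below assumes one possible policy (halt on source issues, alert on transformation issues, log expected variation). The enum names and the `route_anomaly` helper are illustrative, and the policy itself should be tuned to your own risk tolerance.

```python
from enum import Enum


class AnomalyClass(Enum):
    SOURCE_ISSUE = "source_issue"
    TRANSFORM_ISSUE = "transform_issue"
    EXPECTED_VARIATION = "expected_variation"


class Response(Enum):
    HALT = "halt_pipeline"
    ALERT = "alert_team"
    LOG = "log_and_continue"


# Assumed policy for illustration only.
POLICY = {
    AnomalyClass.SOURCE_ISSUE: Response.HALT,
    AnomalyClass.TRANSFORM_ISSUE: Response.ALERT,
    AnomalyClass.EXPECTED_VARIATION: Response.LOG,
}


def route_anomaly(label: str) -> Response:
    """Map a classifier label to a response; unknown labels fail safe to HALT."""
    try:
        return POLICY[AnomalyClass[label.strip().upper()]]
    except KeyError:
        return Response.HALT
```

Failing safe on unrecognized labels means a malformed model reply stops the pipeline instead of silently letting data through.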
Transformation Agent
The Transformation Agent writes and updates dbt models in response to schema changes. Given a schema change notification (for example, orders.customer_tier added to the orders table as VARCHAR), it infers the transformation intent from existing model documentation and SQL, determines whether the change requires updating an existing model or creating a new one, writes the dbt SQL, and generates or updates the YAML documentation. It uses the existing dbt project structure as context (model files, schema.yml, and source definitions) to produce changes that are consistent with the project's conventions. For teams on dbt Cloud, the dbt Cloud API can then trigger a job run programmatically to build and test the generated model.
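To make the target output concrete, here is a deterministic sketch of the simplest case: a new column appended to a staging model's explicit select list. A real Transformation Agent would generate this with an LLM using project context. The helper names, the `raw` source name, and the column list are assumptions for illustration; only the `{{ source(...) }}` call is standard dbt Jinja.

```python
def apply_added_columns(existing: list[str], added: list[str]) -> list[str]:
    """Append newly discovered columns, keeping order and skipping duplicates."""
    return existing + [c for c in added if c not in existing]


def render_staging_model(source_name: str, table: str, columns: list[str]) -> str:
    """Render a dbt staging model that selects an explicit column list."""
    select_list = ",\n".join(f"    {col}" for col in columns)
    return (
        "select\n"
        f"{select_list}\n"
        # Quadruple braces in an f-string emit the literal {{ }} that dbt expects.
        f"from {{{{ source('{source_name}', '{table}') }}}}\n"
    )


columns = apply_added_columns(
    ["order_id", "customer_id", "amount"],  # columns already in the model
    ["customer_tier"],                      # columns reported as added
)
model_sql = render_staging_model("raw", "orders", columns)
print(model_sql)
```

The rendered model lists `customer_tier` after the existing columns and reads from `{{ source('raw', 'orders') }}`, which is the kind of small, reviewable diff that suits a pull request.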
Pipeline Orchestration Agent
The Pipeline Orchestration Agent monitors Airflow/Dagster run logs and diagnoses failures. When a pipeline fails, it retrieves the error, the failing task's code, upstream task outputs, and recent schema changes, then reasons about root cause. For routine failures (schema drift, upstream data unavailability, resource constraints), it proposes and optionally applies a fix automatically. For novel failures, it drafts a root cause analysis and proposed fix for engineer review. Dagster's asset-based model makes failure scope clearer: the agent can identify exactly which asset failed and inspect its upstream dependencies without reading through a complex DAG definition.
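A cheap pre-filter can separate routine failures from novel ones before any LLM call is made. The sketch below uses regular expressions for the three routine categories named above. The patterns and the `triage` helper are assumptions for illustration, since real error text varies by database engine and orchestrator.

```python
import re

# Assumed patterns for illustration; extend them from your own incident history.
ROUTINE_PATTERNS = {
    "schema_drift": re.compile(
        r"column .* does not exist|KeyError: '\w+'|no such column", re.I
    ),
    "upstream_unavailable": re.compile(
        r"connection refused|timed? ?out|table .* not found", re.I
    ),
    "resource_constraint": re.compile(
        r"out of memory|OOMKilled|MemoryError|disk quota", re.I
    ),
}


def triage(error_text: str) -> str:
    """Return the first matching routine category, or 'novel' for engineer review."""
    for category, pattern in ROUTINE_PATTERNS.items():
        if pattern.search(error_text):
            return category
    return "novel"
```

Anything that comes back as `novel` goes to the drafting path (root cause analysis for an engineer), while routine categories can be handed to the matching fix proposal.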
Traditional ETL vs Agentic ETL: Operational Comparison
| Scenario | Traditional ETL Response | Agentic ETL Response | MTTR (agentic vs. traditional) |
|---|---|---|---|
| Source column renamed | Pipeline fails, alert fires, engineer investigates and updates model | Schema Discovery Agent detects change, Transformation Agent updates model, pipeline resumes | Minutes vs. hours |
| New column added to source | Ignored until engineer manually adds it to model | Schema Agent flags new column, Transformation Agent assesses relevance and adds to model if applicable | Automated vs. weeks |
| Data quality anomaly detected | Great Expectations test fails, alert fires, engineer reviews | Quality Agent classifies anomaly (source/transform/expected), routes to auto-fix or engineer review | Minutes vs. hours |
| Pipeline fails with resource error | Alert fires, engineer increases resources in config | Orchestration Agent identifies resource failure, updates task resource config, retries | Minutes vs. hours |
| Undocumented source table | Engineer manually profiles and documents | Schema Discovery Agent profiles columns, generates semantic descriptions via LLM | Automated vs. days |
Multi-Agent ETL Pipeline: Schema Discovery, Quality Validation, and Self-Healing
import json
import os
import re
from typing import Optional

import sqlalchemy as sa
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import create_react_agent


# --- Database connection ---
def get_engine() -> sa.Engine:
    return sa.create_engine(os.environ["DATABASE_URL"])


def safe_identifier(name: str) -> str:
    """Allow only plain (optionally schema-qualified) identifiers in interpolated SQL.

    Table and column names cannot be bound as query parameters, and here they
    arrive as LLM-chosen tool arguments, so they are validated before use.
    """
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?", name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


# --- Schema Discovery Tools ---
@tool
def inspect_table_schema(table_name: str) -> str:
    """Retrieve current column definitions for a database table."""
    engine = get_engine()
    with engine.connect() as conn:
        result = conn.execute(sa.text(
            "SELECT column_name, data_type, is_nullable "
            "FROM information_schema.columns "
            "WHERE table_name = :table ORDER BY ordinal_position"
        ), {"table": table_name})
        columns = [{"name": r[0], "type": r[1], "nullable": r[2]} for r in result]
    return json.dumps(columns)


@tool
def sample_column_values(table_name: str, column_name: str, limit: int = 20) -> str:
    """Sample values from a column to support semantic understanding."""
    table = safe_identifier(table_name)
    column = safe_identifier(column_name)
    engine = get_engine()
    with engine.connect() as conn:
        result = conn.execute(sa.text(
            f"SELECT DISTINCT {column} FROM {table} "
            f"WHERE {column} IS NOT NULL LIMIT {int(limit)}"
        ))
        samples = [str(r[0]) for r in result]
    return json.dumps(samples)


@tool
def compare_to_registered_schema(table_name: str, current_schema_json: str) -> str:
    """Compare current schema against the registered schema. Returns diff as JSON."""
    # In production: load from your schema registry (database table or file store)
    registry_path = f"schema_registry/{safe_identifier(table_name)}.json"
    if not os.path.exists(registry_path):
        return json.dumps({"status": "not_registered", "diff": []})
    with open(registry_path) as f:
        registered = json.load(f)
    current = json.loads(current_schema_json)
    registered_cols = {c["name"]: c for c in registered}
    current_cols = {c["name"]: c for c in current}
    added = [c for c in current_cols if c not in registered_cols]
    removed = [c for c in registered_cols if c not in current_cols]
    type_changed = [
        {"column": c, "from": registered_cols[c]["type"], "to": current_cols[c]["type"]}
        for c in current_cols
        if c in registered_cols and current_cols[c]["type"] != registered_cols[c]["type"]
    ]
    return json.dumps({"added": added, "removed": removed, "type_changed": type_changed})


# --- Data Quality Tools ---
@tool
def run_row_count_check(table_name: str, expected_min_rows: int) -> str:
    """Validate that a table has at least the expected minimum number of rows."""
    table = safe_identifier(table_name)
    engine = get_engine()
    with engine.connect() as conn:
        count = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table}")).scalar()
    passed = count >= expected_min_rows
    return json.dumps({"table": table_name, "row_count": count, "passed": passed})


@tool
def run_null_rate_check(table_name: str, column_name: str, max_null_rate: float = 0.05) -> str:
    """Check null rate for a column against the maximum acceptable threshold."""
    table = safe_identifier(table_name)
    column = safe_identifier(column_name)
    engine = get_engine()
    with engine.connect() as conn:
        result = conn.execute(sa.text(
            f"SELECT COUNT(*) AS total, SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) AS nulls "
            f"FROM {table}"
        )).fetchone()
    total, nulls = result[0], result[1]
    # SUM() is NULL on an empty table and a Decimal on some dialects, hence the guards.
    null_rate = float(nulls or 0) / total if total > 0 else 0.0
    passed = null_rate <= max_null_rate
    return json.dumps({"column": column_name, "null_rate": round(null_rate, 4), "passed": passed})


# --- Orchestration Tools ---
@tool
def get_pipeline_failure_log(pipeline_id: str) -> str:
    """Retrieve the most recent failure log for a given pipeline/DAG run."""
    # In production: query Airflow REST API or Dagster GraphQL API
    # Stub: return a simulated failure log
    return json.dumps({
        "pipeline_id": pipeline_id,
        "failed_task": "transform_orders",
        "error": "KeyError: 'customer_tier' (column does not exist in source schema)",
        "task_code_path": "dbt/models/marts/orders_enriched.sql",
    })


@tool
def propose_dbt_model_fix(model_path: str, error_description: str, schema_diff_json: str) -> str:
    """Generate a proposed fix for a dbt model based on the error and schema diff."""
    # In production: read the actual model file, use LLM to generate updated SQL
    # Stub: always returns the same COALESCE proposal
    return json.dumps({
        "proposed_action": "add_null_coalesce",
        "description": f"Wrap missing column reference with COALESCE to handle schema drift: {error_description}",
        "diff": "+ COALESCE(orders.customer_tier, 'unknown') AS customer_tier",
    })


# --- Agent assembly ---
# Note: `state_modifier` is the parameter name in older LangGraph releases;
# newer releases call the same argument `prompt`.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

schema_discovery_agent = create_react_agent(
    llm,
    tools=[inspect_table_schema, sample_column_values, compare_to_registered_schema],
    state_modifier=SystemMessage(content=(
        "You are a Schema Discovery Agent. Inspect source table schemas, compare against the registry, "
        "and report any schema changes (added columns, removed columns, type changes). "
        "Be specific about what changed and provide semantic descriptions of new columns based on their names and sample values."
    )),
)

quality_agent = create_react_agent(
    llm,
    tools=[run_row_count_check, run_null_rate_check],
    state_modifier=SystemMessage(content=(
        "You are a Data Quality Agent. Run data quality checks on tables after pipeline execution. "
        "Classify failures as: SOURCE_ISSUE (bad upstream data), TRANSFORM_ISSUE (pipeline logic error), "
        "or EXPECTED_VARIATION (normal business pattern). Recommend whether to halt the pipeline or continue."
    )),
)

orchestration_agent = create_react_agent(
    llm,
    tools=[get_pipeline_failure_log, propose_dbt_model_fix],
    state_modifier=SystemMessage(content=(
        "You are a Pipeline Orchestration Agent. When a pipeline fails, retrieve the failure log, "
        "diagnose the root cause, and propose a specific fix. Reference any schema changes that may have "
        "caused the failure. Output a structured diagnosis with: root_cause, affected_component, proposed_fix."
    )),
)


# --- Workflow state and graph ---
class ETLOrchestratorState(MessagesState):
    table_name: str
    pipeline_id: str
    # MessagesState is a TypedDict, so fields cannot carry default values;
    # nodes read these optional keys with state.get(...).
    schema_diff: Optional[str]
    quality_passed: Optional[bool]
    diagnosis: Optional[str]


async def run_schema_discovery(state: ETLOrchestratorState) -> dict:
    result = await schema_discovery_agent.ainvoke({
        "messages": [HumanMessage(content=f"Inspect schema for table '{state['table_name']}' and compare to registry.")]
    })
    final_msg = result["messages"][-1].content
    return {"messages": result["messages"], "schema_diff": final_msg}


async def run_quality_checks(state: ETLOrchestratorState) -> dict:
    result = await quality_agent.ainvoke({
        "messages": [HumanMessage(content=f"Run quality checks on '{state['table_name']}'. Expected min rows: 100.")]
    })
    content = result["messages"][-1].content
    # Brittle heuristic: substring matching on free text. A reply such as
    # "no need to halt" is treated as a failure, which errs toward diagnosis.
    passed = "halt" not in content.lower() and "SOURCE_ISSUE" not in content
    return {"messages": result["messages"], "quality_passed": passed}


async def run_failure_diagnosis(state: ETLOrchestratorState) -> dict:
    result = await orchestration_agent.ainvoke({
        "messages": [HumanMessage(content=(
            f"Pipeline '{state['pipeline_id']}' failed. "
            f"Recent schema changes: {state.get('schema_diff', 'none')}. "
            "Diagnose and propose a fix."
        ))]
    })
    return {"messages": result["messages"], "diagnosis": result["messages"][-1].content}


def route_after_quality(state: ETLOrchestratorState) -> str:
    return "done" if state.get("quality_passed") else "diagnose"


etl_graph_builder = StateGraph(ETLOrchestratorState)
etl_graph_builder.add_node("schema_discovery", run_schema_discovery)
etl_graph_builder.add_node("quality_checks", run_quality_checks)
etl_graph_builder.add_node("failure_diagnosis", run_failure_diagnosis)
etl_graph_builder.add_edge(START, "schema_discovery")
etl_graph_builder.add_edge("schema_discovery", "quality_checks")
etl_graph_builder.add_conditional_edges(
    "quality_checks",
    route_after_quality,
    {"done": END, "diagnose": "failure_diagnosis"},
)
etl_graph_builder.add_edge("failure_diagnosis", END)
etl_pipeline = etl_graph_builder.compile()
A three-agent ETL orchestration graph (the Transformation Agent is not implemented in this example): Schema Discovery runs first to detect drift, Data Quality validates the loaded data, and Failure Diagnosis fires only when quality checks fail. Each agent uses LangGraph's create_react_agent with purpose-specific tools. In production, add a persistent checkpointer such as PostgresSaver and human-in-the-loop interrupts before applying automated fixes.
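The `run_quality_checks` node in the listing decides pass or fail by searching the agent's free-text reply for substrings. A sturdier option is to ask the agent to end its reply with a small JSON verdict and parse that instead. The sketch below assumes a verdict shape of `{"classification": ..., "action": ...}` and the helper names `parse_quality_verdict` and `quality_passed`; both are hypothetical conventions you would also need to state in the agent's system prompt.

```python
import json
import re
from typing import Optional


def parse_quality_verdict(reply: str) -> Optional[dict]:
    """Extract the last valid verdict object in an agent reply, or None."""
    # Matches flat JSON objects only (no nested braces), which suits a small verdict.
    candidates = re.findall(r"\{[^{}]*\}", reply)
    for raw in reversed(candidates):
        try:
            verdict = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(verdict, dict) and "classification" in verdict and "action" in verdict:
            return verdict
    return None


def quality_passed(reply: str) -> bool:
    """Fail closed: only an explicit 'continue' action counts as a pass."""
    verdict = parse_quality_verdict(reply)
    return verdict is not None and verdict["action"] == "continue"
```

With this approach a reply like "There is no reason to halt" followed by a `continue` verdict passes, and a reply with no parseable verdict is routed to diagnosis instead of being waved through.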
Do not let the Transformation Agent apply dbt model changes directly to production without a human review gate — at least initially. The agent's proposed changes should go through your normal dbt PR process: auto-commit to a branch, run dbt compile and dbt test in CI, require engineer approval before merge. The value is in automating the diagnosis and drafting the fix, not in bypassing code review for data pipeline changes. Once you have calibrated the agent's change quality over 3-6 months, you can consider auto-merging low-risk changes (adding a COALESCE for a new nullable column) while keeping human review for structural changes.
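The review gate described here can be expressed as a small routing function that runs after CI. This is a sketch under stated assumptions: `ProposedChange`, `LOW_RISK_ACTIONS`, and the single-file rule are illustrative choices, and `add_null_coalesce` is the action name used by the stubbed `propose_dbt_model_fix` tool in the listing.

```python
from dataclasses import dataclass

# Assumed change classes for illustration; the low-risk set should start small.
LOW_RISK_ACTIONS = {"add_null_coalesce", "rename_column_reference"}


@dataclass(frozen=True)
class ProposedChange:
    action: str          # e.g. "add_null_coalesce"
    files_touched: int   # number of model files in the diff
    ci_passed: bool      # dbt compile and dbt test succeeded on the branch


def review_route(change: ProposedChange, auto_merge_enabled: bool = False) -> str:
    """Return 'auto_merge' only for low-risk, single-file, CI-green changes."""
    if not change.ci_passed:
        return "blocked"
    if (
        auto_merge_enabled
        and change.action in LOW_RISK_ACTIONS
        and change.files_touched == 1
    ):
        return "auto_merge"
    return "engineer_review"
```

Keeping `auto_merge_enabled` off by default matches the advice above: every change goes to engineer review until the team has calibrated the agent's output and chooses to flip the switch.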
Migrating from Traditional to Agentic ETL
A full agentic ETL migration does not happen overnight. Here is the incremental path.
Start with schema monitoring only
Deploy the Schema Discovery Agent against your highest-churn source systems. Connect it to your schema registry and configure it to alert on changes. This delivers immediate value — schema changes are detected proactively rather than discovered when pipelines fail — without any risk of automated changes to production pipelines.
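For alerting, it helps to distinguish changes that can break existing models from changes that cannot. The sketch below classifies a diff in the JSON shape returned by `compare_to_registered_schema` in the listing above. The `drift_severity` helper and its four labels are assumptions for illustration.

```python
import json


def drift_severity(diff_json: str) -> str:
    """Classify a schema diff as 'unregistered', 'breaking', 'additive', or 'none'."""
    diff = json.loads(diff_json)
    if diff.get("status") == "not_registered":
        return "unregistered"  # no baseline yet, so nothing to compare against
    if diff.get("removed") or diff.get("type_changed"):
        return "breaking"      # existing column references may fail or change meaning
    if diff.get("added"):
        return "additive"      # existing models keep working; new data is available
    return "none"


example = json.dumps({
    "added": ["customer_tier"],
    "removed": [],
    "type_changed": [{"column": "amount", "from": "integer", "to": "numeric"}],
})
severity = drift_severity(example)
```

A reasonable starting policy is to page on `breaking`, post `additive` changes to a channel for triage, and register a baseline automatically for `unregistered` tables.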
Add automated quality checks
Deploy the Data Quality Agent to run after your highest-impact pipelines. Start with row count and null rate checks, then expand to LLM-powered distribution anomaly detection. Configure the agent to classify failures and route to the appropriate response — halt/alert vs. log/continue — with engineers reviewing the classification accuracy for the first month before trusting it fully.
Introduce diagnosis-only orchestration
Enable the Orchestration Agent to diagnose failures and draft fixes, but do not yet apply any changes automatically. Every failure gets an LLM-generated root cause analysis delivered to the engineering Slack channel alongside the standard alert. Measure the accuracy of the diagnosis against your engineers' own root cause assessments. This calibration phase is essential before adding automation.
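Measuring diagnosis accuracy can start as simply as logging the agent's root cause label next to the engineer's for each incident and computing an agreement rate. The sketch below assumes both sides use the same small label vocabulary; the labels and the `agreement_report` helper are illustrative, and the sample incidents are made up to show the output shape.

```python
from collections import Counter


def agreement_report(pairs: list[tuple[str, str]]) -> dict:
    """Summarize (agent_root_cause, engineer_root_cause) pairs, one per incident."""
    if not pairs:
        return {"incidents": 0, "agreement_rate": None, "disagreements": {}}
    matches = sum(1 for agent, engineer in pairs if agent == engineer)
    disagreements = Counter(
        f"{agent} -> {engineer}" for agent, engineer in pairs if agent != engineer
    )
    return {
        "incidents": len(pairs),
        "agreement_rate": round(matches / len(pairs), 3),
        "disagreements": dict(disagreements),
    }


# Illustrative data only.
incidents = [
    ("schema_drift", "schema_drift"),
    ("schema_drift", "schema_drift"),
    ("resource_constraint", "upstream_unavailable"),
    ("schema_drift", "transform_bug"),
]
report = agreement_report(incidents)
```

The disagreement counts matter as much as the headline rate: a cluster such as `schema_drift -> transform_bug` points at a specific failure class where the agent should stay in diagnosis-only mode.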
Enable automated fixes for low-risk change classes
After validating diagnosis accuracy, enable automated fix application for the lowest-risk change class: adding COALESCE wrappers for new nullable columns, updating column references when a column is renamed in a documented way. These changes are easily reviewable in diffs and have low blast radius if wrong. Keep engineer approval gates for structural changes: model redesigns, new joins, changes to aggregation logic.
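As an illustration of the rename case, the sketch below rewrites whole-word column references from a documented old-to-new map. It is deliberately naive: the regex approach also rewrites matches inside string literals and comments, so a SQL parser would be the safer choice in practice. The `rename_column_refs` helper and the column names are assumptions for this example.

```python
import re


def rename_column_refs(sql: str, renames: dict[str, str]) -> str:
    """Rewrite whole-word column references according to an old -> new map."""
    if not renames:
        return sql
    pattern = re.compile(
        r"\b(" + "|".join(re.escape(old) for old in renames) + r")\b"
    )
    return pattern.sub(lambda m: renames[m.group(1)], sql)


sql = "select cust_id, cust_id_hash from orders where cust_id is not null"
updated = rename_column_refs(sql, {"cust_id": "customer_id"})
print(updated)
```

Because the pattern uses word boundaries, `cust_id_hash` is left alone while both standalone `cust_id` references are rewritten, which keeps the resulting diff small and easy to review.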
Agentic ETL in Practice: Inductivee's Deployments
The data engineering teams we work with face a consistent operational challenge: a small team maintaining a large catalog of pipelines against source systems they do not control. A single Salesforce admin changing a custom object schema, a backend team shipping a migration without coordinating with data engineering, a third-party vendor updating their API — any of these can break a production pipeline. The old model was alert-driven firefighting. The agentic model is continuous environment monitoring with automated first-response.
In our deployments, the Schema Discovery Agent typically catches 70-80% of schema drift events before they cause pipeline failures — because the pipeline can be paused or adapted before it runs against a changed schema, rather than after. The reduction in 3am pager alerts from data pipeline failures is typically the benefit that gets the most immediate appreciation from data engineering managers.
The Transformation Agent's ability to draft dbt model updates is the highest-leverage capability, but it requires the most calibration. dbt model quality depends on understanding business context, not just schema mechanics — a column named 'status' in an orders table could mean order status, fulfillment status, or payment status, and the correct transformation depends on business rules that the agent needs to learn from documentation and existing model patterns. Teams with well-documented dbt projects see significantly better Transformation Agent performance than teams with undocumented models, which is itself a governance improvement driver.
Written By
Inductivee Team
Author: Agentic AI Engineering Team
The Inductivee engineering team — a remote-first group of multi-agent orchestration specialists, RAG pipeline architects, and data liquidity engineers who have shipped 40+ agentic deployments across 25+ enterprises since 2012. Our writing is grounded in what we actually build, break, and operate in production.