Imagine joining a new company as a data engineer. You inherit quite a few ETL pipelines and are responsible for maintaining them. What do you think the challenges of your work will be?
Typically, you may face the following problems:
- Upstream schema changes: Developer teams may add or drop fields, change data types or rename columns. When a source schema changes unexpectedly, ETL jobs can fail abruptly. Worse, the pipeline may silently load corrupted or null values into downstream tables.
- Data quality issues: Sometimes ETL jobs don’t fail outright; instead, they run and finish with a success status. However, the loaded data is completely wrong, containing duplicated or missing records.
- Lack of documentation: Legacy pipelines may have little documentation, or the existing documentation may be outdated, so you can’t be sure whether they are consistent with the current business logic.
- Volume growth and performance spikes: Data volume increases as the business grows. An ETL pipeline optimized for a smaller historical dataset can easily become slow, stall, or fail when processing large volumes.
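As a small illustration of the first problem, a schema check can turn a silent load of bad data into an explicit failure. The sketch below is a minimal, pure-Python example; the function name `diff_schema` and the renamed column are hypothetical and only serve the illustration.

```python
from typing import Dict, List


def diff_schema(expected: List[str], actual: List[str]) -> Dict[str, List[str]]:
    """Report columns that disappeared from, or were added to, a source."""
    expected_set, actual_set = set(expected), set(actual)
    return {
        # Columns the pipeline relies on that the source no longer provides.
        "missing": sorted(expected_set - actual_set),
        # Columns the source now provides that the pipeline does not know about.
        "unexpected": sorted(actual_set - expected_set),
    }


# Hypothetical scenario: upstream renamed "Prompt Tokens" to "Input Tokens".
expected_columns = ["Model Name", "Prompt Tokens", "Completion Tokens"]
actual_columns = ["Model Name", "Input Tokens", "Completion Tokens"]

drift = diff_schema(expected_columns, actual_columns)
if drift["missing"] or drift["unexpected"]:
    print("Schema drift detected:", drift)
```

Run before the load step, a check like this makes a rename visible as one missing and one unexpected column instead of a column full of nulls downstream.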
An automated testing workflow can help you address the problems above. Why? Because a structured workflow helps you quickly understand all the key aspects of an ETL pipeline: the business logic, the data transformation algorithms, the data types, and the data issues the pipeline is required to resolve. The testing patterns are also reusable: you don’t need to design a new workflow each time you inherit a different ETL pipeline.
In today’s article, I’ll focus on automated testing in data engineering, including the environment configuration and a practical workflow. At the end, I’ll also discuss how AI-assisted coding can accelerate the workflow and improve productivity.
Make the Environment Work
If you are building an automated testing workflow for the first time, setting up the environment may take some time. Data engineers can choose among different tools and flows to set up a testing environment, but if you follow the steps below, the process should be easy and straightforward.
First, you only need to install three things: Docker Desktop, VS Code and the Dev Containers extension.
In your testing workflow, Docker creates lightweight, isolated, and repeatable test environments. It allows you to spin up mock data infrastructure (for example, databases, data pipelines, and orchestration engines) directly on a local machine or inside a Continuous Integration (CI) pipeline. With Docker, you can run your integration tests and data validation identically across platforms without polluting the local operating system.
Visual Studio Code (VS Code) is a centralised development environment for scripting, debugging, implementing, and automating data pipeline tests. As a data engineer, you may have used it for other projects, or you may be more familiar with PyCharm or IntelliJ IDEA. I choose VS Code because of its lightweight build, extension ecosystem, and hybrid notebook/script workflow. AI-native editors such as Cursor and Windsurf are rapidly gaining popularity among developers, which I’ll discuss in the later part of this article.
I assume you already have Python, Poetry, and Java installed. Open your VS Code terminal and type the following commands to check their versions and make sure they are up to date. You can also install them from the terminal if you haven’t done so yet.
python --version
java -version
poetry --version
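If you prefer a scripted check, the following minimal sketch looks up the same three tools on the PATH using the standard library. The helper name `find_tools` is hypothetical, and on some systems the Python executable is called `python3` rather than `python`.

```python
import shutil
from typing import Dict, List, Optional


def find_tools(names: List[str]) -> Dict[str, Optional[str]]:
    """Map each tool name to its executable path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


if __name__ == "__main__":
    for tool, path in find_tools(["python", "java", "poetry"]).items():
        print(f"{tool}: {path or 'NOT FOUND'}")
```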
The Dev Containers extension lets you use a Docker container as a fully functional, reproducible development environment. It standardises environments across the team and allows you to test data ingestion logic locally without consuming cloud resources. Installing it is straightforward: open the Extensions view in VS Code by pressing Ctrl+Shift+X (Windows/Linux) or Cmd+Shift+X (Mac), search for ‘Dev Containers’ in the search bar, and click “Install”.

However, the Dev Containers extension doesn’t know how to build your specific environment. It needs a ‘guide’. The guide is the .devcontainer folder, and the devcontainer.json file in that folder tells the Dev Containers extension:
- Which Docker image to download.
- Which ports to forward.
- Which VS Code extensions to install inside the container.
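To make these three points concrete, here is a minimal sketch that writes a devcontainer.json from Python. The property names (`image`, `forwardPorts`, `customizations.vscode.extensions`) follow the Dev Containers configuration format. The container name, image tag, forwarded port (4040, Spark’s default UI port) and extension list are assumptions for illustration only; a real Spark project would also need Java available in the image.

```python
import json
from pathlib import Path
from typing import Any, Dict


def build_devcontainer_config() -> Dict[str, Any]:
    """Return an illustrative devcontainer.json payload (all values are assumptions)."""
    return {
        "name": "etl-testing",  # hypothetical container name
        # Which Docker image to download.
        "image": "mcr.microsoft.com/devcontainers/python:3.11",
        # Which ports to forward to the host.
        "forwardPorts": [4040],
        # Which VS Code extensions to install inside the container.
        "customizations": {"vscode": {"extensions": ["ms-python.python"]}},
    }


def write_devcontainer(project_root: Path) -> Path:
    """Create .devcontainer/devcontainer.json under project_root and return its path."""
    folder = project_root / ".devcontainer"
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / "devcontainer.json"
    target.write_text(json.dumps(build_devcontainer_config(), indent=2))
    return target
```

Calling `write_devcontainer(Path("."))` from the project root would produce a file that the extension can pick up the next time the folder is opened.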
There are two ways to get a .devcontainer folder. If you are new to these tools, you can use VS Code’s automated tool: once you select a Python or Data Engineering template, VS Code generates the folder automatically. When you are more experienced with this kind of project, you can also write it by hand from scratch to meet your team’s testing requirements. The .devcontainer folder can be committed and pushed to Git, together with the source code and source data that you are preparing to test.
To make your life simpler, you may clone the Git repository and open that folder with VS Code.
git clone https://github.com/firm/data-ingestion-transformation.git
The last configuration step is to reopen the project in the container. Why does this matter? Because when you click “Reopen in Container”, VS Code restarts its backend, launches the Docker container, and mounts your local project folder inside that container. The source code and source data of the ETL pipeline become accessible to the Docker environment, and you can run your tests in an isolated sandbox. Sounds cool? Your environment is now set up and you are ready to start testing your ETL pipelines.
Let the Tests Tell You What the System Does
When I inherit an unfamiliar ETL pipeline, my first question isn’t “How does the code work?” Instead, I ask: “What behavior is the system expected to produce?” Tests usually answer that question faster than source code.
Imagine the company you join uses LLMs such as GPT-5.5, Claude 4.6 and Gemini 3 Pro, and the Finance team would like to track AI spending across teams.

The table above shows part of the data, in CSV format, that needs to be stored. The column names must be standardised by replacing spaces with underscores so that downstream systems can reference fields consistently. For example, ‘Model Name’ should become ‘Model_Name’. In the project folder you find ingest.py, which defines the functions for column standardisation and data ingestion, and ai_cost_ingest.py, which calls those functions.
# ingest.py
import logging
from typing import List

from pyspark.sql import SparkSession


def sanitize_columns(columns: List[str]) -> List[str]:
    # Replace every space in each column name with an underscore.
    return [column.replace(" ", "_") for column in columns]


def run(spark: SparkSession, ingest_path: str, transformation_path: str) -> None:
    logging.info("Reading text file from: %s", ingest_path)
    # Read the CSV, using its first row as the column names.
    input_df = (
        spark.read.format("org.apache.spark.csv")
        .option("header", True)
        .csv(ingest_path)
    )
    renamed_columns = sanitize_columns(input_df.columns)
    ref_df = input_df.toDF(*renamed_columns)
    # Write the renamed DataFrame as Parquet.
    ref_df.write.parquet(transformation_path)
# ai_cost_ingest.py
import logging
import sys

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest

LOG_FILENAME = "project.log"
APP_NAME = "AI_Cost Pipeline: Ingest"

if __name__ == "__main__":
    logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
    logging.info(sys.argv)

    # Expect exactly two arguments: the input path and the output path.
    if len(sys.argv) != 3:
        logging.warning("Input source and output path are required")
        sys.exit(1)

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    sc = spark.sparkContext
    app_name = sc.appName
    logging.info("Application Initialized: " + app_name)

    input_path = sys.argv[1]
    output_path = sys.argv[2]
    ingest.run(spark, input_path, output_path)
    logging.info("Application Done: " + spark.sparkContext.appName)

    spark.stop()
You need to understand the defined functions first. You may ask: “What exactly should sanitize_columns() do? Does it handle leading spaces, trailing spaces and internal spaces?” With these questions in mind, you write tests like these:
from data_ingestions.ai_cost import ingest


def test_should_sanitize_nothing() -> None:
    # A name without spaces should come back unchanged.
    no_whitespace_columns = ["Model"]
    actual = ingest.sanitize_columns(no_whitespace_columns)
    expected = no_whitespace_columns
    assert expected == actual


def test_should_sanitize_whitespace_outside() -> None:
    # Leading and trailing spaces are replaced with underscores, not stripped.
    outer_whitespace_columns = [" Prompt Tokens "]
    actual = ingest.sanitize_columns(outer_whitespace_columns)
    expected = ["_Prompt_Tokens_"]
    assert expected == actual


def test_should_sanitize_whitespace_in_between() -> None:
    # An internal space becomes a single underscore.
    inner_whitespace_columns = ["Prompt Tokens"]
    actual = ingest.sanitize_columns(inner_whitespace_columns)
    expected = ["Prompt_Tokens"]
    assert expected == actual
This code lets you test sanitize_columns() directly without launching Spark or processing any data. It is an example of a unit test.
Unit Tests
Unit tests are designed to validate a small piece of logic in isolation. They are usually fast, deterministic and independent of external systems.
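Because unit tests are cheap to write, they are a convenient way to probe edge cases that existing tests do not cover. The sketch below copies the one-line `sanitize_columns()` logic shown earlier so that it runs on its own, and records what that logic does with consecutive spaces, tabs and an empty list. Whether this behavior is what the business wants is a separate question; the tests only surface it.

```python
from typing import List


def sanitize_columns(columns: List[str]) -> List[str]:
    # Same logic as the sanitize_columns shown earlier, copied to keep this example standalone.
    return [column.replace(" ", "_") for column in columns]


def test_consecutive_spaces_become_consecutive_underscores() -> None:
    assert sanitize_columns(["Prompt  Tokens"]) == ["Prompt__Tokens"]


def test_tabs_are_left_untouched() -> None:
    # Only the space character is replaced; other whitespace survives.
    assert sanitize_columns(["Prompt\tTokens"]) == ["Prompt\tTokens"]


def test_empty_list_returns_empty_list() -> None:
    assert sanitize_columns([]) == []
```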
Integration Tests
Unit tests tell you whether a small piece of logic behaves correctly. But they cannot answer the question: “Does the whole pipeline work when all components are connected together?”
For a data engineer, this often means:
- Reading data
- Starting Spark
- Running transformations
- Writing outputs
- Validating results
To test the whole pipeline, we need integration tests, which reveal system behavior. Integration tests are very useful during onboarding because they describe what the system must do, regardless of how the implementation evolves over time.
For the AI_cost data ingestion project, you can use an integration test to help validate whether:
- Input arrives as CSV files.
- Spark is used to process the data.
- Column names are sanitized.
- Data values remain unchanged.
- Output is written in Parquet format.
- The whole ingestion workflow succeeds.
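import csv
import os
import tempfile
from typing import List, Tuple

from pyspark.sql import SparkSession

from data_ingestions.ai_cost import ingest


# spark_session is assumed to be a pytest fixture provided elsewhere
# (for example in conftest.py); it is not defined in this listing.
def test_should_sanitize_column_names(spark_session: SparkSession) -> None:
    given_ingest_folder, given_transform_folder = (
        __create_ingest_and_transform_folders()
    )
    input_csv_path = os.path.join(given_ingest_folder, "input.csv")
    csv_content = [
        ["Model Name", "Prompt Tokens", " Completion Tokens "],
        ["GPT-5.5", "1200", "300"],
        ["Gemini 3 Pro", "900", "250"],
    ]
    __write_csv_file(input_csv_path, csv_content)

    # Run the whole ingestion: read CSV, sanitize columns, write Parquet.
    ingest.run(spark_session, input_csv_path, given_transform_folder)

    actual = spark_session.read.parquet(given_transform_folder)
    expected = spark_session.createDataFrame(
        [
            ["GPT-5.5", "1200", "300"],
            ["Gemini 3 Pro", "900", "250"],
        ],
        ["Model_Name", "Prompt_Tokens", "_Completion_Tokens_"],
    )
    # collect() compares row values, so the column names are checked explicitly.
    assert expected.columns == actual.columns
    assert expected.collect() == actual.collect()


# The two helpers below are called by the test but are not shown in the
# original listing; these are minimal assumed versions.
def __create_ingest_and_transform_folders() -> Tuple[str, str]:
    base_path = tempfile.mkdtemp()
    ingest_folder = os.path.join(base_path, "ingest")
    os.mkdir(ingest_folder)
    # The output folder is left for Spark to create when it writes Parquet.
    transform_folder = os.path.join(base_path, "transform")
    return ingest_folder, transform_folder


def __write_csv_file(file_path: str, content: List[List[str]]) -> None:
    with open(file_path, "w", newline="") as csv_file:
        csv.writer(csv_file).writerows(content)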
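One caveat about the final assertion in the test above: it compares collected rows as ordered lists, and Spark does not generally guarantee row order without an explicit sort. A minimal sketch of an order-insensitive comparison follows, using plain tuples as stand-ins for collected rows. The helper name `same_rows` is hypothetical.

```python
from collections import Counter
from typing import Iterable, Tuple

Row = Tuple[str, ...]


def same_rows(expected: Iterable[Row], actual: Iterable[Row]) -> bool:
    """Compare two row collections as multisets: order is ignored, duplicates are not."""
    return Counter(expected) == Counter(actual)


expected_rows = [("GPT-5.5", "1200", "300"), ("Gemini 3 Pro", "900", "250")]
reordered_rows = [("Gemini 3 Pro", "900", "250"), ("GPT-5.5", "1200", "300")]
duplicated_rows = expected_rows + [("GPT-5.5", "1200", "300")]

# Same rows in a different order still match.
assert same_rows(expected_rows, reordered_rows)
# An extra duplicate row is detected as a difference.
assert not same_rows(expected_rows, duplicated_rows)
```

Comparing as multisets rather than sets matters here, because duplicated records are one of the data quality problems the tests are meant to catch.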
Let AI Read the ETL Pipeline Before You Do
Imagine that you are reviewing an unfamiliar ETL pipeline that contains hundreds or even thousands of lines of PySpark code. Understanding the code and writing tests may take hours or even days. Today, tools such as Cursor, Windsurf, and GitHub Copilot can help accelerate this process.
Take Cursor as an example. As an AI assistant, it can analyze an entire repository and generate explanations of individual modules, functions, and data flows. It can also generate initial versions of unit tests and integration tests. To get the most out of it, you need to ask the right questions as a data engineer. Here are some sample questions you may ask:
- What is the purpose of this ETL job?
- What input and output formats does this pipeline expect?
- Which functions are responsible for data validation?
- Which edge cases are currently untested?
AI can suggest test cases, but it cannot determine whether those tests meet the business requirements and the company’s strategy. Understanding the pipeline, validating assumptions, and reviewing code are still your responsibility. AI is a productivity accelerator rather than a replacement for engineering judgment. It saves you time in understanding and testing ETL pipelines so you can focus on higher-value data engineering work such as designing data architectures, building scalable data platforms, and empowering data-driven decision making.

