Validate data in Xorq pipelines
This guide shows you how to validate data quality before it breaks your pipeline. You’ll use schema validation to catch errors at compile time and runtime validation to catch issues during execution.
Prerequisites
- Xorq installed (see Install Xorq)
- Completed Quickstart
If you encounter ModuleNotFoundError: No module named 'xorq_datafusion' when importing xorq.api, install the xorq-datafusion package separately: pip install xorq-datafusion. The embedded backend requires xorq-datafusion (a Rust extension), which is separate from the Python datafusion package. If the error persists, reinstall Xorq: pip install --upgrade --force-reinstall xorq xorq-datafusion.
Validate schemas at compile time
Schema validation catches mismatches before execution starts. The validation happens when you create the expression, so errors fail fast without processing any data.
Check required columns and types before processing. Create schema_validation.py:
# schema_validation.py
import xorq.api as xo
from datetime import datetime, timedelta

# Define expected schema
expected_schema = {
    "customer_id": "int64",
    "email": "string",
    "created_at": "timestamp",
    "status": "string"
}

def validate_schema(expr, expected_schema):
    """Validate expression schema matches expected schema."""
    actual_schema = expr.schema()

    # Check all expected columns exist
    missing = [col for col in expected_schema.keys() if col not in actual_schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Check types match
    for col, expected_type in expected_schema.items():
        actual_type = str(actual_schema[col])
        if actual_type != expected_type:
            raise ValueError(
                f"Type mismatch for column '{col}': "
                f"expected {expected_type}, got {actual_type}"
            )
    return expr

# Create data
con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, 3],
    "email": ["a@b.com", "c@d.com", "e@f.com"],
    "created_at": [datetime.now() - timedelta(days=i) for i in range(3)],
    "status": ["active", "inactive", "pending"]
}, name="customers")

# Schema validation happens here (before execution)
# Raises ValueError immediately if schema doesn't match
validated = validate_schema(data, expected_schema)
validated = validated.mutate(validated=True)
df = validated.execute()
print(f"Validated schema for {len(df)} rows")

Run the script:

python schema_validation.py

You should see a result like this:
Validated schema for 3 rows
To verify schema validation works, test these scenarios:
- Data matching the expected schema executes successfully
- Data missing the customer_id column raises ValueError: Missing required columns: ['customer_id'] at the validate_schema() call, before execution
- Data with a wrong type (for example, customer_id as string) raises ValueError: Type mismatch for column 'customer_id': expected int64, got string
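The comparison logic in validate_schema() can also be exercised without a backend by running it against plain dictionaries. This is a minimal sketch that mirrors the checks above; the check_schema() helper is illustrative, not part of Xorq:

```python
def check_schema(actual: dict, expected: dict) -> None:
    """Raise ValueError on missing columns or type mismatches (illustration only)."""
    missing = [col for col in expected if col not in actual]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    for col, expected_type in expected.items():
        if actual[col] != expected_type:
            raise ValueError(
                f"Type mismatch for column '{col}': "
                f"expected {expected_type}, got {actual[col]}"
            )

expected = {"customer_id": "int64", "email": "string"}

# Missing column fails before any data is touched
try:
    check_schema({"email": "string"}, expected)
except ValueError as e:
    print(e)  # Missing required columns: ['customer_id']

# Wrong type fails with the column name and both types
try:
    check_schema({"customer_id": "string", "email": "string"}, expected)
except ValueError as e:
    print(e)  # Type mismatch for column 'customer_id': expected int64, got string
```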
Validate data at runtime
Runtime validation catches data quality issues during execution. These checks run when you call .execute() and fail before downstream processing continues.
Check for null values
Remove rows with missing values in critical columns. Create check_nulls.py:
# check_nulls.py
import xorq.api as xo

con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"],
    "status": ["active", "inactive", None, "pending"]
}, name="customers")

# Filter out rows where critical columns are null
valid_data = data.filter(
    xo._.customer_id.notnull(),
    xo._.email.notnull(),
    xo._.status.notnull()
)

original = data.execute()
df = valid_data.execute()
print(f"Original rows: {len(original)}")
print(f"Valid rows (no nulls): {len(df)}")
print(f"Removed rows: {len(original) - len(df)}")

Run the script:

python check_nulls.py

You should see a result like this:
Original rows: 4
Valid rows (no nulls): 2
Removed rows: 2
Stop processing on validation failure
Raise exceptions when validation fails to prevent processing invalid data. Create fail_fast_validation.py:
# fail_fast_validation.py
import xorq.api as xo

def validate_and_process(data_expr, required_columns):
    """Validate data and fail fast if invalid."""
    # Check that required columns exist in schema
    schema = data_expr.schema()
    missing = [col for col in required_columns if col not in schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Apply validation filters
    validated = data_expr.filter(
        *[data_expr[col].notnull() for col in required_columns]
    )

    # Execute and check if any rows remain
    result = validated.execute()
    if len(result) == 0:
        raise ValueError("No valid rows after validation")
    return result

con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"],
    "status": ["active", "inactive", None, "pending"]
}, name="customers")

df = validate_and_process(data, ["customer_id", "email", "status"])
print(f"Processed {len(df)} valid rows")

Run the script:

python fail_fast_validation.py

You should see a result like this:
Processed 2 valid rows
To verify fail-fast validation:
- Data with all required columns processes successfully
- Data missing required columns raises ValueError with the missing column names at the validate_and_process() call, before execution
- Data where every row has a null in a required column raises ValueError: No valid rows after validation during execution
Remove null rows
Drop rows with null values using .drop_null(). Create drop_nulls.py:
# drop_nulls.py
import xorq.api as xo

con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"]
}, name="customers")

# Remove rows where any column is null
clean_data = data.drop_null(how="any")

df = clean_data.execute()
print(f"Clean rows: {len(df)}")

Run the script:

python drop_nulls.py

You should see a result like this:
Clean rows: 2
The parameters control which rows to remove:
- how="any" removes rows with a null in any checked column
- how="all" removes only rows where every checked column is null
- subset=["col1", "col2"] restricts the null check to the specified columns
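To make the how and subset rules concrete, here is the same row-dropping logic sketched in plain Python over a list of dicts. The drop_null_rows() helper is an illustration of the semantics, not a Xorq API:

```python
def drop_null_rows(rows, how="any", subset=None):
    """Mimic drop-null semantics on a list of dicts (illustration only)."""
    def kept(row):
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        if how == "any":
            return not any(nulls)   # drop if any checked column is null
        return not all(nulls)       # how="all": drop only if every checked column is null
    return [r for r in rows if kept(r)]

rows = [
    {"customer_id": 1, "email": "a@b.com"},
    {"customer_id": 2, "email": None},
    {"customer_id": None, "email": None},
]
print(len(drop_null_rows(rows, how="any")))               # 1: only the fully populated row
print(len(drop_null_rows(rows, how="all")))               # 2: only the all-null row is dropped
print(len(drop_null_rows(rows, subset=["customer_id"])))  # 2: only customer_id is checked
```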
Validate data ranges
Filter data to ensure values fall within expected ranges. Create validate_ranges.py:
# validate_ranges.py
import xorq.api as xo

con = xo.connect()
data = xo.memtable({
    "age": [25, 150, 18, 10, 120],
    "revenue": [100.0, -10.0, 50.0, 200.0, 0.0]
}, name="customers")

# Ensure values are within expected bounds
valid_data = data.filter(
    (xo._.age >= 18) & (xo._.age <= 120),
    xo._.revenue > 0
)

df = valid_data.execute()
print(f"Rows within valid range: {len(df)}")

Run the script:

python validate_ranges.py

You should see a result like this:
Rows within valid range: 2
Range validation removes rows that don’t meet the criteria. To verify, check that output contains only values within specified ranges and test edge cases like age=18 or age=120.
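Because both age bounds use >= and <=, the boundary values 18 and 120 are kept while values just outside fail. One quick way to pin down those edge cases is to test the bare predicates in plain Python, outside the pipeline (these helper functions are illustrative, not part of Xorq):

```python
def age_in_range(age):
    # Mirrors (xo._.age >= 18) & (xo._.age <= 120): both bounds are inclusive
    return 18 <= age <= 120

def revenue_valid(revenue):
    # Mirrors xo._.revenue > 0: strictly positive, so 0.0 is rejected
    return revenue > 0

# Edge cases: the boundaries pass, values just outside fail
print([age_in_range(a) for a in (17, 18, 120, 121)])  # [False, True, True, False]
print([revenue_valid(r) for r in (0.0, 0.01)])        # [False, True]
```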
Validate string patterns
Check string formats using string methods. Create validate_strings.py:
# validate_strings.py
import xorq.api as xo

con = xo.connect()
data = xo.memtable({
    "email": ["valid@test.com", "invalid", "missing.dot@com", "bad@", "good@test.com"]
}, name="customers")

# Check for a valid email pattern: an "@" followed by a domain containing a dot
valid_emails = data.filter(
    xo._.email.re_search(r"@[^@]+\.[^@]+")
)

df = valid_emails.execute()
print(f"Rows with valid email format: {len(df)}")

Run the script:

python validate_strings.py

You should see a result like this:

Rows with valid email format: 2

This keeps only addresses with an "@" followed by a dot in the domain part, so invalid, missing.dot@com, and bad@ are filtered out. (Checking only that "@" and "." appear somewhere in the string would also accept missing.dot@com.) For production, use more comprehensive email validation rules.
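Independent of Xorq, a pattern that requires a dot after the "@" can be prototyped with Python's re module to see which of the sample addresses it accepts:

```python
import re

# Require an "@" followed by a domain part that contains a dot
pattern = re.compile(r"@[^@]+\.[^@]+")

emails = ["valid@test.com", "invalid", "missing.dot@com", "bad@", "good@test.com"]
valid = [e for e in emails if pattern.search(e)]
print(valid)  # ['valid@test.com', 'good@test.com']
```

Note that missing.dot@com is rejected because its dot comes before the "@", not in the domain.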
Create custom validation functions
Combine schema validation with business rule validation in a reusable function. The schema check catches errors before execution, and the business rules filter data during execution.
Create custom_validation.py:
# custom_validation.py
import xorq.api as xo

def validate_customer_data(expr):
    """Validate customer data schema and business rules."""
    # Define expected schema
    expected_schema = {
        "customer_id": "int64",
        "email": "string",
        "status": "string"
    }

    # Check schema
    actual_schema = expr.schema()
    missing = [col for col in expected_schema.keys() if col not in actual_schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Apply business rules
    valid_statuses = ["active", "inactive", "pending"]
    validated = expr.filter(expr.status.isin(valid_statuses))
    return validated

# Apply validation
con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, 3, 4],
    "email": ["a@b.com", "c@d.com", "e@f.com", "g@h.com"],
    "status": ["active", "inactive", "invalid", "pending"]
}, name="customers")

# Schema validation happens at function call (before execution)
# Business rule validation happens at execute (during execution)
validated_data = validate_customer_data(data)
df = validated_data.execute()
print(f"Validated {len(df)} rows with custom rules")

Run the script:

python custom_validation.py

You should see a result like this:
Validated 3 rows with custom rules
To verify custom validation:
- Valid data matching all business rules processes successfully
- Rows with an unrecognized status (for example, "invalid") are filtered out
- Missing required columns raise ValueError: Missing required columns at the function call, before execution
- Output contains only rows with status in ["active", "inactive", "pending"]
Production considerations
Choose the right validation strategy based on when you need to catch errors and how much data you’re processing.
Choose validation timing
Compile-time validation (schema checks): Use for critical schema mismatches that would break your pipeline immediately. These catch errors before any data processing.
Runtime validation (filters and assertions): Use for null checks, range validation, and business rules that filter rows. These catch data quality issues during execution.
For large datasets: Run schema checks at expression creation, and apply row-level filters as early in the pipeline as possible so downstream operations process less data.
Performance
Schema validation adds minimal overhead because it checks types once at expression creation. Runtime validation processes every row, so filter early in your pipeline to reduce data volume for downstream operations.
Monitoring
Track validation metrics in production:
| Metric | What to monitor |
|---|---|
| Validation failure rate | Percentage of rows filtered out |
| Schema mismatch frequency | How often schema validation fails |
| Null value percentage | Percentage of nulls in critical columns |
Log validation failures to identify data quality trends and upstream issues.
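The metrics in the table can be derived from row counts you already have before and after validation. This is a minimal sketch; the validation_metrics() function is illustrative, not a Xorq API:

```python
def validation_metrics(total_rows, valid_rows, null_counts):
    """Compute monitoring metrics from pre/post-validation row counts.

    null_counts maps column name -> number of null values in that column.
    """
    if total_rows == 0:
        return {"failure_rate": 0.0, "null_pct": {}}
    return {
        # Fraction of rows removed by validation filters
        "failure_rate": (total_rows - valid_rows) / total_rows,
        # Per-column fraction of null values
        "null_pct": {col: n / total_rows for col, n in null_counts.items()},
    }

# Using the check_nulls.py example: 4 input rows, 2 survive validation
m = validation_metrics(total_rows=4, valid_rows=2,
                       null_counts={"email": 1, "status": 1})
print(m["failure_rate"])       # 0.5
print(m["null_pct"]["email"])  # 0.25
```

Emitting these numbers on every run gives you a time series to alert on when an upstream feed degrades.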
You now have compile-time schema validation to catch errors before execution and runtime validation to filter bad data during execution. For troubleshooting validation errors, see Troubleshoot data quality issues.
Next steps
- Transform data with custom UDFs - Create reusable transformations
- Build feature pipelines - Create production feature pipelines
- Optimize pipeline performance - Improve validation performance