User-defined exchange functions

Understand how UDXFs enable distributed processing with Apache Arrow Flight

Calling OpenAI’s API to analyze one million customer reviews using regular UDFs means blocking the query engine for seconds per API call. A single API timeout or network failure crashes your entire pipeline because the UDF and query engine share the same process. UDXFs solve this challenge by running your Python logic in separate processes that communicate via Apache Arrow Flight. This provides process isolation that protects the query engine from failures and enables safe external service calls.

What you’ll understand

  • What UDXFs are and how they differ from regular UDFs through distributed execution architecture
  • When to use UDXFs for external API calls and heavy computation versus regular UDFs for simple transformations
  • How Arrow Flight enables efficient distributed processing through zero-copy data transfer between processes
  • The trade-offs between process isolation benefits and communication overhead costs in production systems

What are user-defined exchange functions?

User-defined exchange functions are distributed UDFs that execute in separate processes or remote services rather than running in the same process as your query engine. Unlike regular UDFs that run through direct function calls, UDXFs communicate over Apache Arrow Flight protocol to exchange data between processes.

UDXFs provide three critical capabilities that regular UDFs cannot. First, process isolation lets you run untrusted code safely without risking the query engine. Second, resource management lets you dedicate specific CPU and memory allocations to particular operations. Third, remote execution enables calling external services or deploying processing logic as microservices.

import xorq.api as xo
from xorq.expr.relations import flight_udxf
import xorq.expr.datatypes as dt

# Define processing function
def sentiment_analysis(df):
    # Calls external API or runs heavy model
    df['sentiment'] = analyze_sentiment(df['text'])
    return df

# Define schemas
input_schema = xo.schema({"text": dt.string})
output_schema = xo.schema({"text": dt.string, "sentiment": dt.string})

# Create UDXF (curried, can be used with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="sentiment_analysis"
)

# Apply to data using pipe pattern
data = xo.memtable({"text": ["Great product", "Poor quality"]})
result = data.pipe(sentiment_udxf).execute()

Why process isolation matters

Regular UDFs run in the same process as your query engine. This creates significant problems for workloads that involve heavy computation, external service calls, or unreliable operations.

Memory-intensive UDFs compete directly with the query engine for memory resources when they run in the same process. Loading a 5GB machine learning model into memory for inference means the query engine loses 5GB of working memory for query processing. When the UDF crashes due to memory exhaustion or an unhandled exception, it takes down the entire query engine process. This happens because they share the same process space, so failure in one component affects all components.

External service calls block execution when running in the query engine process. API calls that take seconds to complete stall the entire pipeline. While waiting for an HTTP response from OpenAI or another external service, the query engine sits idle instead of processing other available data. Processing one million rows with one-second API calls per row means 277 hours of sequential execution time.
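The arithmetic behind that estimate, plus the effect of batching and parallelism, can be sketched directly (batch size and worker count below are illustrative numbers, not xorq defaults):

```python
# Back-of-the-envelope latency math for sequential vs. batched API calls.
rows = 1_000_000
seconds_per_call = 1.0

# Sequential: one blocking call per row.
sequential_hours = rows * seconds_per_call / 3600
print(f"sequential: {sequential_hours:.1f} hours")  # sequential: 277.8 hours

# Batched: 100 rows per request, 8 workers in parallel.
batch_size = 100
workers = 8
batched_hours = (rows / batch_size) * seconds_per_call / workers / 3600
print(f"batched + parallel: {batched_hours:.2f} hours")
```

Even modest batching and parallelism collapse the sequential estimate from hundreds of hours to well under one.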

Parallelism remains limited with regular UDFs because they run sequentially in a single process. Distributing processing across multiple machines or GPUs requires complex manual orchestration since the UDF is tightly coupled to the query engine process.

UDXFs solve these challenges by running in separate processes that communicate via Arrow Flight protocol. The query engine sends data to the UDXF process through efficient Arrow-based communication. The UDXF process performs the computation independently and sends results back to the query engine. When the UDXF crashes due to external API failures or memory issues, the query engine survives and can retry or handle the error gracefully.
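The crash-containment idea can be illustrated with a standard-library sketch: the child process below stands in for a UDXF worker, and its failure never propagates to the parent. (Xorq's actual mechanism is an Arrow Flight server process, not `subprocess`.)

```python
import subprocess
import sys

# Run crashing code in a separate process; the parent observes the
# failure instead of crashing with it.
result = subprocess.run(
    [sys.executable, "-c", "raise RuntimeError('external API failure')"],
    capture_output=True,
    text=True,
)
parent_survived = True
print(f"child exit code: {result.returncode}")  # non-zero: the child crashed
print(f"parent survived: {parent_survived}")
```

The parent sees a non-zero exit code and can retry or degrade gracefully, which is exactly the relationship between the query engine and a UDXF process.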

How UDXFs work

UDXFs operate through five stages that manage the distributed processing lifecycle.

First, you write a Python function that takes a pandas DataFrame and returns a transformed DataFrame containing your custom logic. Second, you wrap the function with flight_udxf and specify input and output schemas, which creates a Flight server configuration. Third, when you execute the expression, Xorq starts a Flight server in a separate process that hosts your processing function. Fourth, Xorq sends data to the server via Arrow Flight protocol. The server processes it using your function and sends results back using Arrow’s zero-copy transfer mechanism. Fifth, after execution completes, Xorq automatically shuts down the Flight server and cleans up the separate process.

Arrow Flight moves data between processes in Arrow's columnar format, avoiding the serialization and deserialization costs of row-based protocols. This keeps distributed processing performance close to in-process execution for large datasets.

Tip

UDXFs support microservice architectures for data processing. You deploy a UDXF as a standalone service that multiple queries can call concurrently, supporting model serving and API integration patterns.

UDXFs versus UDFs

| Aspect           | UDF                    | Separate process UDXF          |
| ---------------- | ---------------------- | ------------------------------ |
| Execution        | In-process             | Separate process               |
| Communication    | Direct function call   | Arrow Flight protocol          |
| Isolation        | None                   | Full process isolation         |
| Crash impact     | Crashes query engine   | Query engine survives          |
| Remote execution | Not supported          | Supported                      |
| Parallelism      | Limited                | Scalable across machines       |
| Overhead         | Minimal                | Flight communication overhead  |
| Use case         | Simple transformations | Heavy computation, API calls   |

Example: LLM sentiment analysis

With UDF

A UDF blocks the query engine while waiting for API responses, creating performance bottlenecks.

def sentiment_udf(df):
    # Blocks query engine for seconds per API call
    df['sentiment'] = df['text'].apply(call_openai_api)
    return df

With UDXF

A UDXF runs in a separate process and doesn’t block the query engine during API calls.

def sentiment_udxf_fn(df):
    # Runs in separate process without blocking
    df['sentiment'] = batch_call_openai_api(df['text'])
    return df

# Create UDXF (curried, use with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_udxf_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to data
result = reviews.pipe(sentiment_udxf).execute()

Creating UDXFs

UDXFs require four components that work together to define the distributed processing behavior.

Processing function

Define a function that transforms pandas DataFrames with your custom logic.

def process_data(df):
    # Your custom logic
    df['new_column'] = df['old_column'] * 2
    return df

Input schema

Define the required columns and their data types for input validation.

input_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64
})

Output schema

Define the result columns and their data types that your function will return.

output_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64,
    "new_column": dt.float64
})

UDXF creation

Combine the function and schemas into a UDXF configuration that Xorq will execute.

# Create UDXF (curried, use with pipe)
my_udxf = flight_udxf(
    process_df=process_data,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="my_transformation"
)

# Apply to data
result = input_data.pipe(my_udxf).execute()

UDXF use cases

UDXFs excel in four scenarios where process isolation and distributed execution provide clear advantages.

External API calls

Call external services without blocking the query engine and protect against failures.

def geocoding_udxf(df):
    # Calls geocoding API for each address
    df['latitude'], df['longitude'] = geocode_addresses(df['address'])
    return df

# Create UDXF
geocode_udxf = flight_udxf(
    process_df=geocoding_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to addresses
result = addresses.pipe(geocode_udxf).execute()

The query engine remains responsive while API calls happen in the background. Failed API calls in the UDXF process don’t crash the query engine. Multiple UDXF processes can run API calls in parallel across different machines or cores.
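One way such fan-out might look inside the processing function, with `fake_geocode` standing in for a real API client (the thread pool lives entirely inside the UDXF process):

```python
from concurrent.futures import ThreadPoolExecutor

def fake_geocode(address):
    # Stand-in for a real geocoding API call; returns a (lat, lon) pair.
    return (hash(address) % 90, hash(address) % 180)

addresses = ["1 Main St", "2 Oak Ave", "3 Pine Rd"]

# Fan the slow, I/O-bound calls out across worker threads inside the
# UDXF process; the query engine is never involved in this waiting.
with ThreadPoolExecutor(max_workers=8) as pool:
    coords = list(pool.map(fake_geocode, addresses))

print(coords)
```

Because the waiting happens in worker threads of the UDXF process, slow or failing requests never stall the query engine itself.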

Large ML models

Load and run memory-intensive models safely with dedicated process resources.

def model_inference_udxf(df):
    # Load 5GB model isolated from query engine
    model = load_large_model()
    df['prediction'] = model.predict(df[feature_columns])
    return df

# Create UDXF
inference_udxf = flight_udxf(
    process_df=model_inference_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to features
result = features.pipe(inference_udxf).execute()

Process isolation means the large model’s memory usage doesn’t interfere with query processing. Model crashes don’t affect the query engine. You can dedicate GPU resources to the UDXF process.
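A common pattern for keeping a heavy model resident for the lifetime of the UDXF process is a load-once cache. In this sketch, `get_model` and the trivial lambda are stand-ins for a real multi-gigabyte loader:

```python
from functools import lru_cache

load_count = 0

@lru_cache(maxsize=1)
def get_model():
    # Stand-in for loading a multi-gigabyte model; runs once per process.
    global load_count
    load_count += 1
    return lambda xs: [x * 2 for x in xs]  # trivial stand-in "model"

def model_inference(batch):
    model = get_model()  # cached after the first call
    return model(batch)

# Repeated batches reuse the already-loaded model.
out1 = model_inference([1, 2, 3])
out2 = model_inference([4, 5])
print(load_count)  # prints 1: the model was loaded exactly once
```

Loading once per process rather than once per batch is what makes dedicating a GPU machine to a single UDXF worthwhile.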

Batch LLM processing

Process text with LLMs at scale using batched API calls for efficiency.

def sentiment_analysis_udxf(df):
    # Batch calls to OpenAI API
    df['sentiment'] = batch_call_openai(df['review_text'])
    return df

# Create UDXF
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to reviews
result = reviews.pipe(sentiment_udxf).execute()

Batching API calls in the UDXF process improves throughput. Retries happen without affecting the query engine. Results can be cached within the UDXF process.
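The batching itself can be as simple as chunking the input before each API request; `batched` here is an illustrative helper, not part of the xorq API:

```python
def batched(items, size):
    # Split a list into consecutive batches of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]

reviews = [f"review {i}" for i in range(250)]

# One API request per batch instead of one per review: 3 calls, not 250.
batches = batched(reviews, size=100)
print([len(b) for b in batches])  # prints [100, 100, 50]
```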

Microservice deployment

Deploy processing as standalone services that multiple applications can access independently.

# Build expression containing UDXF
xorq build pipeline.py -e pipeline

# Serve the built expression as a Flight server
xorq serve-unbound pipeline --port 8815

This pattern enables sharing processing logic across multiple applications. It provides independent scaling of processing and query workloads. You can deploy UDXFs on specialized hardware like GPU machines.

When to use UDXFs

Deciding when to use UDXFs depends on your isolation needs, computation patterns, and service architecture requirements.

Use UDXFs when:

  • You’re calling external services such as REST APIs, LLM endpoints, or other HTTP services with unpredictable latency.
  • You’re running heavy ML models that consume gigabytes of memory or require GPU acceleration for inference.
  • You’re executing unreliable operations that might crash or hang, like processing untrusted data or experimental code.
  • You’re building microservice architectures with processing logic that needs to run as an independent service for multiple clients.

Use regular UDFs when:

  • You’re doing simple transformations that complete in under 100ms without external dependencies or heavy computation.
  • You’re performing arithmetic operations, string transformations, or basic data cleaning that runs faster without communication overhead.
  • You’re implementing operations tightly coupled to the query engine, like custom aggregations that interact with the query planner.

Processing one million customer reviews with OpenAI sentiment analysis requires UDXFs. External API calls with seconds of latency would block the query engine if run as regular UDFs. The UDXF process handles API calls independently, batches requests for efficiency, and isolates failures so one bad review doesn’t crash the pipeline.

Calculating the ratio between two columns works better as a regular UDF. The operation completes in microseconds while Flight communication overhead takes milliseconds. This makes the overhead larger than the computation time itself.

Trade-offs

UDXFs provide process isolation and distributed execution, but they also introduce operational complexity and communication overhead costs.

Benefits:

  • Process isolation prevents crashes in custom code from affecting the query engine, supporting safe execution of untrusted code.
  • Remote execution lets you deploy processing logic on different machines or specialized hardware like GPU servers.
  • Resource management becomes explicit because you can dedicate specific CPU and memory allocations to UDXF processes.
  • Parallel execution scales naturally by running multiple UDXF processes across machines without manual orchestration.
  • Fault tolerance improves because UDXF failures don’t crash entire pipelines, allowing graceful degradation.
  • Microservices pattern enables deploying as standalone services that multiple clients can access independently.

Costs:

  • Communication overhead from Arrow Flight adds latency, typically 1-5ms per batch transfer depending on data size.
  • Complexity increases through server startup and shutdown coordination, error handling across process boundaries, and monitoring multiple services.
  • Debugging becomes more challenging since errors span multiple processes and require distributed tracing to understand failures.
  • Server management overhead includes process spawning costs, typically 10-50ms, and resource allocation coordination.
  • Development complexity increases because you need explicit schemas for both input and output data.

External API calls and heavy ML model inference justify the UDXF overhead. The alternative of in-process execution creates worse problems through blocked query engines and crash contagion. A 5ms Flight overhead is negligible compared to 500ms API call latency.

Simple transformations like string concatenation or arithmetic operations work better as regular UDFs. The computation takes microseconds while Flight communication takes milliseconds, making the overhead larger than the benefit.

Learning more

UDXFs extend basic UDF concepts with backend-specific data exchange capabilities as explained in User-defined functions. The Apache Arrow Flight protocol supports UDXF communication as covered in How Xorq works.

UDXF deployment patterns are covered in Serving expressions as endpoints.