Data Processing

Stream large datasets efficiently. Perfect for ETL pipelines, data transformations, and batch processing.

Why Tauq for Data Processing?

🚀 Streaming-First: Process records as they arrive. No need to load entire files into memory.

📉 Smaller Files: 54% smaller than JSON means faster I/O and lower storage costs.

🔧 Schema Validation: Built-in schemas ensure data consistency throughout your pipeline.

ETL Pipelines

Extract, Transform, Load

Tauq's streaming API makes it ideal for processing large datasets without loading everything into memory.

# Extract from database, transform to Tauq
import tauq
import psycopg2

# Stream 1M records without loading all into memory
with psycopg2.connect(dsn) as conn:
    with conn.cursor(name='large_cursor') as cursor:
        cursor.execute("SELECT id, name, email, created_at FROM users")

        with open('users.tqn', 'w') as f:
            # Write schema once
            f.write('!def User id name email created_at\n')

            # Stream records in batches of 1,000
            while True:
                batch = cursor.fetchmany(1000)
                if not batch:
                    break
                for row in batch:
                    f.write(tauq.format_row(row) + '\n')
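
Because the cursor is named, psycopg2 creates a server-side cursor in PostgreSQL, so each fetchmany() call pulls only the next batch over the wire rather than the full result set.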

Load Back to Database

# Stream Tauq back into database
import tauq
import psycopg2

with psycopg2.connect(dsn) as conn:  # the connection context manager commits on exit
    with conn.cursor() as cursor:
        with open('users.tqn') as f:
            for record in tauq.stream(f):
                cursor.execute(
                    "INSERT INTO users_new (id, name, email, created_at) VALUES (%s, %s, %s, %s)",
                    (record['id'], record['name'], record['email'], record['created_at'])
                )
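
Inserting one row per execute() call means one round trip per record. For large loads, psycopg2's execute_batch helper groups many rows into each round trip; a minimal sketch, reusing the same users.tqn file and dsn as above:

import tauq
import psycopg2
from psycopg2.extras import execute_batch

with psycopg2.connect(dsn) as conn:
    with conn.cursor() as cursor:
        with open('users.tqn') as f:
            # execute_batch consumes the generator lazily, one page at a time
            execute_batch(
                cursor,
                "INSERT INTO users_new (id, name, email, created_at) VALUES (%s, %s, %s, %s)",
                ((r['id'], r['name'], r['email'], r['created_at']) for r in tauq.stream(f)),
                page_size=1000,
            )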

Data Transformations

Using TQQ (Tauq Query)

TQQ provides a SQL-like query layer for filtering, transforming, and aggregating data.

# Filter and transform with TQQ
tauq query 'SELECT id, name, total WHERE total > 100 ORDER BY total DESC' orders.tqn

# Output:
!def Order id name total
1042 "Enterprise Inc" 15420.00
1038 "Tech Corp" 8750.50
1045 "Global LLC" 2340.00

Schema Evolution

Easily add or rename fields while maintaining backwards compatibility.

# Old schema
!def User id name email

# New schema with additional field
!def UserV2 id name email role created_at

# Transform old to new
tauq transform --add-field "role:user" --add-field "created_at:now()" users.tqn
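
When a static default like role:user is not enough, the same migration can be scripted. A minimal sketch, assuming tauq.stream() yields dicts and tauq.format_row() accepts a tuple of field values, as in the ETL examples; the defaults mirror the --add-field flags above:

import tauq
from datetime import datetime, timezone

with open('users.tqn') as src, open('users_v2.tqn', 'w') as dst:
    # Write the new schema header first
    dst.write('!def UserV2 id name email role created_at\n')

    for record in tauq.stream(src):
        dst.write(tauq.format_row((
            record['id'],
            record['name'],
            record['email'],
            'user',                                  # default for role
            datetime.now(timezone.utc).isoformat(),  # stands in for now()
        )) + '\n')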

Batch Processing

Process Large Files in Chunks

import tauq

def process_in_batches(filename, batch_size=10000):
    """Process a large Tauq file in memory-efficient batches."""
    batch = []

    with open(filename) as f:
        for record in tauq.stream(f):
            batch.append(record)

            if len(batch) >= batch_size:
                yield batch
                batch = []

        # Don't forget the last batch
        if batch:
            yield batch

# Process 1M records in 10K batches
# (process_batch and save_results stand in for your own pipeline steps)
for batch in process_in_batches('large_dataset.tqn'):
    results = process_batch(batch)
    save_results(results)
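
Because process_in_batches is a generator, only the current batch is resident at any time; peak memory stays around batch_size records no matter how large the file is.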

Parallel Processing

import tauq
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk_data):
    """Process a chunk of records."""
    # transform() stands in for your own per-record function
    return [transform(record) for record in chunk_data]

# The __main__ guard is required on platforms that spawn workers (Windows, macOS)
if __name__ == '__main__':
    # Split the file into chunks using process_in_batches from the example above
    chunks = list(process_in_batches('data.tqn', batch_size=50000))

    with ProcessPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(process_chunk, chunks))

    # Combine results
    all_results = [item for chunk in results for item in chunk]
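
Note that list(process_in_batches(...)) materializes every chunk before fanning out, trading streaming for parallelism; choose batch_size so that all chunks fit comfortably in memory at once.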

Format Comparison

Feature             Tauq       JSON       CSV          Parquet
Streaming           Native     With SAX   Native       No
Schema              Built-in   External   Header row   Built-in
Human Readable      Yes        Yes        Yes          No
Nested Data         Yes        Yes        No           Yes
Size (1M records)   ~110 MB    ~240 MB    ~140 MB      ~45 MB

Tauq offers the best balance of streaming support, schema validation, and human readability.

CLI Tools for Data Processing

Convert JSON to Tauq

tauq format large_dataset.json -o dataset.tqn

Validate Schema

tauq validate dataset.tqn --schema user_schema.tqn

Merge Multiple Files

tauq merge part1.tqn part2.tqn part3.tqn -o combined.tqn

Split Large Files

tauq split large.tqn --lines 100000 -o chunks/part_

Ready to Process Data?

Install Tauq and start building efficient data pipelines today.