Data Processing
Stream large datasets efficiently. Perfect for ETL pipelines, data transformations, and batch processing.
Why Tauq for Data Processing?
🚀 Streaming-First: Process records as they arrive; there is no need to load entire files into memory (see the sketch below).
📉 Smaller Files: 54% smaller than JSON means faster I/O and lower storage costs.
🔧 Schema Validation: Built-in schemas ensure data consistency throughout your pipeline.
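A quick illustration of the streaming model: records are handled one at a time as the file is read, so memory use stays flat regardless of input size. This is only a sketch, assuming tauq.stream yields dict-like records (as in the examples below) and parses numeric fields as numbers:

import tauq

# Count high-value orders without ever holding the whole file in memory
high_value = 0
with open('orders.tqn') as f:
    for record in tauq.stream(f):  # one record at a time
        if record['total'] > 100:
            high_value += 1
print(f"{high_value} orders over 100")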
ETL Pipelines
Extract, Transform, Load
Tauq's streaming API makes it ideal for processing large datasets without loading everything into memory.
# Extract from database, transform to Tauq
import tauq
import psycopg2

# Stream 1M records without loading all into memory
with psycopg2.connect(dsn) as conn:
    with conn.cursor(name='large_cursor') as cursor:
        cursor.execute("SELECT id, name, email, created_at FROM users")

        # Write schema once
        with open('users.tqn', 'w') as f:
            f.write('!def User id name email created_at\n')

            # Stream records in batches of 1,000 rows
            while True:
                rows = cursor.fetchmany(1000)
                if not rows:
                    break
                for row in rows:
                    f.write(tauq.format_row(row) + '\n')

Load Back to Database
# Stream Tauq back into database
with psycopg2.connect(dsn) as conn, conn.cursor() as cursor:
    with open('users.tqn') as f:
        for record in tauq.stream(f):
            cursor.execute(
                "INSERT INTO users_new (id, name, email, created_at) VALUES (%s, %s, %s, %s)",
                (record['id'], record['name'], record['email'], record['created_at'])
            )
    conn.commit()
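When insert throughput matters, the streamed records can be buffered and written with psycopg2's execute_batch instead of one execute call per record. A minimal sketch, assuming the same users_new table, the dsn placeholder from the extract example, and dict-like records from tauq.stream:

import psycopg2
from psycopg2.extras import execute_batch
import tauq

INSERT_SQL = (
    "INSERT INTO users_new (id, name, email, created_at) "
    "VALUES (%s, %s, %s, %s)"
)

# Collect parameter tuples and send each batch of 1,000 in one go
with psycopg2.connect(dsn) as conn, conn.cursor() as cursor:
    with open('users.tqn') as f:
        batch = []
        for record in tauq.stream(f):
            batch.append((record['id'], record['name'],
                          record['email'], record['created_at']))
            if len(batch) >= 1000:
                execute_batch(cursor, INSERT_SQL, batch)
                batch = []
        if batch:
            execute_batch(cursor, INSERT_SQL, batch)
    conn.commit()

execute_batch groups the parameter sets into fewer server round trips than calling execute once per record.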
Data Transformations
Using TQQ (Tauq Query)
TQQ provides a powerful, SQL-like query layer for filtering, transforming, and aggregating data.
# Filter and transform with TQQ
tauq query 'SELECT id, name, total WHERE total > 100 ORDER BY total DESC' orders.tqn
# Output:
!def Order id name total
1042 "Enterprise Inc" 15420.00
1038 "Tech Corp" 8750.50
1045 "Global LLC" 2340.00 Schema Evolution
Schema Evolution
Easily add or rename fields while maintaining backwards compatibility.
# Old schema
!def User id name email
# New schema with additional fields
!def UserV2 id name email role created_at
# Transform old to new
tauq transform --add-field "role:user" --add-field "created_at:now()" users.tqn
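The same migration can also be scripted when more control over the defaults is needed. A minimal sketch, assuming tauq.stream yields dict-like records; the quoting helper mirrors the double-quoted strings shown in the examples above and is illustrative, not a documented writer API:

import tauq
from datetime import datetime, timezone

def quote(value):
    """Render a value as the examples above show it: strings double-quoted."""
    return f'"{value}"' if isinstance(value, str) else str(value)

# Rewrite users.tqn under the UserV2 schema, filling the new fields with defaults
now = datetime.now(timezone.utc).isoformat()
with open('users.tqn') as src, open('users_v2.tqn', 'w') as dst:
    dst.write('!def UserV2 id name email role created_at\n')
    for record in tauq.stream(src):
        fields = [record['id'], record['name'], record['email'], 'user', now]
        dst.write(' '.join(quote(v) for v in fields) + '\n')

tauq transform remains the simpler choice when the defaults shown above are all that is needed.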
Batch Processing
Process Large Files in Chunks
import tauq

def process_in_batches(filename, batch_size=10000):
    """Yield batches of records from a large Tauq file, keeping memory use bounded."""
    batch = []
    with open(filename) as f:
        for record in tauq.stream(f):
            batch.append(record)
            if len(batch) >= batch_size:
                yield batch
                batch = []
    # Don't forget the last batch
    if batch:
        yield batch

# Process 1M records in 10K batches
for batch in process_in_batches('large_dataset.tqn'):
    results = process_batch(batch)
    save_results(results)

Parallel Processing
import tauq
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk_data):
    """Process a chunk of records."""
    return [transform(record) for record in chunk_data]

# Split the file into chunks and process them in parallel
# Note: list() materializes every chunk in memory before the workers start,
# so this pattern suits datasets that fit in RAM
chunks = list(process_in_batches('data.tqn', batch_size=50000))
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_chunk, chunks))

# Combine results
all_results = [item for chunk in results for item in chunk]

Format Comparison
| Feature | Tauq | JSON | CSV | Parquet |
|---|---|---|---|---|
| Streaming | Native | Via streaming parser | Native | No |
| Schema | Built-in | External | Header row | Built-in |
| Human Readable | Yes | Yes | Yes | No |
| Nested Data | Yes | Yes | No | Yes |
| Size (1M records) | ~110 MB | ~240 MB | ~140 MB | ~45 MB |
Tauq offers the best balance of streaming support, schema validation, and human readability.
CLI Tools for Data Processing
Convert JSON to Tauq
tauq format large_dataset.json -o dataset.tqn

Validate Schema
tauq validate dataset.tqn --schema user_schema.tqn

Merge Multiple Files
tauq merge part1.tqn part2.tqn part3.tqn -o combined.tqn

Split Large Files
tauq split large.tqn --lines 100000 -o chunks/part_
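Split output combines naturally with the parallel-processing pattern shown earlier: each chunk file is handed to its own worker process. A minimal sketch, assuming each chunk produced by tauq split carries its own !def header and reusing the placeholder transform function from the parallel-processing example:

import glob
import tauq
from concurrent.futures import ProcessPoolExecutor

def process_file(path):
    """Transform every record in one chunk file."""
    with open(path) as f:
        return [transform(record) for record in tauq.stream(f)]

if __name__ == '__main__':
    chunk_files = sorted(glob.glob('chunks/part_*'))
    with ProcessPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(process_file, chunk_files))

Because every worker reads its own file, only the file paths are sent between processes, unlike the in-memory chunks in the earlier example.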
Ready to Process Data?
Install Tauq and start building efficient data pipelines today.