Database Integration

Use Tauq for efficient database exports, imports, and data transfer between systems.

PostgreSQL

Export to Tauq

# export_to_tauq.py
import psycopg2
import tauq

# Query whose result set is exported; rows are ordered by id.
EXPORT_QUERY = """
    SELECT id, name, email, created_at
    FROM users
    WHERE active = true
    ORDER BY id
"""

conn = psycopg2.connect("postgresql://localhost/mydb")
try:
    # The cursor context manager closes the cursor even if the export fails.
    with conn.cursor() as cursor:
        # Fetch data
        # NOTE(review): a default (client-side) psycopg2 cursor transfers the
        # whole result set to the client on execute(); fetchmany() below only
        # bounds the batch handed to the writer. Use a named (server-side)
        # cursor if the result must not be held in client memory.
        cursor.execute(EXPORT_QUERY)

        # Get column names from cursor
        columns = [desc[0] for desc in cursor.description]

        # Stream to Tauq file
        with open('users_export.tqn', 'w') as f:
            # Write schema
            f.write(f"!def User {' '.join(columns)}\n")

            # Write rows in batches of 1000; an empty batch means the
            # result set is exhausted.
            while True:
                rows = cursor.fetchmany(1000)
                if not rows:
                    break
                f.writelines(tauq.format_row(row) + '\n' for row in rows)
finally:
    # Always release the connection, including when the export raises.
    conn.close()

Import from Tauq

# import_from_tauq.py
import psycopg2
import tauq

conn = psycopg2.connect("postgresql://localhost/mydb")
cursor = conn.cursor()

try:
    # Stream from Tauq file
    with open('users_import.tqn') as f:
        for record in tauq.stream(f):
            # Upsert keyed on id: a new id is inserted; an existing id has its
            # name and email overwritten (created_at is left untouched).
            # The named placeholders are filled from the record mapping.
            cursor.execute("""
            INSERT INTO users (id, name, email, created_at)
            VALUES (%(id)s, %(name)s, %(email)s, %(created_at)s)
            ON CONFLICT (id) DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email
        """, record)

    # Commit once, after every record was accepted.
    conn.commit()
except Exception:
    # Discard the partial import so the table is left unchanged on failure.
    conn.rollback()
    raise
finally:
    cursor.close()
    conn.close()

SQLite

Backup and Restore

import sqlite3
import tauq

def backup_table(db_path, table_name, output_path):
    """Export a SQLite table to Tauq.

    Reads every row of ``table_name`` and writes them to ``output_path`` as
    a list of column-name -> value dicts serialized with ``tauq.dumps``.
    When the table has no rows, nothing is written and ``output_path`` is
    left untouched.

    Args:
        db_path: Path of the SQLite database file.
        table_name: Table to export. It is interpolated directly into the
            SQL text, so it must come from a trusted source.
        output_path: Destination file for the Tauq output.

    Raises:
        sqlite3.OperationalError: If the query fails (e.g. unknown table).
    """
    conn = sqlite3.connect(db_path)
    try:
        # sqlite3.Row lets each row be converted to a dict keyed by column.
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(f"SELECT * FROM {table_name}")
        data = [dict(row) for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not data:
        return

    with open(output_path, 'w') as f:
        f.write(tauq.dumps(data))

def restore_table(db_path, table_name, input_path):
    """Import Tauq data into a SQLite table.

    Loads all records from ``input_path`` with ``tauq.load`` and writes them
    to ``table_name`` using ``INSERT OR REPLACE``. The column list is taken
    from the keys of the first record; every record is indexed by those same
    keys. An empty record list inserts nothing.

    Args:
        db_path: Path of the SQLite database file.
        table_name: Target table. It is interpolated directly into the SQL
            text (as are the column names read from the file), so both must
            come from a trusted source.
        input_path: Tauq file to read.

    Raises:
        KeyError: If a record lacks one of the first record's keys.
    """
    # Read the input first so an unreadable file never touches the database.
    with open(input_path) as f:
        records = tauq.load(f)

    conn = sqlite3.connect(db_path)
    try:
        if records:
            columns = list(records[0].keys())
            placeholders = ', '.join('?' for _ in columns)
            cols = ', '.join(columns)

            # One prepared statement for all rows; values are bound as
            # parameters, in column order.
            conn.executemany(
                f"INSERT OR REPLACE INTO {table_name} ({cols}) VALUES ({placeholders})",
                ([record[col] for col in columns] for record in records),
            )

        conn.commit()
    finally:
        # Closing without a commit discards a partially applied restore.
        conn.close()

# Usage: export the users table of app.db to a Tauq file, then load that
# same file back into the table (rows are written with INSERT OR REPLACE).
backup_table('app.db', 'users', 'backup/users.tqn')
restore_table('app.db', 'users', 'backup/users.tqn')

MongoDB

Export Collection

# mongo_export.py
from pymongo import MongoClient
import tauq

client = MongoClient('mongodb://localhost:27017')
db = client['myapp']

# Load the whole users collection into memory; the projection drops the
# _id field from every returned document.
users = list(db['users'].find({}, {'_id': 0}))

# Serialize the complete list in one call and write it to disk.
with open('users.tqn', 'w') as out:
    out.write(tauq.dumps(users))

# For large collections, use streaming
def export_large_collection(collection, output_path, batch_size=10000):
    """Stream a collection to a Tauq file one document at a time.

    The schema line is built from the keys of the first document. Every
    document (including the first) is written as one row holding the values
    of those keys, with ``None`` for keys a document lacks. The ``_id`` field
    is excluded by the query projection. An empty collection produces an
    empty output file.

    Args:
        collection: Object whose ``find(filter, projection, batch_size=...)``
            returns an iterator of dict-like documents.
        output_path: Destination file for the Tauq output.
        batch_size: Forwarded to ``find`` as its ``batch_size`` option.
    """
    cursor = collection.find({}, {'_id': 0}, batch_size=batch_size)

    with open(output_path, 'w') as f:
        # Peek first document for schema; None means the collection is empty.
        first = next(cursor, None)
        if first is None:
            return

        columns = list(first.keys())
        f.write(f"!def Record {' '.join(columns)}\n")

        def format_line(doc):
            # .get() keeps the row aligned when a document misses a column.
            return tauq.format_row([doc.get(c) for c in columns]) + '\n'

        f.write(format_line(first))

        # Stream rest
        f.writelines(format_line(doc) for doc in cursor)

# Stream the events collection to events.tqn using the default batch_size.
export_large_collection(db.events, 'events.tqn')

Redis

Cache Tauq Data

import redis
import tauq

# Module-level Redis client for localhost:6379, used by the cache helpers in this example.
r = redis.Redis(host='localhost', port=6379)

def cache_as_tauq(key, data, ttl=3600):
    """Serialize ``data`` with ``tauq.dumps`` and store it in Redis with an expiry.

    Args:
        key: Redis key to write.
        data: Value handed to ``tauq.dumps``.
        ttl: Expiry passed to ``setex``; defaults to 3600.

    Returns:
        The serialized Tauq string that was stored.
    """
    payload = tauq.dumps(data)
    r.setex(key, ttl, payload)
    return payload

def get_cached_tauq(key):
    """Fetch ``key`` from Redis and parse it with ``tauq.loads``.

    Returns:
        The parsed value, or ``None`` when Redis returns nothing (or an
        empty value) for the key.
    """
    raw = r.get(key)
    if not raw:
        return None
    # The stored value is decoded to text before parsing.
    return tauq.loads(raw.decode())

# Usage
# Sample payload: a list of flat records sharing the same keys.
users = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]

# Store as Tauq (more compact than JSON); no ttl is passed, so the
# helper's default of 3600 applies.
cache_as_tauq('users:active', users)

# Retrieve; the helper returns None when the key is not in the cache.
cached_users = get_cached_tauq('users:active')

ETL Pipeline Example

Complete Pipeline

# etl_pipeline.py
"""
ETL Pipeline: PostgreSQL -> Transform -> Data Warehouse
Using Tauq as intermediate format for efficiency
"""
import psycopg2
import tauq
from datetime import datetime

class TauqETL:
    """Three-step ETL helper that stages data in Tauq files.

    ``extract`` dumps a query result from the source connection to a Tauq
    file, ``transform`` rewrites one Tauq file into another through a
    per-record callback, and ``load`` inserts a Tauq file into a table on
    the target connection. Call ``close`` when finished.
    """

    def __init__(self, source_dsn, target_dsn):
        """Open the source and target connections.

        Args:
            source_dsn: Passed to ``psycopg2.connect`` for the source database.
            target_dsn: Passed to ``psycopg2.connect`` for the target database.
        """
        self.source = psycopg2.connect(source_dsn)
        try:
            self.target = psycopg2.connect(target_dsn)
        except Exception:
            # Do not leak the source connection when the target is unreachable.
            self.source.close()
            raise

    def close(self):
        """Close both database connections."""
        try:
            self.source.close()
        finally:
            # Close the target even if closing the source raised.
            self.target.close()

    def extract(self, query, output_file):
        """Extract data to Tauq file.

        Runs ``query`` on the source connection, writes a ``!def Record``
        schema line built from the result's column names, then one row per
        result row. Prints a progress line after each batch of up to 10000
        rows.

        Args:
            query: SQL text executed as-is on the source connection.
            output_file: Path of the Tauq file to create.

        Returns:
            ``output_file``, unchanged.
        """
        # The context manager closes the cursor even when extraction fails.
        with self.source.cursor() as cursor:
            cursor.execute(query)

            columns = [desc[0] for desc in cursor.description]

            with open(output_file, 'w') as f:
                f.write(f"!def Record {' '.join(columns)}\n")

                batch_num = 0
                while True:
                    rows = cursor.fetchmany(10000)
                    if not rows:
                        # An empty batch means the result set is exhausted.
                        break
                    f.writelines(tauq.format_row(row) + '\n' for row in rows)
                    batch_num += 1
                    print(f"Extracted batch {batch_num}")

        return output_file

    def transform(self, input_file, output_file, transform_fn):
        """Transform Tauq data.

        Streams records from ``input_file``, passes each to ``transform_fn``
        and writes the results to ``output_file``. The output schema is the
        key list of the first transformed record; later records are indexed
        by those same keys.

        Args:
            input_file: Path of the Tauq file to read.
            output_file: Path of the Tauq file to create.
            transform_fn: Callable taking one record and returning a mapping.

        Returns:
            ``output_file``, unchanged.

        Raises:
            KeyError: If a transformed record lacks a key of the first one.
        """
        with open(input_file) as fin, open(output_file, 'w') as fout:
            columns = None  # set from the first transformed record

            for record in tauq.stream(fin):
                transformed = transform_fn(record)

                if columns is None:
                    columns = list(transformed.keys())
                    fout.write(f"!def Record {' '.join(columns)}\n")

                fout.write(tauq.format_row(
                    [transformed[c] for c in columns]
                ) + '\n')

        return output_file

    def load(self, input_file, table_name):
        """Load Tauq data into target database.

        Streams records from ``input_file`` and inserts each one into
        ``table_name`` on the target connection, committing once at the end.
        The column list is taken from the keys of the first record. On any
        failure the transaction is rolled back and the error re-raised.

        Args:
            input_file: Path of the Tauq file to read.
            table_name: Target table. It is interpolated directly into the
                SQL text (as are the column names read from the file), so
                both must come from a trusted source.
        """
        columns = None
        insert_sql = None

        try:
            with self.target.cursor() as cursor, open(input_file) as f:
                # Insert while streaming instead of materializing the file.
                for record in tauq.stream(f):
                    if insert_sql is None:
                        columns = list(record.keys())
                        placeholders = ', '.join(['%s' for _ in columns])
                        cols = ', '.join(columns)
                        insert_sql = (
                            f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})"
                        )

                    cursor.execute(insert_sql, [record[col] for col in columns])

            self.target.commit()
        except Exception:
            # Leave the target connection usable and the table unchanged.
            self.target.rollback()
            raise

# Usage
etl = TauqETL(
    source_dsn="postgresql://source/db",
    target_dsn="postgresql://warehouse/db"
)

# Extract
etl.extract(
    "SELECT id, name, revenue, created_at FROM orders WHERE created_at > '2024-01-01'",
    "extracted.tqn"
)

# Transform
def transform_order(record):
    """Reshape one extracted order record into the warehouse row layout.

    Args:
        record: Mapping with ``id``, ``name``, ``revenue`` and ``created_at``
            keys. ``name`` must be a string, ``revenue`` a number, and
            ``created_at`` an object exposing ``.year`` and ``.month``.

    Returns:
        A dict with ``order_id``, ``customer`` (upper-cased name),
        ``revenue_cents`` (revenue times 100, rounded to the nearest
        integer), ``year`` and ``month``.
    """
    return {
        'order_id': record['id'],
        'customer': record['name'].upper(),
        # round(), not int(): 19.99 * 100 is 1998.9999999999998 as a float,
        # which int() would truncate to 1998 cents.
        'revenue_cents': round(record['revenue'] * 100),
        'year': record['created_at'].year,
        'month': record['created_at'].month,
    }

# Rewrite every extracted record through transform_order into transformed.tqn
etl.transform("extracted.tqn", "transformed.tqn", transform_order)

# Load: insert the transformed records into the fact_orders table on the target
etl.load("transformed.tqn", "fact_orders")

Storage Comparison

| Dataset | JSON | CSV | Tauq | Savings vs JSON |
|---|---|---|---|---|
| 10K users | 2.4 MB | 1.4 MB | 1.1 MB | 54% |
| 100K orders | 28 MB | 16 MB | 13 MB | 54% |
| 1M events | 310 MB | 180 MB | 142 MB | 54% |

Across the datasets above, Tauq exports are consistently about 54% smaller than the equivalent JSON for tabular data.

Learn More

Explore streaming and data processing patterns.