SQL
The SQL output component writes messages to SQL databases. It supports batch inserts, UPSERT operations, and connection pooling.
Configuration
type
Database type.
Supported values: mysql, postgresql, sqlite
type: string
optional: false
dsn
Data Source Name (connection string).
type: string
optional: false
MySQL DSN Format
username:password@tcp(host:port)/database
Example: user:pass@tcp(localhost:3306)/mydb
PostgreSQL DSN Format
postgresql://username:password@host:port/database
Example: postgresql://user:pass@localhost:5432/mydb
SQLite DSN Format
path/to/database.db
Example: /data/app.db
table
Target table name.
type: string
optional: false
batch_size (optional)
Number of rows to batch before inserting.
type: integer
default: 100
optional: true
primary_key (optional)
Column name(s) to use for UPSERT operations. When specified, performs INSERT ... ON CONFLICT/DUPLICATE KEY UPDATE.
type: string or array of string
optional: true
Examples
MySQL Batch Insert
- output:
type: "sql"
type: "mysql"
dsn: "user:password@tcp(mysql-server:3306)/analytics"
table: "events"
batch_size: 500
PostgreSQL with UPSERT
- output:
type: "sql"
type: "postgresql"
dsn: "postgresql://user:pass@localhost:5432/production"
table: "metrics"
primary_key: "id"
batch_size: 1000
SQLite Database
- output:
type: "sql"
type: "sqlite"
dsn: "/data/local.db"
table: "sensor_readings"
batch_size: 100
PostgreSQL with Composite Primary Key
- output:
type: "sql"
type: "postgresql"
dsn: "postgresql://user:pass@postgres:5432/app"
table: "daily_stats"
primary_key: ["device_id", "date"]
batch_size: 200
High Throughput MySQL
- output:
type: "sql"
type: "mysql"
dsn: "write_user:write_pass@tcp(mysql-primary:3306)/high_traffic"
table: "event_log"
batch_size: 5000
Features
- Multiple Databases: Support for MySQL, PostgreSQL, and SQLite
- Batch Inserts: Efficient batch operations for high throughput
- UPSERT Support: Insert or update based on primary key
- Connection Pooling: Automatic connection pooling for performance
- Transaction Support: Automatic transaction management
- Composite Keys: Support for multi-column primary keys
Database-Specific Notes
MySQL
- Uses
INSERT ... ON DUPLICATE KEY UPDATEfor UPSERT - Requires at least one unique key or primary key
- DSN format:
username:password@protocol(host:port)/database
PostgreSQL
- Uses
INSERT ... ON CONFLICT DO UPDATEfor UPSERT - Supports UPSERT on any column with unique constraint
- DSN format:
postgresql://username:password@host:port/database
SQLite
- Uses
INSERT OR REPLACEfor UPSERT - Single file database
- No server required
- DSN format: path to database file
Best Practices
- Use Appropriate Batch Sizes: Larger batches (100-5000) improve throughput but increase memory usage
- Set Primary Keys for UPSERT: Enable upsert functionality for idempotent writes
- Monitor Connection Pool: Watch for connection exhaustion under high load
- Use Transactions: Batches are automatically wrapped in transactions
- Handle Timeouts: Configure appropriate timeout values in DSN
- Schema Design: Ensure table schema matches your data structure
- Indexing: Create indexes on frequently queried columns
Error Handling
The output component will:
- Retry failed batch operations
- Log detailed error messages
- Continue processing on non-fatal errors
- Validate connection on startup
Performance Tips
- Batch Size: Start with 500-1000 and tune based on your workload
- Connection Pool: The connection pool size is automatically managed
- Network Latency: For remote databases, larger batches reduce round-trip overhead
- Database Tuning: Configure database server for optimal write performance
- Disable Indexes: For bulk loads, consider temporarily disabling non-critical indexes
Example Schema
MySQL Table Schema
CREATE TABLE events (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp BIGINT NOT NULL,
device_id VARCHAR(255) NOT NULL,
temperature FLOAT,
humidity FLOAT,
status VARCHAR(50),
INDEX idx_device (device_id),
INDEX idx_timestamp (timestamp)
);
PostgreSQL Table Schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
timestamp BIGINT NOT NULL,
device_id VARCHAR(255) NOT NULL,
temperature FLOAT,
humidity FLOAT,
status VARCHAR(50)
);
CREATE INDEX idx_device ON events(device_id);
CREATE INDEX idx_timestamp ON events(timestamp);
SQLite Table Schema
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
device_id TEXT NOT NULL,
temperature REAL,
humidity REAL,
status TEXT
);
CREATE INDEX idx_device ON events(device_id);
CREATE INDEX idx_timestamp ON events(timestamp);