Join
The Join buffer component enables SQL join operations across multiple input sources. It allows you to correlate and combine data from different streams within a window.
Configuration
join
Join configuration object.
type: object
optional: false
join.query
SQL query for joining data from multiple sources.
- Use table names like
flow_input1,flow_input2, etc. to reference different input sources - Support for INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
- Can include WHERE, GROUP BY, HAVING clauses
type: string
optional: false (when join is specified)
join.codec (optional)
Codec configuration for data transformation.
type: object
optional: true
join.codec.type
Codec type.
Supported values: json, protobuf, arrow
type: string
optional: false (when codec is specified)
Examples
Simple Inner Join
buffer:
type: "session_window"
gap: 1s
join:
query: "SELECT * FROM flow_input1 INNER JOIN flow_input2 ON flow_input1.id = flow_input2.id"
codec:
type: "json"
Join with Projection
buffer:
type: "tumbling_window"
size: 10s
join:
query: |
SELECT
flow_input1.user_id,
flow_input1.name,
flow_input2.order_id,
flow_input2.amount
FROM flow_input1
INNER JOIN flow_input2 ON flow_input1.user_id = flow_input2.user_id
codec:
type: "json"
Left Join with Aggregation
buffer:
type: "sliding_window"
size: 1m
slide: 30s
join:
query: |
SELECT
flow_input1.device_id,
flow_input1.location,
AVG(flow_input2.temperature) as avg_temp,
MAX(flow_input2.humidity) as max_humidity
FROM flow_input1
LEFT JOIN flow_input2 ON flow_input1.device_id = flow_input2.device_id
GROUP BY flow_input1.device_id, flow_input1.location
codec:
type: "json"
Three-way Join
buffer:
type: "session_window"
gap: 5s
join:
query: |
SELECT
a.event_id,
a.timestamp,
b.user_name,
c.product_name,
b.quantity
FROM flow_input1 a
INNER JOIN flow_input2 b ON a.event_id = b.event_id
INNER JOIN flow_input3 c ON b.product_id = c.product_id
codec:
type: "json"
Join with Filters
buffer:
type: "tumbling_window"
size: 1m
join:
query: |
SELECT
flow_input1.sensor_id,
flow_input1.reading,
flow_input2.threshold,
flow_input1.reading > flow_input2.threshold as alert
FROM flow_input1
INNER JOIN flow_input2 ON flow_input1.sensor_type = flow_input2.sensor_type
WHERE flow_input1.timestamp > NOW() - INTERVAL '1' HOUR
codec:
type: "json"
Self-Join Pattern
buffer:
type: "sliding_window"
size: 5m
slide: 1m
join:
query: |
SELECT
a.user_id,
a.current_value,
b.previous_value,
a.current_value - b.previous_value as delta
FROM flow a
INNER JOIN flow b ON a.user_id = b.user_id
WHERE a.timestamp > b.timestamp
codec:
type: "json"
Features
- Multiple Sources: Join data from 2 or more input sources
- SQL Support: Full SQL query capabilities with DataFusion
- Window Types: Works with tumbling, sliding, and session windows
- Flexible Joins: Support for INNER, LEFT, RIGHT, and FULL OUTER joins
- Aggregations: GROUP BY, HAVING, and aggregate functions
- Data Transformation: Optional codec support for data conversion
How It Works
- Data Collection: The buffer collects data from multiple input sources within the window
- Table Registration: Each input source is registered as a table (
flow_input1,flow_input2, etc.) - Join Execution: The SQL query is executed to join and transform the data
- Result Output: Joined results are passed to the pipeline processors
Table Naming Convention
When using multiple inputs with join buffer:
- The first input is registered as
flow_input1 - The second input is registered as
flow_input2 - The third input is registered as
flow_input3 - And so on...
Use Cases
Data Enrichment
Join event streams with reference data:
join:
query: |
SELECT
events.*,
reference.category,
reference.priority
FROM flow_input1 events
INNER JOIN flow_input2 reference ON events.type_id = reference.id
Correlation Analysis
Correlate data from different sensors:
join:
query: |
SELECT
temp.sensor_id,
temp.temperature,
hum.humidity,
temp.temperature * hum.humidity as heat_index
FROM flow_input1 temp
INNER JOIN flow_input2 hum ON temp.sensor_id = hum.sensor_id
Time Series Join
Join time-series data with different timestamps:
join:
query: |
SELECT
COALESCE(a.timestamp, b.timestamp) as timestamp,
a.value as metric_a,
b.value as metric_b
FROM flow_input1 a
FULL OUTER JOIN flow_input2 b ON a.timestamp = b.timestamp
Session Analysis
Analyze user sessions across multiple event types:
join:
query: |
SELECT
user_id,
COUNT(DISTINCT pageview.session_id) as page_views,
COUNT(DISTINCT click.session_id) as clicks,
SUM(click.amount) as total_spent
FROM flow_input1 pageview
LEFT JOIN flow_input2 click ON pageview.user_id = click.user_id
GROUP BY user_id
Performance Considerations
- Window Size: Smaller windows use less memory but may miss join matches
- Data Volume: Large windows with high data rates can consume significant memory
- Join Complexity: Multiple joins and complex queries are more CPU-intensive
- Indexing: DataFusion optimizes queries, but consider filter pushdown
- Memory Management: Monitor buffer memory usage in production
Best Practices
- Specify Columns: Select only required columns instead of using
SELECT * - Filter Early: Use WHERE clauses to reduce data before joining
- Window Choice: Choose appropriate window type for your use case:
- Tumbling: Fixed-size, non-overlapping windows
- Sliding: Continuous analysis with overlap
- Session: Activity-based grouping
- Join Type: Use the most restrictive join type that meets your needs (INNER > LEFT > FULL)
- Aggregate Wisely: Use aggregations to reduce result size
Error Handling
- Invalid SQL queries will prevent stream startup
- Type mismatches between sources will cause runtime errors
- Missing tables (inputs) will be reported at startup
- Join results with no matches may be empty depending on join type
Advanced Features
Custom Functions
Use custom UDFs in join queries:
join:
query: |
SELECT
flow_input1.id,
custom_transform(flow_input1.value) as transformed
FROM flow_input1
INNER JOIN flow_input2 ON flow_input1.id = flow_input2.id
Time-based Joins
Join with time conditions:
join:
query: |
SELECT
a.event_id,
a.value,
b.value
FROM flow_input1 a
INNER JOIN flow_input2 b
ON a.event_id = b.event_id
AND ABS(a.timestamp - b.timestamp) < 60
Subqueries
Use subqueries for complex logic:
join:
query: |
SELECT *
FROM (
SELECT
device_id,
AVG(temperature) as avg_temp
FROM flow_input1
GROUP BY device_id
) agg
INNER JOIN flow_input2 ref ON agg.device_id = ref.device_id
WHERE agg.avg_temp > ref.threshold