Skip to main content
Version: Next

Session Window

The Session Window buffer component provides a session-based message grouping mechanism where messages are grouped based on activity gaps. It implements a session window that closes after a configurable period of inactivity.

Configuration

gap

The maximum time gap between messages in a session. If no new messages arrive within this duration, the session is considered complete and all accumulated messages are emitted.

type: string

required: true

example: 1ms, 1s, 1m, 1h, 1d

join

Optional join configuration for SQL join operations on message batches. When specified, allows joining multiple message sources using SQL queries.

type: object

required: false

query

The SQL query to execute for joining message batches from different input sources.

type: string

required: true (when join is specified)

value_field

The field name to use for binary data values. Defaults to the system default binary value field.

type: string

required: false

codec

The codec configuration for decoding message batches before joining.

type: object

required: true (when join is specified)

Internal Mechanism

  • Built on top of the BaseWindow component which provides core windowing functionality
  • Messages are grouped by session keys using RwLock<HashMap<String, Arc<RwLock<VecDeque>>>>
  • Each session maintains its own message queue with independent timeout tracking
  • When a session exceeds the configured gap duration without new messages, it triggers window emission
  • Messages within the same session are batched and concatenated during processing using Arrow's concat_batches
  • Optional SQL join operations are performed using DataFusion's query engine with parallel decoding
  • Uses Tokio's async runtime with cancellation tokens for efficient timeout management
  • Implements proper session cleanup to prevent memory leaks from inactive sessions
  • Join operations validate that all required input tables are present before executing SQL queries

Examples

Basic Configuration

buffer:
type: "session_window"
gap: "5s" # Close session after 5 seconds of inactivity

This example configures a session window buffer that will:

  • Group messages into sessions
  • Close the session and process messages when no new messages arrive for 5 seconds

The buffer helps group related messages that occur close together in time while separating unrelated messages that have gaps between them.

With Join Configuration

buffer:
type: "session_window"
gap: "10s" # Close session after 10 seconds of inactivity
join:
query: "SELECT a.user_id, a.event_type, b.metadata FROM events a JOIN metadata b ON a.user_id = b.user_id"
codec:
type: "json"

This example configures a session window buffer with SQL join operations that:

  • Groups messages into sessions with 10-second inactivity gap
  • Joins event data with metadata using SQL
  • Uses JSON codec for message decoding
  • Processes joined data when session closes