Skip to main content

Documentation Index

Fetch the complete documentation index at: https://motiadev-docs-verdict-review-plan.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Durable streams for real-time data subscriptions.
iii-stream

Architecture

Data Flow

When a worker triggers stream::set, the engine:
  1. Persists the data via the configured adapter (Redis or KvStore)
  2. Publishes a notification to all WebSocket clients subscribed to that stream and group
  3. Evaluates registered stream triggers and fires matching handlers
A single stream::set handles persistence, real-time delivery, and reactive logic in one operation.

Groups

Streams organize data hierarchically: stream_name > group_id > item_id.
  • stream_name identifies the top-level stream (e.g. chat, presence, dashboard)
  • group_id partitions data within a stream (e.g. room-1, team-alpha)
  • item_id uniquely identifies a record within a group (e.g. user-123, msg-456)
Clients subscribe at the group level by connecting to ws://host:port/stream/{stream_name}/{group_id}/. They receive all item-level changes within that group.

Sample Configuration

- name: iii-stream
  config:
    port: ${STREAM_PORT:3112}
    host: 0.0.0.0
    adapter:
      name: redis
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration

port
number
The port to listen on. Defaults to 3112.
host
string
The host to listen on. Defaults to 0.0.0.0.
auth_function
string
The authentication function to use. It’s a path to a function that will be used to authenticate the client. You can register the function using the iii SDK and then use the path to the function here.
adapter
Adapter
The adapter to use. It’s the adapter that will be used to store the streams. You can register the adapter using the iii SDK and then use the path to the adapter here.

Adapters

redis

Uses Redis as the backend for the streams. Stores stream data in Redis and leverages Redis Pub/Sub for real-time event delivery.
name: redis
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration

redis_url
string
The URL of the Redis instance to use.

kv

Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.
name: kv
config:
  store_method: file_based
  file_path: ./data/streams_store.db

Configuration

store_method
string
Storage method. Options: in_memory (lost on restart) or file_based (persisted to disk).
file_path
string
Directory path for file-based storage. Each stream is stored as a separate file.

Functions

stream::set
function
Sets a value in the stream.
stream_name
string
required
The ID of the stream to set the value in.
group_id
string
required
The group ID of the stream to set the value in.
item_id
string
required
The item ID of the stream to set the value in.
data
any
required
The value to set in the stream.
old_value
any
The previous value, or null if the item was newly created.
new_value
any
required
The value now stored in the stream.
stream::get
function
Gets a value from the stream.
stream_name
string
required
The ID of the stream to retrieve the value from.
group_id
string
required
The group ID in the stream to retrieve the value from.
item_id
string
required
The item ID in the stream to retrieve.
value
any
required
The value retrieved from the stream.
stream::delete
function
Deletes a value from the stream.
stream_name
string
required
The ID of the stream to delete the value from.
group_id
string
required
The group ID in the stream to delete the value from.
item_id
string
required
The item ID in the stream to delete.
old_value
any
The value that was deleted, or null if the item did not exist.
stream::list
function
Retrieves a group from the stream. This function will return all the items in the group.
stream_name
string
required
The ID of the stream to retrieve the group from.
group_id
string
required
The group ID in the stream to retrieve the group from.
group
any[]
required
The group retrieved from the stream. It’s an array of items in the group.
stream::list_groups
function
List all groups in a stream.
stream_name
string
required
The ID of the stream to list groups from.
groups
string[]
required
An array of group IDs in the stream.
stream::list_all
function
List all streams with their group metadata.
This function takes no parameters.
stream
object[]
required
An array of stream metadata objects. Each object has an id (string) and a groups (string[]) field.
count
number
required
The total number of streams.
stream::send
function
Send a custom event to all subscribers of a stream group.
stream_name
string
required
The ID of the stream to send the event to.
group_id
string
required
The group ID in the stream to send the event to.
type
string
required
The event type string delivered to subscribers.
id
string
Optional item ID to associate the event with.
data
any
required
The event payload delivered to subscribers.
result
null
Returns null on success.
stream::update
function
Atomically update an item in the stream using a list of operations.
stream_name
string
required
The ID of the stream containing the item to update.
group_id
string
required
The group ID in the stream containing the item to update.
item_id
string
required
The item ID in the stream to update.
ops
UpdateOp[]
required
The list of atomic operations to apply. Each operation is a tagged object with a type field (set, merge, increment, decrement, append, or remove) and associated fields (path, value, by). For set / increment / decrement / append / remove, paths are first-level field names. For merge, path accepts either a single string (legacy / first-level field) or an array of literal segments for nested merge — see the state worker docs for the full contract and validation rules. Use path: "" (or omit path) to target the root value.
old_value
any
The previous value, or null if the item was newly created.
new_value
any
required
The value now stored in the stream after applying the operations.
errors
UpdateOpError[]
Per-op validation errors. Field is omitted when empty. Each entry has op_index, code, message, and an optional doc_url.

Error codes

stream::update uses the same update engine as state::update. Each op may add an entry to the response errors array. Operations are best-effort: successfully applied ops still reflect in new_value, and failed ops are skipped.
CodeTriggered whenFix
set.target_not_objectset tried to write a field while the current value is not an objectSet the root to an object first, or use path: "" to replace the root.
append.target_not_objectappend used a field path while the current value is not an objectSet the root to an object first, or append at path: "".
append.type_mismatchappend targeted an incompatible existing value, such as appending to a number or appending a non-string to a stringMatch the appended value to the existing field type, or initialize the field to an array, string, or null.
increment.target_not_objectincrement used a field path while the current value is not an objectSet the root to an object first.
increment.not_numberincrement targeted an existing field that is not a numberInitialize the field as a number first, for example with set to 0.
decrement.target_not_objectdecrement used a field path while the current value is not an objectSet the root to an object first.
decrement.not_numberdecrement targeted an existing field that is not a numberInitialize the field as a number first, for example with set to 0.
remove.target_not_objectremove used a field path while the current value is not an objectSet the root to an object first. Removing a missing field from an object remains silent.
<op>.path.proto_pollutedA path segment is __proto__, constructor, or prototypeUse a different field name.
<op>.path.segment_too_longA path segment is longer than 256 bytesShorten the field name or merge path segment.
merge.path.too_deepA nested merge path has more than 32 segmentsReduce the nested path depth.
merge.path.empty_segmentA nested merge path array contains an empty segmentRemove the empty segment.
merge.value.not_an_objectmerge value is not a JSON objectPass an object as the merge value.
merge.value.too_deepmerge value has JSON nesting deeper than 16 levelsFlatten the value.
merge.value.too_many_keysmerge value has more than 1024 top-level keysSplit the write into smaller updates.
merge.value.proto_pollutedA top-level key in the merge value is __proto__, constructor, or prototypeUse a different key name.
Each error includes op_index, code, and message; doc_url is optional.
{
  "old_value": { "count": 1 },
  "new_value": { "count": 1 },
  "errors": [
    {
      "op_index": 0,
      "code": "append.type_mismatch",
      "message": "Cannot append at path 'count': target is number, expected array, string, null, or missing field.",
      "doc_url": "https://iii.dev/docs/workers/iii-state#error-codes"
    }
  ]
}

Authentication

It’s possible to implement a function to handle authentication.
  1. Define a function to handle the authentication. It received one single argument with the request data.
iii.registerFunction('onAuth', (input) => ({
  context: { name: 'John Doe' },
}))
  1. Make sure you add the function to the configuration file.
- name: iii-stream
  config:
    auth_function: onAuth
  1. Now whenever someone opens a websocket connection, the function onAuth will be called with the request data.

Trigger Types

This worker adds three trigger types: stream (item changes), stream:join (WebSocket connect), and stream:leave (WebSocket disconnect).

stream:join and stream:leave

Fire when a client connects or disconnects via WebSocket. Both trigger types deliver the same payload to the handler:
subscription_id
string
required
The subscription ID, used for uniqueness and logging.
stream_name
string
required
The stream name of the subscription.
group_id
string
required
The group ID of the subscription.
id
string
The item ID of the subscription, if provided by the client.
context
object
The context generated by the authentication layer.
import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

const fn = iii.registerFunction('onJoin', (input: StreamJoinLeaveEvent) => {
  console.log(`Joined ${input.stream_name}/${input.group_id}`, input.context)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})

stream

Fires when an item changes in the stream (via stream::set, stream::update, or stream::delete). Register with a config object to filter which stream, group, or item triggers the handler:
stream_name
string
required
The stream name to watch. Only changes on this stream fire the handler.
group_id
string
If set, only changes within this group fire the handler.
item_id
string
If set, only changes to this specific item fire the handler.
condition_function_id
string
Function ID for conditional execution. The engine invokes it with the event payload; if it returns false, the handler function is not called.
The handler receives a payload with the following shape:
type
string
required
The event type: create, update, or delete.
timestamp
number
required
Unix timestamp of the event.
streamName
string
required
The stream where the change occurred.
groupId
string
required
The group where the change occurred.
id
string
The item ID that changed.
event
object
required
The event detail object containing type and data fields.
import { StreamChangeEvent } from 'iii-sdk/stream'

const fn = iii.registerFunction('onChatMessage', (input: StreamChangeEvent) => {
  console.log(`[${input.event.type}] ${input.streamName}/${input.groupId}/${input.id}`, input.event.data)
  return {}
})

iii.registerTrigger({
  type: 'stream',
  function_id: fn.id,
  config: { stream_name: 'chat' },
})

Usage Example: Real-Time Presence

Streams organize data by stream_name, group_id, and item_id. Use for live presence, collaborative docs, or dashboards:
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker('ws://localhost:49134')

iii.trigger({
  function_id: 'stream::set',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
    data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
  },
  action: TriggerAction.Void(),
})

const user = await iii.trigger({
  function_id: 'stream::get',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
  },
})

const roomMembers = await iii.trigger({
  function_id: 'stream::list',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
  },
})

iii.trigger({
  function_id: 'stream::delete',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
  },
  action: TriggerAction.Void(),
})
Clients connect via WebSocket to ws://host:3112/stream/presence/room-1/ and receive real-time updates when items change.

Usage Example: Join with Auth Context

Configure the stream worker with an auth function:
- name: iii-stream
  config:
    port: 3112
    host: 0.0.0.0
    auth_function: stream::auth
    adapter:
      name: kv
      config:
        store_method: file_based
        file_path: ./data/stream_store
Register the auth function. Clients may send the token via Authorization: Bearer <token> (Node.js) or Sec-WebSocket-Protocol: Authorization,<token> (browser stream-client):
iii.registerFunction('stream::auth', (input) => {
  const auth = input.headers?.['authorization']?.replace(/^Bearer\s+/i, '')
  const proto = input.headers?.['sec-websocket-protocol']
  const token = auth ?? (proto?.startsWith('Authorization,') ? proto.slice(13) : null)
  return token ? { context: { userId: 'user-from-token' } } : { context: null }
})
Join/leave triggers receive the auth context:
import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

const fn = iii.registerFunction('onJoin', (input: StreamJoinLeaveEvent) => {
  if (input.context?.userId) {
    console.log(`User ${input.context.userId} joined ${input.stream_name}/${input.group_id}/${input.id}`)
  }
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})

Usage Example: Conditional Join

import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

const conditionFn = iii.registerFunction(
  { id: 'conditions::requireContext' },
  async (input: StreamJoinLeaveEvent) => input.context?.userId != null,
)

const fn = iii.registerFunction('onJoin', (input: StreamJoinLeaveEvent) => {
  console.log('User joined:', input.context?.userId, input.stream_name)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: { condition_function_id: conditionFn.id },
})