Streams organize data hierarchically: stream_name > group_id > item_id.
stream_name identifies the top-level stream (e.g. chat, presence, dashboard).
group_id partitions data within a stream (e.g. room-1, team-alpha).
item_id uniquely identifies a record within a group (e.g. user-123, msg-456).
Clients subscribe at the group level by connecting to ws://host:port/stream/{stream_name}/{group_id}/. They receive all item-level changes within that group.
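As an illustration of the URL shape above, here is a minimal sketch that builds a group-level subscription URL. The helper name stream_url, the host, and the port are placeholders chosen for this example, and percent-encoding the segments is ordinary URL hygiene rather than documented server behavior.

```python
from urllib.parse import quote


def stream_url(host: str, port: int, stream_name: str, group_id: str) -> str:
    """Build ws://host:port/stream/{stream_name}/{group_id}/ (hypothetical helper)."""
    # Percent-encode each segment so a name containing "/" or a space
    # cannot spill into a neighbouring path segment.
    stream = quote(stream_name, safe="")
    group = quote(group_id, safe="")
    return f"ws://{host}:{port}/stream/{stream}/{group}/"


# Placeholder host and port; substitute the values from your own deployment.
print(stream_url("localhost", 8080, "chat", "room-1"))
# prints: ws://localhost:8080/stream/chat/room-1/
```

A client connected to this URL would receive changes for every item in room-1, since subscriptions are scoped to the group rather than to a single item.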
The authentication function to use, given as the path to a function that authenticates the client. Register the
function with the iii SDK, then reference its path here.
The adapter to use for storing streams, given as the path to the adapter. Register the adapter with the
iii SDK, then reference its path here.
The list of atomic operations to apply. Each operation is a tagged object with a type field (set, merge, increment, decrement, append, or remove) and associated fields (path, value, by). For set, increment, decrement, append, and remove, paths are first-level field names. For merge, path accepts either a single string (legacy, first-level field) or an array of literal segments for a nested merge; see the state worker docs for the full contract and validation rules. Use path: "" (or omit path) to target the root value.
stream::update uses the same update engine as state::update. Each op may add an entry to the response errors array. Operations are best-effort: successfully applied ops still reflect in new_value, and failed ops are skipped.
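To make the best-effort behavior concrete, the following is a toy model, not the real update engine. It covers only set and first-level increment, and it makes two assumptions the text does not confirm: op_index is zero-based, and incrementing a missing field starts from 0. The function name apply_ops is hypothetical.

```python
from typing import Any


def apply_ops(current: Any, ops: list) -> dict:
    """Toy best-effort applier for set and first-level increment ops."""
    value, errors = current, []

    def fail(index: int, code: str, message: str) -> None:
        # Error shape follows the documented fields: op_index, code, message.
        errors.append({"op_index": index, "code": code, "message": message})

    for index, op in enumerate(ops):  # assumption: op_index is zero-based
        kind, path = op["type"], op.get("path", "")
        if kind not in ("set", "increment"):
            raise ValueError(f"op type {kind!r} is not modelled in this sketch")
        if kind == "set" and path == "":
            value = op["value"]  # an empty path replaces the root value
            continue
        if kind == "increment" and path == "":
            raise ValueError("root-level increment is not modelled in this sketch")
        if not isinstance(value, dict):
            fail(index, f"{kind}.target_not_object", "current value is not an object")
            continue  # failed op is skipped; later ops still run
        if kind == "set":
            value = {**value, path: op["value"]}
            continue
        existing = value.get(path, 0)  # assumption: a missing field starts at 0
        if isinstance(existing, bool) or not isinstance(existing, (int, float)):
            fail(index, "increment.not_number", f"field {path!r} is not a number")
            continue
        value = {**value, path: existing + op["by"]}
    return {"new_value": value, "errors": errors}


result = apply_ops({"views": 1}, [
    {"type": "set", "path": "title", "value": "Standup"},
    {"type": "increment", "path": "title", "by": 1},  # fails: title is a string
    {"type": "increment", "path": "views", "by": 2},
])
print(result["new_value"])                    # {'views': 3, 'title': 'Standup'}
print([e["code"] for e in result["errors"]])  # ['increment.not_number']
```

The second op fails and is skipped, yet the first and third still show up in new_value, which is the behavior callers should plan for: check errors even when new_value looks updated.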
| Code | Triggered when | Fix |
| --- | --- | --- |
| `set.target_not_object` | `set` tried to write a field while the current value is not an object | Set the root to an object first, or use `path: ""` to replace the root. |
| `append.target_not_object` | `append` used a field path while the current value is not an object | Set the root to an object first, or append at `path: ""`. |
| `append.type_mismatch` | `append` targeted an incompatible existing value, such as appending to a number or appending a non-string to a string | Match the appended value to the existing field type, or initialize the field to an array, string, or null. |
| `increment.target_not_object` | `increment` used a field path while the current value is not an object | Set the root to an object first. |
| `increment.not_number` | `increment` targeted an existing field that is not a number | Initialize the field as a number first, for example with `set` to 0. |
| `decrement.target_not_object` | `decrement` used a field path while the current value is not an object | Set the root to an object first. |
| `decrement.not_number` | `decrement` targeted an existing field that is not a number | Initialize the field as a number first, for example with `set` to 0. |
| `remove.target_not_object` | `remove` used a field path while the current value is not an object | Set the root to an object first. Removing a missing field from an object remains silent. |
| `<op>.path.proto_polluted` | A path segment is `__proto__`, `constructor`, or `prototype` | Use a different field name. |
| `<op>.path.segment_too_long` | A path segment is longer than 256 bytes | Shorten the field name or merge path segment. |
| `merge.path.too_deep` | A nested merge path has more than 32 segments | Reduce the nested path depth. |
| `merge.path.empty_segment` | A nested merge path array contains an empty segment | Remove the empty segment. |
| `merge.value.not_an_object` | `merge` value is not a JSON object | Pass an object as the merge value. |
| `merge.value.too_deep` | `merge` value has JSON nesting deeper than 16 levels | Flatten the value. |
| `merge.value.too_many_keys` | `merge` value has more than 1024 top-level keys | Split the write into smaller updates. |
| `merge.value.proto_polluted` | A top-level key in the `merge` value is `__proto__`, `constructor`, or `prototype` | Use a different key name. |
Each error includes op_index, code, and message; doc_url is optional.
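Several of the path errors can be caught on the client before a request is sent. The sketch below mirrors the documented limits for merge paths (forbidden names, 256-byte segments, 32-segment depth, no empty segments in an array path). It is only a pre-check: the helper name check_merge_path is hypothetical, segment length is measured as UTF-8 bytes by assumption, and the server remains the authority on validation and on the order in which errors are reported.

```python
from typing import List, Sequence, Union

FORBIDDEN_SEGMENTS = {"__proto__", "constructor", "prototype"}
MAX_SEGMENT_BYTES = 256   # documented per-segment limit
MAX_MERGE_SEGMENTS = 32   # documented nested merge depth limit


def check_merge_path(path: Union[str, Sequence[str]]) -> List[str]:
    """Return the documented error codes a merge path would be expected to trigger."""
    is_array = not isinstance(path, str)
    segments = list(path) if is_array else [path]
    codes = []
    if len(segments) > MAX_MERGE_SEGMENTS:
        codes.append("merge.path.too_deep")
    for segment in segments:
        if is_array and segment == "":
            # A single-string "" targets the root; only array paths reject empties.
            codes.append("merge.path.empty_segment")
        elif segment in FORBIDDEN_SEGMENTS:
            codes.append("merge.path.proto_polluted")
        elif len(segment.encode("utf-8")) > MAX_SEGMENT_BYTES:  # assumption: UTF-8 bytes
            codes.append("merge.path.segment_too_long")
    return codes


print(check_merge_path(["profile", "settings"]))  # []
print(check_merge_path(["a", "", "__proto__"]))
# ['merge.path.empty_segment', 'merge.path.proto_polluted']
```

Running a check like this before calling stream::update avoids a round trip for mistakes that are cheap to detect locally, while the errors array still has to be inspected for everything that depends on the stored value.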
Fires when an item changes in the stream (via stream::set, stream::update, or stream::delete). Register with a config object to filter which stream, group, or item triggers the handler:
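One way to picture the filtering is as a match between the config object and each change event. The sketch below is an illustration only: the key names stream_name, group_id, and item_id follow the hierarchy described earlier but are assumptions about the config shape, the function matches is hypothetical, and treating an omitted key as a wildcard is also an assumption.

```python
def matches(config: dict, event: dict) -> bool:
    """True when every key present in the config equals the same key on the event."""
    keys = ("stream_name", "group_id", "item_id")  # assumed config keys
    # An omitted (or None) key is treated as "match anything".
    return all(config.get(k) is None or config[k] == event.get(k) for k in keys)


event = {"stream_name": "chat", "group_id": "room-1", "item_id": "msg-456"}

print(matches({"stream_name": "chat"}, event))                         # True
print(matches({"stream_name": "chat", "group_id": "room-2"}, event))   # False
print(matches({}, event))                                              # True
```

Under these assumptions, a narrower config means fewer handler invocations: filtering on stream_name alone fires for every group in that stream, while adding group_id or item_id restricts it further.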
Register the auth function. Clients may send the token via Authorization: Bearer <token> (Node.js) or Sec-WebSocket-Protocol: Authorization,<token> (browser stream-client):
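Both transports carry the same token, so an auth function can normalize them before validating. The following is a minimal sketch of that normalization, assuming the request headers are available as a plain mapping; how the iii auth function actually receives its input is not described here, and extract_token is a hypothetical helper.

```python
from typing import Mapping, Optional


def extract_token(headers: Mapping[str, str]) -> Optional[str]:
    """Pull the token from either documented header form, or return None."""
    # HTTP header names are case-insensitive, so normalize them first.
    lowered = {name.lower(): value for name, value in headers.items()}

    # Node.js clients: "Authorization: Bearer <token>"
    scheme, _, token = lowered.get("authorization", "").partition(" ")
    if scheme.lower() == "bearer" and token.strip():
        return token.strip()

    # Browser clients: "Sec-WebSocket-Protocol: Authorization,<token>"
    parts = [p.strip() for p in lowered.get("sec-websocket-protocol", "").split(",")]
    if len(parts) == 2 and parts[0] == "Authorization" and parts[1]:
        return parts[1]

    return None


print(extract_token({"Authorization": "Bearer abc123"}))                  # abc123
print(extract_token({"Sec-WebSocket-Protocol": "Authorization,abc123"}))  # abc123
print(extract_token({}))                                                  # None
```

The subprotocol form exists because the browser WebSocket API does not let scripts set arbitrary request headers, so the token travels in the one header the constructor does expose.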
    import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

    const fn = iii.registerFunction('onJoin', (input: StreamJoinLeaveEvent) => {
      if (input.context?.userId) {
        console.log(`User ${input.context.userId} joined ${input.stream_name}/${input.group_id}/${input.id}`)
      }
      return {}
    })

    iii.registerTrigger({
      type: 'stream:join',
      function_id: fn.id,
      config: {},
    })

    from iii import StreamJoinLeaveEvent

    def on_join(input: StreamJoinLeaveEvent):
        if input.context and input.context.get('userId'):
            print(f"User {input.context['userId']} joined {input.stream_name}/{input.group_id}/{input.id}")
        return {}

    iii.register_function("onJoin", on_join)
    iii.register_trigger({'type': 'stream:join', 'function_id': 'onJoin', 'config': {}})

    use iii_sdk::{IIITrigger, StreamJoinLeaveCallRequest, StreamJoinLeaveTriggerConfig};
    use serde_json::json;

    iii.register_function((
        RegisterFunctionMessage::with_id("onJoin".into()),
        |input| async move {
            // Decode the raw JSON payload into the typed join event.
            let event: StreamJoinLeaveCallRequest = serde_json::from_value(input)?;
            if let Some(user_id) = event
                .context
                .as_ref()
                .and_then(|c| c.get("userId"))
                .and_then(|u| u.as_str())
            {
                println!(
                    "User {} joined {}/{}/{}",
                    user_id,
                    event.stream_name,
                    event.group_id,
                    event.id.as_deref().unwrap_or("")
                );
            }
            Ok(json!({}))
        },
    ));

    iii.register_trigger(
        IIITrigger::StreamJoin(StreamJoinLeaveTriggerConfig::new()).for_function("onJoin"),
    )?;

    from iii import StreamJoinLeaveEvent

    def require_context(input: StreamJoinLeaveEvent):
        return input.context is not None and input.context.get('userId') is not None

    iii.register_function("conditions::requireContext", require_context)

    def on_join(input: StreamJoinLeaveEvent):
        print('User joined:', input.context.get('userId') if input.context else None, input.stream_name)
        return {}

    iii.register_function("onJoin", on_join)
    iii.register_trigger({
        'type': 'stream:join',
        'function_id': 'onJoin',
        'config': {'condition_function_id': 'conditions::requireContext'},
    })

    use iii_sdk::{IIITrigger, StreamJoinLeaveCallRequest, StreamJoinLeaveTriggerConfig};
    use serde_json::json;

    // Condition function: returns true only when the join event carries a userId.
    iii.register_function((
        RegisterFunctionMessage::with_id("conditions::requireContext".into()),
        |input| async move {
            let event: StreamJoinLeaveCallRequest = serde_json::from_value(input)?;
            let has_user = event
                .context
                .as_ref()
                .and_then(|c| c.get("userId"))
                .is_some();
            Ok(json!(has_user))
        },
    ));

    // Handler: logs the joining user and the stream name.
    iii.register_function((
        RegisterFunctionMessage::with_id("onJoin".into()),
        |input| async move {
            let event: StreamJoinLeaveCallRequest = serde_json::from_value(input)?;
            let user_id = event
                .context
                .as_ref()
                .and_then(|c| c.get("userId"))
                .and_then(|u| u.as_str())
                .unwrap_or("");
            println!("User joined: {} {}", user_id, event.stream_name);
            Ok(json!({}))
        },
    ));

    iii.register_trigger(
        IIITrigger::StreamJoin(StreamJoinLeaveTriggerConfig::new().condition("conditions::requireContext"))
            .for_function("onJoin"),
    )?;