Event System¶
FuncNodes uses an event-driven architecture where components communicate through events. This document describes the event system and all available events.
Overview¶
flowchart LR
    subgraph EventFlow["Event Flow"]
        Emitter["Emitter"]
        Queue["Event Queue"]
        subgraph Listeners["Listeners"]
            Internal["Internal Handler"]
            WS["WebSocket Relay"]
            Custom["Custom Handler"]
        end
        UI["🖥️ UI"]
    end
    Emitter -->|"emit()"| Queue
    Queue -->|dispatch| Internal
    Queue -->|dispatch| WS
    Queue -->|dispatch| Custom
    WS --> UI
EventEmitter Base¶
All event-emitting components inherit from EventEmitter:
from funcnodes_core.eventmanager import EventEmitter

class MyComponent(EventEmitter):
    def do_something(self):
        # ... do work ...
        self.emit("something_done", result=42)

def handler(result):
    # Named callback, so the same function object can be passed to off() later
    print(f"Got {result}")

# Subscribe to events
component = MyComponent()
component.on("something_done", handler)

# Unsubscribe
component.off("something_done", handler)

# One-time listener
component.once("something_done", handler)
API¶
| Method | Description |
|---|---|
| `on(event, callback)` | Subscribe to event |
| `off(event, callback)` | Unsubscribe from event |
| `once(event, callback)` | Subscribe for single event |
| `emit(event, *args, **kwargs)` | Emit event to listeners |
| `listeners(event)` | Get all listeners for event |
| `remove_all_listeners(event)` | Remove all listeners |
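The semantics listed in the table can be sketched with a small stand-in class. This is not the `funcnodes_core` implementation: `SketchEmitter` and its internals are hypothetical and only mirror the method names from the table, assuming listeners are called in subscription order.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class SketchEmitter:
    """Hypothetical stand-in mirroring the method names in the table above."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[..., Any]]] = defaultdict(list)

    def on(self, event: str, callback: Callable[..., Any]) -> None:
        self._listeners[event].append(callback)

    def off(self, event: str, callback: Callable[..., Any]) -> None:
        if callback in self._listeners[event]:
            self._listeners[event].remove(callback)

    def once(self, event: str, callback: Callable[..., Any]) -> None:
        def wrapper(*args: Any, **kwargs: Any) -> None:
            self.off(event, wrapper)  # unsubscribe before running the callback
            callback(*args, **kwargs)
        self.on(event, wrapper)

    def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so handlers may unsubscribe while being called.
        for callback in list(self._listeners[event]):
            callback(*args, **kwargs)

    def listeners(self, event: str) -> List[Callable[..., Any]]:
        return list(self._listeners[event])

    def remove_all_listeners(self, event: str) -> None:
        self._listeners[event].clear()


received: List[int] = []
emitter = SketchEmitter()
emitter.once("something_done", lambda result: received.append(result))
emitter.emit("something_done", result=1)
emitter.emit("something_done", result=2)  # the one-time listener is already gone
```

After the two `emit()` calls, `received` holds only the first result, which is the behaviour the `once()` row describes.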
Node Events¶
Trigger Events¶
Events related to node execution:
before_trigger¶
Emitted before node execution begins.
Use cases:
- Validation before execution
- Logging/profiling start
after_trigger¶
Emitted after node execution completes.
Use cases:
- Cleanup operations
- Logging/profiling end
- Cascading updates
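As an illustration of the profiling use case, the following sketch pairs `before_trigger` with `after_trigger` to measure execution time. `TinyEmitter` is a hypothetical stand-in for a node, and the handlers are assumed to receive no arguments; the real payload may differ.

```python
import time
from typing import Callable, Dict, List, Optional


class TinyEmitter:
    """Hypothetical stand-in for a node; only on() and emit() are sketched."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[], None]]] = {}

    def on(self, event: str, callback: Callable[[], None]) -> None:
        self._handlers.setdefault(event, []).append(callback)

    def emit(self, event: str) -> None:
        for callback in self._handlers.get(event, []):
            callback()


class TriggerTimer:
    """Records the wall-clock time between before_trigger and after_trigger."""

    def __init__(self, node: TinyEmitter) -> None:
        self.durations: List[float] = []
        self._start: Optional[float] = None
        node.on("before_trigger", self._on_before)
        node.on("after_trigger", self._on_after)

    def _on_before(self) -> None:
        self._start = time.perf_counter()

    def _on_after(self) -> None:
        if self._start is not None:
            self.durations.append(time.perf_counter() - self._start)
            self._start = None


node = TinyEmitter()
timer = TriggerTimer(node)
node.emit("before_trigger")  # execution would start here
node.emit("after_trigger")   # execution would end here
```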
trigger_error¶
Emitted when node execution fails.
Payload:
- error: Exception that was raised
- traceback: Full traceback string
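A handler for this payload might look like the sketch below. It assumes the two fields arrive as keyword arguments named `error` and `traceback`; `run_and_report` is a hypothetical emitter side that only shows how such a payload could be produced.

```python
from traceback import format_exc
from typing import Callable, List, Tuple

error_log: List[Tuple[str, str]] = []


def on_trigger_error(error: Exception, traceback: str) -> None:
    """Handler taking the two payload fields listed above."""
    error_log.append((type(error).__name__, traceback))


def run_and_report(func: Callable[[], object]) -> None:
    """Hypothetical emitter side: call func and report a failure to the handler."""
    try:
        func()
    except Exception as exc:
        # format_exc() renders the exception currently being handled as a string.
        on_trigger_error(error=exc, traceback=format_exc())


run_and_report(lambda: 1 / 0)
```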
Progress Events¶
progress¶
Emitted during long-running operations.
Payload:
- progress: Float from 0.0 to 1.0
- message: Optional status message
Emitting progress:
import funcnodes as fn

class MyNode(fn.Node):
    async def func(self, data):
        for i, item in enumerate(self.progress(data)):
            process(item)
            # Progress automatically emitted
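To make the idea of an iterable wrapper concrete, here is a minimal sketch of how wrapping a sequence could yield a progress fraction between 0.0 and 1.0 per item. `progress_iter` and `report` are hypothetical names and do not describe how `self.progress()` is actually implemented.

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def progress_iter(items: Sequence[T], report: Callable[[float], None]) -> Iterator[T]:
    """Yield items and report the completed fraction after each one is consumed."""
    total = len(items)
    for index, item in enumerate(items, start=1):
        yield item
        report(index / total)  # runs when the consumer asks for the next item


reported: List[float] = []
processed = [item * 2 for item in progress_iter([1, 2, 3, 4], reported.append)]
```

In a node, `report` would be replaced by whatever emits the `progress` event.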
IO Events¶
NodeInput Events¶
after_set_value¶
Emitted when an input value changes.
When emitted:
- Manual value set via input.set_value()
- Value received from connected output
- UI sets value via RPC
Options:
- emit_value_set=False suppresses this event
- does_trigger=False prevents node triggering
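The effect of the two options can be sketched as follows. `SketchInput` is hypothetical, and the sketch assumes both options are passed as keyword arguments to `set_value()`; in the real API they may be configured elsewhere.

```python
from typing import Any, List


class SketchInput:
    """Hypothetical input; only the two flags described above are modeled."""

    def __init__(self) -> None:
        self.value: Any = None
        self.events: List[str] = []   # names of emitted events, in order
        self.trigger_requests = 0     # how often the owning node was asked to run

    def set_value(self, value: Any, emit_value_set: bool = True,
                  does_trigger: bool = True) -> None:
        self.value = value
        if emit_value_set:
            self.events.append("after_set_value")
        if does_trigger:
            self.trigger_requests += 1


inp = SketchInput()
inp.set_value(1)                        # event and trigger
inp.set_value(2, emit_value_set=False)  # trigger only
inp.set_value(3, does_trigger=False)    # event only
```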
value_options_changed¶
Emitted when value constraints change.
Payload: New value_options dict
render_options_changed¶
Emitted when render hints change.
Payload: New render_options dict
NodeOutput Events¶
value_changed¶
Emitted when output value changes.
Triggered by:
- Setting output.value = x
- Return value from decorated function
Connection Events¶
connected¶
Emitted when an IO is connected.
disconnected¶
Emitted when an IO is disconnected.
NodeSpace Events¶
Node Lifecycle¶
node_added¶
Emitted when a node is added to the graph.
Payload: Node instance
node_removed¶
Emitted when a node is removed.
Payload: Node UUID string
Edge Lifecycle¶
edge_added¶
Emitted when a connection is created.
Payload:
- src: Tuple of (node_uuid, io_id)
- dst: Tuple of (node_uuid, io_id)
edge_removed¶
Emitted when a connection is removed.
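As a sketch of how the edge events could be consumed, the handlers below keep a set of current connections in sync. The `edge_added` payload follows the description above; that `edge_removed` carries the same `src` and `dst` fields is an assumption.

```python
from typing import Set, Tuple

Endpoint = Tuple[str, str]  # (node_uuid, io_id)

edges: Set[Tuple[Endpoint, Endpoint]] = set()


def on_edge_added(src: Endpoint, dst: Endpoint) -> None:
    edges.add((src, dst))


def on_edge_removed(src: Endpoint, dst: Endpoint) -> None:
    # Assumption: edge_removed carries the same src/dst payload as edge_added.
    edges.discard((src, dst))


on_edge_added(src=("node-a", "out"), dst=("node-b", "x"))
on_edge_added(src=("node-a", "out"), dst=("node-c", "x"))
on_edge_removed(src=("node-a", "out"), dst=("node-b", "x"))
```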
Execution Events¶
node_triggered¶
Emitted when a node starts execution.
node_done¶
Emitted when a node completes execution.
node_error¶
Emitted when a node execution fails.
Payload:
- uuid: Node UUID
- error: Exception message
- traceback: Full traceback
State Events¶
cleared¶
Emitted when nodespace is cleared.
loaded¶
Emitted after loading state.
Worker Events¶
Lifecycle¶
starting¶
Emitted when worker begins startup.
ready¶
Emitted when worker is fully initialized.
stopping¶
Emitted when worker begins shutdown.
stopped¶
Emitted after worker cleanup.
Client Events¶
client_connected¶
Emitted when a WebSocket client connects.
client_disconnected¶
Emitted when a client disconnects.
Event Propagation¶
Trigger Cascade¶
When an input value changes, events cascade through the graph:
flowchart TD
    SetValue["Input.set_value()"]
    EmitAfterSet["emit('after_set_value')"]
    RequestTrigger["Node.request_trigger()"]
    EmitBefore["emit('before_trigger')"]
    AwaitFunc["await func()"]
    SetOutput["Output.value = result"]
    EmitValueChanged["emit('value_changed')"]
    ConnectedInputs["Connected inputs..."]
    Cascade["(cascade continues)"]
    EmitAfter["emit('after_trigger')"]
    SetValue --> EmitAfterSet
    SetValue --> RequestTrigger
    RequestTrigger --> EmitBefore
    RequestTrigger --> AwaitFunc
    RequestTrigger --> EmitAfter
    AwaitFunc --> SetOutput
    SetOutput --> EmitValueChanged
    EmitValueChanged --> ConnectedInputs
    ConnectedInputs -.-> Cascade
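One sequential reading of the diagram can be sketched with plain coroutines that record the order of events for a single node. The function names are hypothetical stand-ins, and real scheduling is asynchronous, so the actual interleaving (in particular of `value_changed` and `after_trigger`) may differ.

```python
import asyncio
from typing import List

order: List[str] = []


async def func(x: int) -> int:
    """Stand-in for the node function."""
    order.append("func")
    return x + 1


async def request_trigger(x: int) -> int:
    order.append("before_trigger")
    result = await func(x)
    # The output value is set here; connected inputs would receive it
    # and continue the cascade in their own nodes.
    order.append("value_changed")
    order.append("after_trigger")
    return result


async def set_value(x: int) -> int:
    order.append("after_set_value")
    return await request_trigger(x)


result = asyncio.run(set_value(1))
```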
Event Relay to UI¶
The worker relays nodespace events to connected WebSocket clients:
# Internal setup (simplified)
def setup_event_relay(nodespace, worker):
    def relay_to_clients(event_name):
        def handler(*args, **kwargs):
            worker.broadcast({
                "type": "nodespaceevent",
                "event": event_name,
                "data": serialize_event_data(args, kwargs)
            })
        return handler

    for event in RELAYED_EVENTS:
        nodespace.on(event, relay_to_clients(event))
Relayed events:
- node_added, node_removed
- edge_added, edge_removed
- node_triggered, node_done, node_error
- io_value_changed
- node_progress
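The relay pattern above can be tried in isolation with a fake worker. In this sketch, `FakeWorker`, `make_relay`, and the body of `serialize_event_data` are all hypothetical; the real serializer and message layout are defined by the worker and the message protocol.

```python
import json
from typing import Any, Callable, Dict, List


def serialize_event_data(args: tuple, kwargs: dict) -> Dict[str, Any]:
    """Hypothetical serializer: falls back to repr() for values JSON cannot encode."""
    def safe(value: Any) -> Any:
        try:
            json.dumps(value)
            return value
        except (TypeError, ValueError):
            return repr(value)

    return {"args": [safe(a) for a in args],
            "kwargs": {k: safe(v) for k, v in kwargs.items()}}


class FakeWorker:
    """Collects broadcast messages instead of sending them over a WebSocket."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    def broadcast(self, message: dict) -> None:
        self.sent.append(json.dumps(message))


def make_relay(worker: FakeWorker, event_name: str) -> Callable[..., None]:
    def handler(*args: Any, **kwargs: Any) -> None:
        worker.broadcast({
            "type": "nodespaceevent",
            "event": event_name,
            "data": serialize_event_data(args, kwargs),
        })
    return handler


worker = FakeWorker()
relay = make_relay(worker, "node_error")
relay(uuid="abc", error=ValueError("bad input"))
message = json.loads(worker.sent[0])
```

Binding `event_name` through a factory function, as the original snippet does, gives each handler its own event name instead of sharing the loop variable.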
Custom Event Handlers¶
Subscribing in Nodes¶
Class-based nodes can subscribe to their own events:
import funcnodes as fn

class MyNode(fn.Node):
    node_id = "my_node"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.on("before_trigger", self._log_start)
        self.on("after_trigger", self._log_end)

    def _log_start(self):
        print(f"Node {self.uuid} starting")

    def _log_end(self):
        print(f"Node {self.uuid} finished")
NodeSpace Event Hooks¶
Workers can add custom handlers:
class MyWorker(WSWorker):
    async def setup(self):
        await super().setup()
        self.nodespace.on("node_error", self._handle_error)

    def _handle_error(self, uuid, error, traceback):
        # Custom error handling
        send_alert(f"Node {uuid} failed: {error}")
Dynamic IO Updates¶
@update_other_io_options¶
Decorator to rebuild dropdown options when an input changes:
import funcnodes as fn
import pandas as pd
from funcnodes_core.io_hooks import update_other_io_options

@fn.NodeDecorator(node_id="column_selector")
@update_other_io_options("column", modifier=lambda df: df.columns.tolist())
def select_column(df: pd.DataFrame, column: str) -> pd.Series:
    return df[column]
How it works:
- When df input changes, modifier is called
- Result becomes column.value_options["options"]
- value_options_changed event fires
- UI updates dropdown
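The steps above can be sketched without FuncNodes to show the data flow. `SketchIO` and `link_options` are hypothetical, and a plain dict stands in for the DataFrame; the decorator's real wiring is not shown here.

```python
from typing import Any, Callable, Dict, List


class SketchIO:
    """Hypothetical IO holding value_options and a log of emitted events."""

    def __init__(self) -> None:
        self.value_options: Dict[str, Any] = {}
        self.events: List[str] = []

    def update_options(self, options: List[Any]) -> None:
        self.value_options["options"] = options
        self.events.append("value_options_changed")


def link_options(target: SketchIO,
                 modifier: Callable[[Any], List[Any]]) -> Callable[[Any], None]:
    """Return a callback to run whenever the source input receives a new value."""
    def on_source_changed(new_value: Any) -> None:
        target.update_options(modifier(new_value))
    return on_source_changed


column = SketchIO()
on_df_changed = link_options(column, modifier=lambda table: list(table.keys()))
on_df_changed({"time": [0, 1], "signal": [3.2, 3.4]})  # a dict stands in for a DataFrame
```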
@update_other_io_value_options¶
Decorator to update numeric constraints:
from typing import Any

import funcnodes as fn
from funcnodes_core.io_hooks import update_other_io_value_options

@fn.NodeDecorator(node_id="list_get")
@update_other_io_value_options("index", options_generator=lambda lst: {
    "min": 0,
    "max": len(lst) - 1 if lst else 0
})
def list_get(lst: list, index: int) -> Any:
    return lst[index]
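To see what the generator produces, the same expression can be evaluated on its own. `index_bounds` is a hypothetical name for the lambda shown above.

```python
from typing import Dict


def index_bounds(lst: list) -> Dict[str, int]:
    """Same expression as the options_generator lambda above."""
    return {"min": 0, "max": len(lst) - 1 if lst else 0}


bounds_empty = index_bounds([])
bounds_three = index_bounds(["a", "b", "c"])
```

For a three-element list the bounds are 0 to 2. For an empty list both bounds are 0, so `lst[index]` would still raise an `IndexError`; the constraint narrows the UI input but does not replace a guard in the function itself.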
Event Best Practices¶
Do¶
✅ Use events for loose coupling between components
✅ Keep event handlers fast (offload heavy work)
✅ Clean up listeners when components are destroyed
✅ Use once() for one-time setup handlers
✅ Document custom events in node docstrings
Don't¶
❌ Create circular event chains (A→B→A)
❌ Rely on event ordering between different emitters
❌ Block in event handlers (use async if needed)
❌ Emit events during __init__ (object not ready)
❌ Store sensitive data in event payloads
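One way to keep handlers fast, sketched here with the standard library only, is to let the handler enqueue the payload and have a background thread do the expensive part. The handler name and the squaring step are placeholders, not part of FuncNodes.

```python
import queue
import threading
from typing import List, Optional

jobs: queue.Queue = queue.Queue()
results: List[int] = []


def on_value_changed(value: Optional[int]) -> None:
    """Fast handler: only enqueue the payload and return immediately."""
    jobs.put(value)


def worker_loop() -> None:
    """Background consumer doing the expensive part outside the handler."""
    while True:
        value = jobs.get()
        if value is None:  # sentinel: stop the worker
            break
        results.append(value * value)  # placeholder for heavy work


thread = threading.Thread(target=worker_loop, daemon=True)
thread.start()
for v in (1, 2, 3):
    on_value_changed(v)
jobs.put(None)
thread.join()
```

In an asyncio-based worker the same idea could use an `asyncio.Queue` and a task instead of a thread.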
See Also¶
- Architecture Overview — System diagram
- Core Components — Event emitter implementation
- Message Protocol — WebSocket event relay
- Inputs & Outputs — IO events