Worker Components (funcnodes_worker)¶
The funcnodes_worker package provides the runtime environment for executing FuncNodes graphs. This document explains the worker architecture and its components.
Package Structure¶
funcnodes_worker/
├── __init__.py # Public exports
├── worker.py # WSWorker class
├── websocket.py # WebSocket server and RPC handling
├── loop.py # Runtime loops (save, trigger, heartbeat)
├── external_worker.py # External worker base class
└── config.py # Worker configuration
Worker Architecture¶
flowchart TB
subgraph WSWorker["WSWorker"]
subgraph EventLoop["Event Loop (asyncio)"]
NSLoop["NodeSpaceLoop<br/>(5ms tick)"]
SaveLoop["SaveLoop<br/>(1s tick)"]
HeartLoop["HeartbeatLoop<br/>(optional)"]
subgraph WS["WebSocket Server"]
RPC["RPC command handling"]
Broadcast["Event broadcasting"]
HTTP["Large message HTTP fallback"]
end
end
subgraph NS["NodeSpace"]
Nodes["Nodes<br/>(graph)"]
Edges["Edges<br/>(connections)"]
Library["Library<br/>(shelves)"]
end
subgraph FS["File System (data_path/)"]
NSJson["nodespace.json"]
Files["files/"]
Scripts["local_scripts/"]
Log["worker.log"]
end
end
WSWorker Class¶
The main worker class that orchestrates everything:
class WSWorker:
"""WebSocket-based FuncNodes worker."""
# Identity
uuid: str # Unique identifier
name: str # Human-readable name
# Paths
data_path: Path # Worker data directory
env_path: Optional[Path] # Virtualenv path
# Components
nodespace: NodeSpace # Graph container
lib: Library # Node registry
# Server
host: str # WebSocket host
port: int # WebSocket port
# State
_running: bool # Is worker active?
_clients: Set[WebSocket] # Connected clients
# Loops
_nodespace_loop: NodeSpaceLoop
_save_loop: SaveLoop
_heartbeat_loop: Optional[HeartbeatLoop]
Worker Lifecycle¶
stateDiagram-v2
[*] --> Created: config.json written
Created --> Starting: start()
state Starting {
[*] --> LoadLibs: Load libs
LoadLibs --> LoadState: Load state
LoadState --> StartWS: Start WS
}
Starting --> Running
state Running {
[*] --> Active
Active: • Accepting WebSocket connections
Active: • Processing RPC commands
Active: • Executing node triggers
Active: • Saving state periodically
}
Running --> Stopping: stop()
state Stopping {
[*] --> SaveState: Save state
SaveState --> CloseWS: Close WS
CloseWS --> Cleanup: Cleanup
}
Stopping --> Stopped
Stopped --> [*]: PID file removed
Runtime Loops¶
NodeSpaceLoop¶
Processes pending node triggers:
class NodeSpaceLoop:
"""Drains the node trigger queue."""
interval: float = 0.005 # 5ms default
async def run(self):
while self._running:
# Wait for any pending triggers to complete
await self.nodespace.await_done()
await asyncio.sleep(self.interval)
Purpose:
- Ensures async node execution completes
- Prevents event loop starvation
- Configurable tick rate for responsiveness vs CPU
SaveLoop¶
Persists state to disk:
class SaveLoop:
"""Periodically saves worker state."""
interval: float = 1.0 # 1 second default
async def run(self):
while self._running:
if self._save_requested:
await self._save_state()
self._save_requested = False
await asyncio.sleep(self.interval)
def request_save(self):
"""Mark that a save is needed."""
self._save_requested = True
Saved files:
- nodespace.json — Graph state
- worker_<uuid>.p — PID file (liveness indicator)
- worker_<uuid>.runstate — Human-readable status
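This request/flush split debounces disk writes: any number of request_save() calls between ticks collapse into a single write. A self-contained sketch of the pattern (names are illustrative, not the package's classes):

```python
import asyncio

class DebouncedSaver:
    """Coalesces save requests into at most one write per tick (sketch)."""

    def __init__(self) -> None:
        self._save_requested = False
        self.writes = 0  # counts actual (simulated) disk writes

    def request_save(self) -> None:
        """Mark that a save is needed; cheap to call often."""
        self._save_requested = True

    async def run_once(self) -> None:
        """One tick of the SaveLoop body shown above."""
        if self._save_requested:
            self.writes += 1  # stand-in for _save_state()
            self._save_requested = False
        await asyncio.sleep(0)

async def demo() -> int:
    saver = DebouncedSaver()
    for _ in range(100):      # 100 rapid graph edits...
        saver.request_save()
    await saver.run_once()    # ...trigger a single write
    return saver.writes

print(asyncio.run(demo()))  # 1
```

However busy the graph is, disk I/O stays bounded by the loop interval.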
HeartbeatLoop (Optional)¶
Enforces client connectivity:
class HeartbeatLoop:
"""Stops worker if no heartbeat received."""
timeout: float # From worker config
async def run(self):
while self._running:
if time.time() - self._last_heartbeat > self.timeout:
logger.warning("Heartbeat timeout, stopping worker")
await self.worker.stop()
await asyncio.sleep(1.0)
def heartbeat(self):
"""Called when client sends heartbeat."""
self._last_heartbeat = time.time()
Use case: Auto-stop workers when UI disconnects (optional feature).
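The timeout arithmetic above is easy to exercise in isolation. A standalone sketch (not the package's class) that mirrors the check:

```python
import time
from typing import Optional

class HeartbeatMonitor:
    """The expiry check from HeartbeatLoop, isolated for illustration."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self._last_heartbeat = time.time()

    def heartbeat(self) -> None:
        """Record a client heartbeat (resets the clock)."""
        self._last_heartbeat = time.time()

    def expired(self, now: Optional[float] = None) -> bool:
        """True once no heartbeat arrived within `timeout` seconds."""
        if now is None:
            now = time.time()
        return now - self._last_heartbeat > self.timeout
```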
WebSocket Server¶
Connection Handling¶
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
worker._clients.add(ws)
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
await handle_message(ws, msg.data)
elif msg.type == WSMsgType.BINARY:
await handle_binary(ws, msg.data)
finally:
worker._clients.discard(ws)
return ws
RPC Dispatch¶
async def handle_message(ws, data):
msg = json.loads(data)
if msg["type"] == "ping":
await ws.send_json({"type": "pong"})
elif msg["type"] == "cmd":
cmd = msg["cmd"]
kwargs = msg.get("kwargs", {})
handler = COMMAND_HANDLERS.get(cmd)
if handler:
try:
result = await handler(worker, **kwargs)
await ws.send_json({
"type": "result",
"cmd": cmd,
"result": result
})
            except Exception as e:
                await ws.send_json({
                    "type": "error",
                    "cmd": cmd,
                    "error": str(e)
                })
        else:
            await ws.send_json({
                "type": "error",
                "cmd": cmd,
                "error": f"unknown command: {cmd}"
            })
Event Broadcasting¶
def broadcast_event(event_type: str, data: dict):
"""Send event to all connected clients."""
message = {
"type": "nodespaceevent",
"event": event_type,
"data": data
}
    for client in worker._clients:
        # Fire-and-forget; a robust implementation would retain a
        # reference so the task is not garbage-collected mid-send
        asyncio.create_task(client.send_json(message))
Large Message Handling¶
Messages exceeding MESSAGE_SIZE_BEFORE_REQUEST (default 1MB) use HTTP fallback:
┌──────────┐ ┌──────────┐
│ Client │ │ Worker │
└────┬─────┘ └────┬─────┘
│ │
│ 1. WS: request full_state │
│ ───────────────────────────────────►│
│ │
│ 2. Worker serializes (>1MB) │
│ │
│ 3. WS: {"type": "large_message", │
│ "msg_id": "abc123"} │
│◄─────────────────────────────────── │
│ │
│ 4. HTTP GET /message/abc123 │
│ ───────────────────────────────────►│
│ │
│ 5. HTTP Response (full JSON) │
│◄─────────────────────────────────── │
│ │
# Server side
async def send_large_message(ws, data):
msg_id = str(uuid4())
_pending_messages[msg_id] = data
await ws.send_json({
"type": "large_message",
"msg_id": msg_id
})
# HTTP endpoint
async def get_message(request):
    msg_id = request.match_info["msg_id"]
    data = _pending_messages.pop(msg_id, None)
    if data is None:
        raise web.HTTPNotFound()  # unknown or already-fetched msg_id
    return web.json_response(data)
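Because the endpoint pops the payload, each stashed message can be fetched exactly once; a second GET for the same msg_id fails. The store semantics in isolation (a sketch, independent of the web layer):

```python
from uuid import uuid4

_pending: dict = {}

def stash_large_message(data: dict) -> str:
    """Server side of step 3: park the payload, return a ticket id."""
    msg_id = str(uuid4())
    _pending[msg_id] = data
    return msg_id

def fetch_large_message(msg_id: str) -> dict:
    """Body of the HTTP endpoint: pop, so each payload is served once."""
    return _pending.pop(msg_id)
```

Single-use tickets keep the pending-message store from growing unboundedly across many clients.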
File Upload Handling¶
Uploads are received via HTTP POST and stored in files/:
async def upload_handler(request):
reader = await request.multipart()
    async for part in reader:
        if part.filename is None:
            continue  # skip non-file form fields
        # Security: sanitize filename, prevent path traversal
        safe_name = secure_filename(part.filename)
target_path = worker.files_dir / safe_name
with open(target_path, 'wb') as f:
while chunk := await part.read_chunk():
f.write(chunk)
return web.json_response({"success": True})
Security measures:
- Filename sanitization
- Path traversal prevention (no ..)
- Files constrained to the files/ directory
- Size limits at proxy layer (recommended)
Worker Configuration¶
WorkerConfig¶
@dataclass
class WorkerConfig:
uuid: str
name: str
# Paths
data_path: Path
env_path: Optional[Path]
python_path: Optional[Path]
# Network
host: str = "localhost"
port: int = 9382
ssl: bool = False
# Behavior
update_on_startup: Dict[str, bool] = field(default_factory=dict)
required_heartbeat: Optional[float] = None
# Dependencies
package_dependencies: Dict[str, PackageDependency] = field(default_factory=dict)
worker_dependencies: Dict[str, Any] = field(default_factory=dict)
# Type
workertype: str = "WSWorker"
Config File Location¶
Example Config¶
{
"uuid": "abc123",
"name": "my-workflow",
"data_path": "~/.funcnodes/workers/worker_my-workflow",
"env_path": "~/.funcnodes/workers/worker_my-workflow/.venv",
"host": "localhost",
"port": 9382,
"ssl": false,
"update_on_startup": {
"funcnodes": true,
"funcnodes-core": true
},
"package_dependencies": {
"funcnodes-numpy": {
"package": "funcnodes-numpy",
"version": ">=0.2.0"
}
},
"workertype": "WSWorker"
}
External Workers¶
Custom worker types can be created by subclassing:
from funcnodes_worker import ExternalWorker
class MyCustomWorker(ExternalWorker):
"""Custom worker with additional capabilities."""
worker_type = "my_custom_worker"
async def setup(self):
"""Called during worker initialization."""
await super().setup()
# Custom setup logic
async def handle_custom_command(self, **kwargs):
"""Custom RPC command."""
return {"custom": "result"}
Register via entry point:
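The exact entry-point group is an assumption here (the group name below is illustrative; check the FuncNodes packaging docs), but registration would typically go in pyproject.toml:

```toml
# pyproject.toml — group name "funcnodes.worker" is hypothetical
[project.entry-points."funcnodes.worker"]
my_custom_worker = "my_package.workers:MyCustomWorker"
```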
Process Isolation¶
Separate Thread¶
For I/O-bound or blocking operations:
@fn.NodeDecorator(
node_id="heavy_io",
separate_thread=True # Run in ThreadPoolExecutor
)
def heavy_io_operation(data: bytes) -> bytes:
# This won't block the event loop
return process_data(data)
Separate Process¶
For CPU-bound operations:
@fn.NodeDecorator(
node_id="cpu_intensive",
separate_process=True # Run in ProcessPoolExecutor
)
def cpu_intensive_task(data: list) -> list:
# This runs in a separate process
return [heavy_computation(x) for x in data]
Note: separate_process has limitations:
- Arguments must be picklable
- No access to node state during execution
- Higher overhead than threads
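The picklability constraint is easy to trip over: module-level functions and plain data pickle, but lambdas and closures do not, so they cannot cross the process boundary. A quick check (plain pickle; FuncNodes itself is not involved):

```python
import pickle

def module_level(x: int) -> int:
    """Defined at module scope: pickles by reference."""
    return x * x

def is_picklable(obj) -> bool:
    """True if obj survives pickle.dumps."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

# Plain data is fine; lambdas are not, so a separate_process node
# must receive its arguments as picklable values.
assert is_picklable([1, 2, 3])
assert not is_picklable(lambda x: x * x)
```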
Subprocess Monitor Integration¶
For long-running external processes:
# Worker config
{
"subprocess_monitor": {
"host": "localhost",
"port": 8765
}
}
# Usage in node
async def run_external_tool(cmd: str):
async with subprocess_monitor.spawn(cmd) as proc:
async for line in proc.stdout:
yield line # Stream output
Logging¶
Per-Worker Logs¶
Each worker has its own rotating log file:
Log Configuration¶
# Rotating file handler
handler = RotatingFileHandler(
log_path,
maxBytes=100_000, # 100KB per file
backupCount=5 # Keep 5 backups
)
# Format
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
Log Levels¶
| Level | Usage |
|---|---|
| DEBUG | Detailed execution flow |
| INFO | Startup, shutdown, major events |
| WARNING | Recoverable issues |
| ERROR | Failures, exceptions |
See Also¶
- Architecture Overview — System-level view
- Core Components — funcnodes_core internals
- Message Protocol — RPC reference
- Worker Configuration — User guide