Components¶
This page provides detailed information about each component in the ALPCRUN.CH architecture.
Queue Manager¶
The Queue Manager is the central coordinator of the ALPCRUN.CH system.
Internal Architecture¶
┌─────────────────────────────────────────────┐
│ Queue Manager Process │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Session Manager │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Session1│ │Session2│ │SessionN│ │ │
│ │ └───┬────┘ └───┬────┘ └───┬────┘ │ │
│ └──────┼───────────┼───────────┼───────┘ │
│ │ │ │ │
│ ┌──────▼───────────▼───────────▼────────┐ │
│ │ Queue Manager Core │ │
│ │ - Task queues (per session) │ │
│ │ - Result queues (per session) │ │
│ │ - Dead letter queues (per session) │ │
│ └──────┬──────────────────────┬─────────┘ │
│ │ │ │
│ ┌──────▼───────┐ ┌───────────▼────────┐ │
│ │ Scheduler │ │ Worker Registry │ │
│ └──────────────┘ └────────────────────┘ │
└─────────────────────────────────────────────┘
Session Management¶
Session Lifecycle:
1. Creation: CreateSession RPC
   - Generates unique session ID (ULID)
   - Allocates queues based on configuration
   - Sets initial idle timeout
   - Returns session to client
2. Active: Client and workers streaming
   - Tasks enqueued from client stream
   - Workers bound by scheduler
   - Results dequeued by client stream
   - Idle timer reset on activity
3. Idle: No activity within idle timeout
   - Workers unbound if configured
   - Session kept alive for reconnection
   - Can be updated via UpdateSession RPC
4. Closure: CloseSession RPC or cleanup
   - Workers unbound
   - Queues drained (if cleanup enabled)
   - Resources released
   - Session removed from registry
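A minimal Go sketch of this lifecycle from the client side, assuming generated gRPC stubs (queueClient, pb); the request and field names here are illustrative assumptions, not the actual API:

// Sketch only: request/response shapes are assumptions, not the actual API.
sess, err := queueClient.CreateSession(ctx, &pb.CreateSessionRequest{})
if err != nil {
    log.Fatalf("create session: %v", err)
}

// Active: open the bidirectional client stream, scoped to the session via metadata.
streamCtx := metadata.AppendToOutgoingContext(ctx, "sessionid", sess.SessionId)
stream, err := queueClient.ClientStream(streamCtx)
if err != nil {
    log.Fatalf("open client stream: %v", err)
}
// ... send task batches, receive result batches; activity resets the idle timer ...
_ = stream

// Closure: release queues and unbind workers explicitly.
_, _ = queueClient.CloseSession(ctx, &pb.CloseSessionRequest{SessionId: sess.SessionId})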
Queue Implementations¶
ALPCRUN.CH supports multiple queue implementations:
Priority Queue (default):
// Heap-based priority queue
// Tasks sorted by priority (higher first)
// O(log n) enqueue/dequeue
Regular Queue:
// Channel-based FIFO queue
// Simple and fast for single priority
// O(1) operations
Lock-Free Queue:
// CAS-based lock-free queue
// Better for high contention
// Lower latency variance
Speed Queue (testing only):
// Unbuffered queue for benchmarking
// Not suitable for production
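All four implementations can be thought of as satisfying the same small queue contract; the following Go interface is a hedged sketch of that contract (the interface name and method set are assumptions, not the actual code):

// Assumed contract shared by the queue implementations; not the actual interface.
type TaskQueue interface {
    // Enqueue adds a task; for the priority queue, ordering is by task priority.
    Enqueue(task *Task) error
    // Dequeue blocks until a task is available or the queue is closed.
    Dequeue() (*Task, bool)
    // Len reports the number of pending tasks (useful for queue-depth metrics).
    Len() int
    // Close drains or releases the queue during session cleanup.
    Close()
}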
Scheduler Logic¶
The scheduler binds workers to sessions based on:
- Session Priority: Higher priority sessions scheduled first
- Task Availability: Sessions with pending tasks
- Worker Limits: Respect min/max workers per session
- Worker Capacity: Available (unbound) workers
- Preemption: Optionally preempt lower priority sessions
Scheduling Algorithm:
1. Sort sessions by priority (descending)
2. For each session with pending tasks:
a. Check if session needs more workers
b. Find available workers
c. Send BIND instruction to worker
d. Add worker to session's worker list
3. Check for idle sessions
a. Send UNBIND to idle workers
b. Remove worker from session's worker list
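A simplified Go sketch of one scheduler tick following these rules; the type, field, and method names (registry, sessions, sendBind, and so on) are placeholders, not the actual implementation:

// One scheduler tick (simplified sketch; names are assumptions).
func (s *Scheduler) tick() {
    sessions := s.sessions.Snapshot()
    // 1. Highest priority first.
    sort.Slice(sessions, func(i, j int) bool { return sessions[i].Priority > sessions[j].Priority })

    for _, sess := range sessions {
        // 2. Only sessions with pending work and headroom below maxWorkers get workers.
        for sess.PendingTasks() > 0 && sess.WorkerCount() < sess.MaxWorkers {
            w, ok := s.registry.NextIdleWorker()
            if !ok {
                break // no unbound workers left
            }
            s.sendBind(w, sess.ID) // BIND instruction over the worker's update stream
            sess.AddWorker(w)
        }
    }

    // 3. Idle sessions give their workers back.
    for _, sess := range sessions {
        if sess.Idle() {
            for _, w := range sess.Workers() {
                s.sendUnbind(w, sess.ID)
                sess.RemoveWorker(w)
            }
        }
    }
}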
Worker Registry¶
Tracks all connected workers.
Worker States:
- Idle: Connected but not bound to a session
- Bound: Processing tasks for a session
- Disconnected: Stream closed or error
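A minimal sketch of how the registry might track these states in Go; the type names are illustrative, not part of the public API:

// Illustrative only: the actual registry types are internal.
type WorkerState int

const (
    WorkerIdle         WorkerState = iota // connected, waiting for BIND
    WorkerBound                           // processing tasks for a session
    WorkerDisconnected                    // stream closed or errored
)

type WorkerRegistry struct {
    mu      sync.Mutex
    workers map[string]WorkerState // keyed by worker ID
}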
Configuration¶
Key configuration parameters:
appid: "my-app" # Application identifier
loglevel: "info" # Logging level
bufferSize: 1000 # Queue buffer size
queueType: "priority" # Queue implementation
schedulerInterval: "1s" # Scheduler tick rate
workerTimeout: "60s" # Worker disconnect timeout
Environment variables (ALPCRUNCH_ prefix):
ALPCRUNCH_APPID=my-app
ALPCRUNCH_LOGLEVEL=info
ALPCRUNCH_BUFFERSIZE=1000
Central Cache¶
The Central Cache provides centralized storage for session-shared data. It is accessed by the local node caches.
Data Structure¶
type SharedData struct {
    SessionId   string        // Owning session
    UpdateLevel uint32        // Version number
    Payload     []byte        // Actual data
    Ttl         time.Duration // Time to live
    Timestamp   time.Time     // Set/update time, used for TTL expiry
}
Operations¶
SetSharedData:
- Creates a new entry or fails if one already exists
- Sets the initial update level
- Starts the TTL countdown

UpdateSharedData:
- Updates an existing entry
- Increments the update level
- Resets the TTL

GetSharedData:
- Retrieves data for a session and update level
- Returns an error if not found or on version mismatch
- Does not reset the TTL

DeleteSharedData:
- Removes the entry from storage
- Called by the client on session cleanup
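A hedged Go sketch of how a client might drive these operations through generated gRPC stubs; the request message shapes are assumptions based on the SharedData structure above, not the actual API:

// Sketch only: request/response shapes are assumptions.
if _, err := cacheClient.SetSharedData(ctx, &pb.SetSharedDataRequest{
    SessionId: sessionID,
    Payload:   modelBytes,
    Ttl:       durationpb.New(10 * time.Minute),
}); err != nil {
    // Fails if an entry already exists; UpdateSharedData is the in-place path.
    log.Printf("set shared data: %v", err)
}

// UpdateSharedData increments the update level and resets the TTL.
_, _ = cacheClient.UpdateSharedData(ctx, &pb.UpdateSharedDataRequest{
    SessionId: sessionID,
    Payload:   newModelBytes,
})

// GetSharedData is read-only: a version mismatch is an error and the TTL is untouched.
resp, err := cacheClient.GetSharedData(ctx, &pb.GetSharedDataRequest{
    SessionId:   sessionID,
    UpdateLevel: 2,
})

// DeleteSharedData is called by the client during session cleanup.
_, _ = cacheClient.DeleteSharedData(ctx, &pb.DeleteSharedDataRequest{SessionId: sessionID})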
TTL Management¶
Cleanup Process:
Every cleanupInterval (default 1s):
1. Iterate through all entries
2. Check if TTL expired
3. Remove expired entries
4. Log cleanup statistics
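A compact Go sketch of such a cleanup loop; the struct and field names are placeholders, not the actual implementation:

// Illustrative cleanup loop; names are assumptions.
func (c *CentralCache) runCleanup(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval) // cleanupInterval, default 1s
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.mu.Lock()
            removed := 0
            for key, entry := range c.entries {
                if time.Since(entry.Timestamp) > entry.Ttl {
                    delete(c.entries, key)
                    removed++
                }
            }
            c.mu.Unlock()
            log.Printf("cache cleanup: removed %d expired entries", removed)
        }
    }
}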
Node Cache¶
The Node Cache is a local pull-through cache that transparently proxies worker requests to the Central Cache.
Architecture¶
┌──────────────────────────────────────┐
│ Node Cache Process │
│ │
│ ┌────────────────────────────────┐ │
│ │ Local Cache (LRU) │ │
│ │ - Limited size │ │
│ │ - Update level tracking │ │
│ │ - TTL inheritance │ │
│ └────────┬───────────────────────┘ │
│ │ │
│ │ Cache Miss │
│ ▼ │
│ ┌────────────────────────────────┐ │
│ │ Central Cache Proxy │ │
│ │ - Forward on miss │ │
│ │ - Cache response locally │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
Caching Strategy¶
Get Request Flow:
1. Worker requests shared data from local node cache
2. Node cache checks local cache:
a. Hit: Return cached data immediately
b. Miss: Forward request to central cache
3. Central cache returns data
4. Node cache stores locally and returns to worker
Update Level Handling:
- Cache stores data per (sessionID, updateLevel)
- If worker requests higher update level, always forward to central cache
- Old versions can coexist in cache
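A hedged Go sketch of this pull-through lookup, keyed by (sessionID, updateLevel); the type names and LRU calls are illustrative, assuming the node cache proxies the central cache's GetSharedData call on a miss:

// Sketch of the pull-through lookup; names are assumptions.
type cacheKey struct {
    SessionID   string
    UpdateLevel uint32
}

func (n *NodeCache) Get(ctx context.Context, key cacheKey) ([]byte, error) {
    // 1. Local hit: return immediately (older update levels may still be cached).
    if data, ok := n.lru.Get(key); ok {
        return data.([]byte), nil
    }
    // 2. Miss (including requests for a newer update level): forward to the central cache.
    resp, err := n.centralClient.GetSharedData(ctx, &pb.GetSharedDataRequest{
        SessionId:   key.SessionID,
        UpdateLevel: key.UpdateLevel,
    })
    if err != nil {
        return nil, err
    }
    // 3. Store locally (TTL inherited from the central entry) and return to the worker.
    n.lru.Add(key, resp.Payload)
    return resp.Payload, nil
}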
Eviction Policy:
- LRU (Least Recently Used)
- Size-based limits
- TTL-based expiration
Workers simply connect to localhost:3337, or to the Kubernetes service nodecache:3337, which maps to the local node cache running as a daemon on each node.
Workers¶
Workers are the user-written service instances that process tasks. They can be implemented in any language that supports gRPC.
Worker Lifecycle¶
┌─────────────────────────────────────────────┐
│ Worker Process Lifecycle │
│ │
│ 1. Connect to Queue Manager │
│ - WorkerUpdate stream │
│ - Receive worker ID │
│ │
│ 2. Idle State │
│ - Send periodic state updates │
│ - Wait for BIND instruction │
│ │
│ 3. Bind to Session │
│ - Receive BIND from queue manager │
│ - Open WorkerStream to session │
│ │
│ 4. Process Tasks │
│ - Receive task batches │
│ - Fetch shared data if needed │
│ - Execute computation │
│ - Send result batches │
│ │
│ 5. Unbind from Session │
│ - Session closes or idle timeout │
│ - Receive UNBIND instruction │
│ - Return to idle state │
│ │
│ 6. Shutdown (optional) │
│ - Close WorkerUpdate stream │
│ - Graceful termination │
└─────────────────────────────────────────────┘
Worker Implementation Pattern¶
Go example:
func main() {
	ctx := context.Background()

	// Connect and read the worker ID from the response headers.
	updateStream, err := queueClient.WorkerUpdate(ctx)
	if err != nil {
		log.Fatalf("worker update stream: %v", err)
	}
	mdFromServer, err := updateStream.Header()
	if err != nil {
		log.Fatalf("read headers: %v", err)
	}
	workerID := mdFromServer.Get("workerid")[0]
	log.Printf("registered as worker %s", workerID)

	// Idle loop: wait for instructions from the queue manager.
	for {
		inst, err := updateStream.Recv()
		if err != nil {
			return // stream closed, shut down
		}
		if inst.Type == pb.Instruction_BIND {
			// Bound to a session: process it until unbound or closed.
			processSession(ctx, inst.SessionId)
		}
	}
}

func processSession(ctx context.Context, sessionID string) {
	// Open the worker stream, scoped to the session via metadata.
	stream, err := queueClient.WorkerStream(
		metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
	if err != nil {
		log.Printf("worker stream: %v", err)
		return
	}

	// Process batches until the session is closed or the worker is unbound.
	for {
		batch, err := stream.Recv()
		if err != nil {
			return // session closed
		}
		// Process the tasks and send the result batch back.
		results := processBatch(batch)
		if err := stream.Send(results); err != nil {
			return
		}
	}
}
Worker Metadata¶
Required gRPC metadata headers:
md := metadata.New(map[string]string{
"api_key": os.Getenv("METALCORE_API_KEY"),
"node": os.Getenv("METALCORE_WORKER_NODE"),
"pod": os.Getenv("METALCORE_WORKER_POD"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
Worker Scaling¶
Workers are fully stateless and can be scaled automatically based on node utilization and queue depth:
Auto-scaling (Kubernetes HorizontalPodAutoscaler spec excerpt):
spec:
minReplicas: 10
maxReplicas: 1000
metrics:
- type: External
external:
metric:
name: alpcrun_queue_depth
target:
type: AverageValue
averageValue: "100"
Communication Patterns¶
gRPC Streaming¶
All services use bidirectional streaming for efficiency:
Client Stream:
// sends task batches and receives result batches
rpc ClientStream(stream Batch) returns (stream Batch)
Benefits:
- Single connection for all tasks/results
- Natural flow control via stream backpressure
- Reduced connection overhead
- Lower latency
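As a hedged sketch, a client might drive this stream like the following Go fragment; the Batch contents and helper names (taskBatches, handleResults) are assumptions, not the actual message definitions:

// Sketch only: Batch contents are assumptions.
streamCtx := metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)
stream, err := queueClient.ClientStream(streamCtx)
if err != nil {
    log.Fatalf("open client stream: %v", err)
}

// Send task batches; backpressure from the stream provides flow control.
go func() {
    for _, batch := range taskBatches {
        if err := stream.Send(batch); err != nil {
            return
        }
    }
    _ = stream.CloseSend()
}()

// Receive result batches on the same connection.
for {
    results, err := stream.Recv()
    if err != nil {
        break // io.EOF when all results have been delivered
    }
    handleResults(results)
}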
Worker Stream:
// receives task batches to process and sends back result batches
rpc WorkerStream(stream Batch) returns (stream Batch)
Same benefits as the client stream, plus:
- Session affinity via metadata
- Automatic task distribution
- Result collection
Metadata Propagation¶
gRPC metadata used for:
Authentication:
api_key: secret-key
Session Context:
sessionid: 01HZXY...
Worker Identity:
workerid: worker-123
node: node-1
pod: worker-pod-abc
Next Steps¶
- Data Flow: Detailed data flow diagrams
- API Reference: Complete API documentation
- Configuration Reference: All configuration options