Components¶
This page provides detailed information about each component in the ALPCRUN.CH architecture.
Queue Manager¶
The Queue Manager is the central coordinator of the ALPCRUN.CH system.
Internal Architecture¶
┌─────────────────────────────────────────────┐
│ Queue Manager Process │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Session Manager │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Session1│ │Session2│ │SessionN│ │ │
│ │ └───┬────┘ └───┬────┘ └───┬────┘ │ │
│ └──────┼───────────┼───────────┼───────┘ │
│ │ │ │ │
│ ┌──────▼───────────▼───────────▼────────┐ │
│ │ Queue Manager Core │ │
│ │ - Task queues (per session) │ │
│ │ - Result queues (per session) │ │
│ │ - Dead letter queues (per session) │ │
│ └──────┬───────────┬──────────┬─────────┘ │
│ │ │ │ │
│ ┌──────▼───────┐ ┌▼──────────▼────────┐ │
│ │ Scheduler │ │ Worker Registry │ │
│ └──────────────┘ └────────────────────┘ │
└─────────────────────────────────────────────┘
Session Management¶
QSession Structure:
type QSession struct {
    Session         *pb.Session
    TaskQueue       Queue
    ResultQueue     Queue
    DeadLetterQueue Queue
    Workers         []string // Bound worker IDs
    IdleTimer       time.Time
    Closed          bool
}
Session Lifecycle:

- Creation: CreateSession RPC
    - Generates a unique session ID (ULID)
    - Allocates queues based on configuration
    - Sets the initial idle timeout
    - Returns the session to the client
- Active: Client and workers streaming
    - Tasks enqueued from the client stream
    - Workers bound by the scheduler
    - Results dequeued by the client stream
    - Idle timer reset on activity
- Idle: No activity within the idle timeout (see the sketch below)
    - Workers unbound if configured
    - Session kept alive for reconnection
    - Can be updated via the UpdateSession RPC
- Closure: CloseSession RPC or cleanup
    - Workers unbound
    - Queues drained (if cleanup enabled)
    - Resources released
    - Session removed from the registry
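As an illustration of the Idle transition above, here is a minimal sketch of an idle check the Queue Manager could run on each tick. It uses only the QSession fields shown earlier; the QueueManager type, the sessions collection, the unbindWorker helper, and the idleTimeout value are assumptions for illustration, not the actual implementation.

// Illustrative only: walk all sessions and unbind workers from any session
// that has seen no activity within idleTimeout. The session itself is kept
// so a client can reconnect later.
func (qm *QueueManager) checkIdleSessions(idleTimeout time.Duration) {
    for _, s := range qm.sessions {
        if s.Closed {
            continue
        }
        if time.Since(s.IdleTimer) < idleTimeout {
            continue // still active
        }
        // Idle: release the bound workers but keep queues and session state.
        for _, workerID := range s.Workers {
            qm.unbindWorker(workerID, s) // hypothetical helper: sends UNBIND over the worker's update stream
        }
        s.Workers = s.Workers[:0]
    }
}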
Queue Implementations¶
ALPCRUN.CH supports multiple queue implementations:
Priority Queue (default):
// Heap-based priority queue
// Tasks sorted by priority (higher first)
// O(log n) enqueue/dequeue
Regular Queue:
// Channel-based FIFO queue
// Simple and fast for single priority
// O(1) operations
Lock-Free Queue:
// CAS-based lock-free queue
// Better for high contention
// Lower latency variance
Speed Queue (testing only):
// Unbuffered queue for benchmarking
// Not suitable for production
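To make the common shape of these implementations concrete, here is a minimal sketch of a queue interface plus a heap-based priority variant, using Go's container/heap and sync packages. The Queue interface, its method names, and the pb.Task Priority field are illustrative assumptions; the actual ALPCRUN.CH types may differ.

// Illustrative queue abstraction shared by the implementations above.
type Queue interface {
    Enqueue(t *pb.Task) error
    Dequeue() (*pb.Task, bool) // false when the queue is empty
    Len() int
}

// Heap-based priority queue sketch: higher Priority dequeues first.
type taskHeap []*pb.Task

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].Priority > h[j].Priority }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(*pb.Task)) }
func (h *taskHeap) Pop() any {
    old := *h
    t := old[len(old)-1]
    *h = old[:len(old)-1]
    return t
}

// PriorityQueue wraps the heap with a mutex for concurrent callers.
type PriorityQueue struct {
    mu sync.Mutex
    h  taskHeap
}

func (q *PriorityQueue) Enqueue(t *pb.Task) error {
    q.mu.Lock()
    defer q.mu.Unlock()
    heap.Push(&q.h, t) // O(log n)
    return nil
}

func (q *PriorityQueue) Dequeue() (*pb.Task, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.h) == 0 {
        return nil, false
    }
    return heap.Pop(&q.h).(*pb.Task), true // O(log n)
}

func (q *PriorityQueue) Len() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    return len(q.h)
}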
Scheduler Logic¶
The scheduler binds workers to sessions based on:
- Session Priority: Higher priority sessions scheduled first
- Task Availability: Sessions with pending tasks
- Worker Limits: Respect min/max workers per session
- Worker Capacity: Available (unbound) workers
- Preemption: Optionally preempt lower priority sessions
Scheduling Algorithm:
1. Sort sessions by priority (descending)
2. For each session with pending tasks:
a. Check if session needs more workers
b. Find available workers
c. Send BIND instruction to worker
d. Add worker to session's worker list
3. Check for idle sessions
a. Send UNBIND to idle workers
b. Remove worker from session's worker list
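A minimal sketch of one scheduler tick along the lines of the algorithm above. The session fields (Priority, MaxWorkers), the listSessions accessor, the registry call, and the sendBind helper are assumed names for illustration; the real scheduler also handles minimum workers, preemption, and errors.

// Illustrative scheduler tick: bind idle workers to the highest-priority
// sessions that still have pending tasks and room for more workers.
func (qm *QueueManager) scheduleTick() {
    sessions := qm.listSessions() // hypothetical accessor returning []*QSession
    // 1. Sort sessions by priority, highest first.
    sort.Slice(sessions, func(i, j int) bool {
        return sessions[i].Session.Priority > sessions[j].Session.Priority
    })
    for _, s := range sessions {
        // 2. Only sessions with pending tasks receive workers.
        if s.TaskQueue.Len() == 0 {
            continue
        }
        // Respect the per-session worker limit.
        for len(s.Workers) < int(s.Session.MaxWorkers) {
            w, ok := qm.registry.NextIdleWorker() // hypothetical registry call
            if !ok {
                return // no free workers left anywhere
            }
            qm.sendBind(w, s) // BIND instruction over the worker's update stream
            s.Workers = append(s.Workers, w.ID)
        }
    }
}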
Worker Registry¶
Tracks all connected workers:
type WorkerInfo struct {
    ID           string
    Node         string // Kubernetes node
    Pod          string // Pod name
    BoundSession string // Empty if idle
    LastSeen     time.Time
    Stream       WorkerUpdateStream
}
Worker States:
- Idle: Connected but not bound to a session
- Bound: Processing tasks for a session
- Disconnected: Stream closed or error
Configuration¶
Key configuration parameters:
appid: "my-app" # Application identifier
loglevel: "info" # Logging level
bufferSize: 1000 # Queue buffer size
queueType: "priority" # Queue implementation
schedulerInterval: "1s" # Scheduler tick rate
workerTimeout: "60s" # Worker disconnect timeout
Environment variables (ALPCRUNCH_ prefix):
ALPCRUNCH_APPID=my-app
ALPCRUNCH_LOGLEVEL=info
ALPCRUNCH_BUFFERSIZE=1000
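As a rough illustration of how a prefixed environment variable can override a value loaded from the config file (the precedence and the configValue helper are assumptions here, not the actual loader):

// Illustrative: prefer an ALPCRUNCH_-prefixed environment variable over the
// value read from the config file (uses os and strings).
func configValue(key, fromFile string) string {
    if v, ok := os.LookupEnv("ALPCRUNCH_" + strings.ToUpper(key)); ok {
        return v
    }
    return fromFile
}

// Example: configValue("loglevel", cfg.LogLevel) returns ALPCRUNCH_LOGLEVEL
// if it is set, otherwise the file value.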
Central Cache¶
The Central Cache provides centralized storage for session-shared data.
Internal Architecture¶
┌─────────────────────────────────────┐
│ Central Cache Process │
│ │
│ ┌───────────────────────────────┐ │
│ │ In-Memory Storage │ │
│ │ │ │
│ │ map[sessionID]SharedData │ │
│ │ - Payload (bytes) │ │
│ │ - UpdateLevel (version) │ │
│ │ - TTL (expiration) │ │
│ │ - Timestamp │ │
│ └───────────────────────────────┘ │
│ │
│ ┌───────────────────────────────┐ │
│ │ Cleanup Goroutine │ │
│ │ - Periodic expiration check │ │
│ │ - Remove stale entries │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
Data Structure¶
type SharedData struct {
    SessionId   string
    UpdateLevel uint32        // Version number
    Payload     []byte        // Actual data
    Ttl         time.Duration // Time to live
    Timestamp   time.Time
}
Operations¶
SetSharedData:
- Creates a new entry, or fails if one already exists
- Sets the initial update level
- Starts the TTL countdown

UpdateSharedData:
- Updates an existing entry
- Increments the update level
- Resets the TTL

GetSharedData:
- Retrieves data for a session and update level
- Returns an error if not found or on version mismatch
- Does not reset the TTL

DeleteSharedData:
- Removes the entry from storage
- Called by the client on session cleanup
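A minimal sketch of the Set/Update/Get semantics described above, built on the SharedData struct from the previous section. The cache type, its map layout, and the method signatures are illustrative assumptions (using sync, errors, and time), not the actual Central Cache API.

// Illustrative in-memory store keyed by session ID, protected by a mutex.
type cache struct {
    mu   sync.Mutex
    data map[string]*SharedData
}

// SetSharedData creates a new entry; it fails if one already exists.
func (c *cache) SetSharedData(d *SharedData) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if _, exists := c.data[d.SessionId]; exists {
        return errors.New("shared data already exists for session")
    }
    d.UpdateLevel = 1 // initial version (illustrative)
    d.Timestamp = time.Now()
    c.data[d.SessionId] = d
    return nil
}

// UpdateSharedData replaces the payload, bumps the version, and resets the TTL.
func (c *cache) UpdateSharedData(sessionID string, payload []byte) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    d, ok := c.data[sessionID]
    if !ok {
        return errors.New("no shared data for session")
    }
    d.Payload = payload
    d.UpdateLevel++
    d.Timestamp = time.Now() // TTL countdown restarts from here
    return nil
}

// GetSharedData returns the payload only if the stored version matches.
func (c *cache) GetSharedData(sessionID string, updateLevel uint32) ([]byte, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    d, ok := c.data[sessionID]
    if !ok || d.UpdateLevel != updateLevel {
        return nil, errors.New("not found or version mismatch")
    }
    return d.Payload, nil // does not reset the TTL
}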
TTL Management¶
Cleanup Process:
Every cleanupInterval (default 1s):
1. Iterate through all entries
2. Check if TTL expired
3. Remove expired entries
4. Log cleanup statistics
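A short sketch of this cleanup loop, reusing the illustrative cache type from the sketch above; the log call is a stand-in for whatever statistics the cleanup actually records.

// Illustrative cleanup goroutine: every cleanupInterval, drop entries whose
// TTL has elapsed since their last update.
func (c *cache) cleanupLoop(cleanupInterval time.Duration) {
    ticker := time.NewTicker(cleanupInterval)
    defer ticker.Stop()
    for range ticker.C {
        removed := 0
        c.mu.Lock()
        for id, d := range c.data {
            if time.Since(d.Timestamp) > d.Ttl {
                delete(c.data, id)
                removed++
            }
        }
        c.mu.Unlock()
        log.Printf("cache cleanup: removed %d expired entries", removed)
    }
}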
Configuration:
maxTTL: "600s" # Maximum allowed TTL
cleanupInterval: "1s" # Cleanup frequency
Concurrency¶
Thread-safe implementation:
- Mutex-protected map access
- Atomic update level increments
- Safe for concurrent clients
Node Cache¶
The Node Cache is a local pull-through cache that proxies requests to the Central Cache.
Architecture¶
┌──────────────────────────────────────┐
│ Node Cache Process │
│ │
│ ┌────────────────────────────────┐ │
│ │ Local Cache (LRU) │ │
│ │ - Limited size │ │
│ │ - Update level tracking │ │
│ │ - TTL inheritance │ │
│ └────────┬───────────────────────┘ │
│ │ │
│ │ Cache Miss │
│ ▼ │
│ ┌────────────────────────────────┐ │
│ │ Central Cache Proxy │ │
│ │ - Forward on miss │ │
│ │ - Cache response locally │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
Caching Strategy¶
Get Request Flow:
1. Worker requests shared data from node cache
2. Node cache checks local cache:
a. Hit: Return cached data immediately
b. Miss: Forward request to central cache
3. Central cache returns data
4. Node cache stores locally and returns to worker
Update Level Handling:
- The cache stores data per (sessionID, updateLevel)
- If a worker requests a higher update level, the request is always forwarded to the central cache
- Old versions can coexist in the cache

Eviction Policy:
- LRU (Least Recently Used)
- Size-based limits
- TTL-based expiration
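A minimal sketch of the pull-through Get path described above. The nodeCache type, the local LRU store, the key layout, and the central-cache client call are assumptions for illustration.

// Illustrative cache key: one entry per (sessionID, updateLevel) pair.
type cacheKey struct {
    sessionID   string
    updateLevel uint32
}

// Get serves from the local cache when possible and falls back to the
// Central Cache on a miss, storing the response locally for later hits.
func (n *nodeCache) Get(ctx context.Context, sessionID string, level uint32) ([]byte, error) {
    key := cacheKey{sessionID, level}
    if payload, ok := n.local.Get(key); ok {
        return payload, nil // local hit: no round trip to the central cache
    }
    // Miss (or a newer update level than anything cached): ask the central cache.
    payload, err := n.central.GetSharedData(ctx, sessionID, level) // hypothetical client call
    if err != nil {
        return nil, err
    }
    n.local.Add(key, payload) // cache locally; LRU/TTL eviction applies
    return payload, nil
}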
Configuration¶
cacheSize: "1GB" # Maximum cache size
centralCacheAddr: "central-cache:2337"
tls: true
caCert: "/certs/ca.crt"
Deployment Pattern¶
Typical Kubernetes deployment:
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: node-cache
spec:
  selector:
    matchLabels:
      app: node-cache
  template:
    metadata:
      labels:
        app: node-cache
    spec:
      containers:
      - name: node-cache
        image: alpcrun/node-cache:latest
        ports:
        - containerPort: 3337
        env:
        - name: METALCORE_CENTRAL_CACHE_ADDR
          value: "central-cache:2337"
Workers connect to localhost:3337 or via a Kubernetes service.
Workers¶
Workers are user-defined services that process tasks.
Worker Lifecycle¶
┌──────────────────────────────────────────────┐
│ Worker Process Lifecycle │
│ │
│ 1. Connect to Queue Manager │
│ - WorkerUpdate stream │
│ - Receive worker ID │
│ │
│ 2. Idle State │
│ - Send periodic state updates │
│ - Wait for BIND instruction │
│ │
│ 3. Bind to Session │
│ - Receive BIND from queue manager │
│ - Open WorkerStream to session │
│ │
│ 4. Process Tasks │
│ - Receive task batches │
│ - Fetch shared data if needed │
│ - Execute computation │
│ - Send result batches │
│ │
│ 5. Unbind from Session │
│ - Session closes or idle timeout │
│ - Receive UNBIND instruction │
│ - Return to idle state │
│ │
│ 6. Shutdown (optional) │
│ - Close WorkerUpdate stream │
│ - Graceful termination │
└──────────────────────────────────────────────┘
Worker Implementation Pattern¶
func main() {
    // queueClient and ctx are assumed to be set up elsewhere (a gRPC
    // connection carrying the metadata shown in the next section).
    // Connect and read the assigned worker ID from the response headers.
    updateStream, _ := queueClient.WorkerUpdate(ctx)
    mdFromServer, _ := updateStream.Header()
    workerID := mdFromServer.Get("workerid")[0]
    _ = workerID // typically logged or attached to metrics

    // Idle loop: wait for instructions from the queue manager.
    for {
        inst, err := updateStream.Recv()
        if err != nil {
            return // update stream closed
        }
        if inst.Type == pb.Instruction_BIND {
            // Process the session until it closes or we are unbound.
            processSession(inst.SessionId)
        }
    }
}

func processSession(sessionID string) {
    // Open a worker stream scoped to the session via metadata.
    stream, _ := queueClient.WorkerStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))

    // Process batches until the session closes.
    for {
        batch, err := stream.Recv()
        if err != nil {
            break // session closed
        }
        // Process tasks (user-defined computation).
        results := processBatch(batch)
        // Send results back on the same stream.
        stream.Send(results)
    }
}
Worker Metadata¶
Required gRPC metadata headers:
md := metadata.New(map[string]string{
    "api_key": os.Getenv("METALCORE_API_KEY"),
    "node":    os.Getenv("METALCORE_WORKER_NODE"),
    "pod":     os.Getenv("METALCORE_WORKER_POD"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
Worker Scaling¶
Workers are fully stateless and can be scaled:
Kubernetes Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 100   # Scale as needed
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      containers:
      - name: worker
        image: myorg/my-worker:latest
        env:
        - name: METALCORE_WORKER_NODE
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: METALCORE_WORKER_POD
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
Auto-scaling:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: worker
  minReplicas: 10
  maxReplicas: 1000
  metrics:
  - type: External
    external:
      metric:
        name: alpcrun_queue_depth
      target:
        type: AverageValue
        averageValue: "100"
Communication Patterns¶
gRPC Streaming¶
All services use bidirectional streaming for efficiency:
Client Stream:
rpc ClientStream(stream Batch) returns (stream Batch)
Benefits:
- Single connection for all tasks/results
- Natural flow control via stream backpressure
- Reduced connection overhead
- Lower latency
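A rough sketch of how a client might drive the ClientStream RPC shown above: tasks are sent and results received concurrently on one bidirectional stream. queueClient and ctx are assumed to be set up already, and the pb.Batch field names and handleResults helper are illustrative assumptions.

// Illustrative client-side use of the bidirectional ClientStream RPC.
func runSession(sessionID string, tasks []*pb.Task) error {
    stream, err := queueClient.ClientStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
    if err != nil {
        return err
    }

    // Send task batches without waiting for results; flow control is
    // handled by stream backpressure.
    go func() {
        stream.Send(&pb.Batch{Tasks: tasks}) // hypothetical Batch field
        stream.CloseSend()
    }()

    // Receive result batches until the server closes the stream.
    for {
        batch, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        handleResults(batch) // user-defined result handling
    }
}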
Worker Stream:
rpc WorkerStream(stream Batch) returns (stream Batch)
Same benefits as the client stream, plus:
- Session affinity via metadata
- Automatic task distribution
- Result collection
Metadata Propagation¶
gRPC metadata is used for:
Authentication:
api_key: secret-key
Session Context:
sessionid: 01HZXY...
Worker Identity:
workerid: worker-123
node: node-1
pod: worker-pod-abc
Next Steps¶
- Data Flow: Detailed data flow diagrams
- API Reference: Complete API documentation
- Configuration Reference: All configuration options