Components

This page provides detailed information about each component in the ALPCRUN.CH architecture.

Queue Manager

The Queue Manager is the central coordinator of the ALPCRUN.CH system.

Internal Architecture

┌─────────────────────────────────────────────┐
│          Queue Manager Process              │
│                                             │
│  ┌──────────────────────────────────────┐   │
│  │     Session Manager                  │   │
│  │  ┌────────┐  ┌────────┐  ┌────────┐  │   │
│  │  │Session1│  │Session2│  │SessionN│  │   │
│  │  └───┬────┘  └───┬────┘  └───┬────┘  │   │
│  └──────┼───────────┼───────────┼───────┘   │
│         │           │           │           │
│  ┌──────▼───────────▼───────────▼────────┐  │
│  │         Queue Manager Core            │  │
│  │  - Task queues (per session)          │  │
│  │  - Result queues (per session)        │  │
│  │  - Dead letter queues (per session)   │  │
│  └──────┬───────────┬──────────┬─────────┘  │
│         │           │          │            │
│  ┌──────▼───────┐  ┌▼──────────▼────────┐   │
│  │  Scheduler   │  │  Worker Registry   │   │
│  └──────────────┘  └────────────────────┘   │
└─────────────────────────────────────────────┘

Session Management

QSession Structure:

type QSession struct {
    Session         *pb.Session
    TaskQueue       Queue
    ResultQueue     Queue
    DeadLetterQueue Queue
    Workers         []string // Bound worker IDs
    IdleTimer       time.Time
    Closed          bool
}

Session Lifecycle:

  1. Creation: CreateSession RPC
     - Generates unique session ID (ULID)
     - Allocates queues based on configuration
     - Sets initial idle timeout
     - Returns session to client

  2. Active: Client and workers streaming
     - Tasks enqueued from client stream
     - Workers bound by scheduler
     - Results dequeued by client stream
     - Idle timer reset on activity

  3. Idle: No activity within idle timeout
     - Workers unbound if configured
     - Session kept alive for reconnection
     - Can be updated via UpdateSession RPC

  4. Closure: CloseSession RPC or cleanup
     - Workers unbound
     - Queues drained (if cleanup enabled)
     - Resources released
     - Session removed from registry
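
From the client's point of view, the lifecycle above reduces to a few RPC calls. The sketch below is illustrative only: the request message names, the session.Id field, and the runClientStream helper are assumptions rather than the exact generated API.

// Illustrative lifecycle sketch (message and helper names are assumptions)

// 1. Creation: the queue manager allocates queues and returns the session
session, err := queueClient.CreateSession(ctx, &pb.CreateSessionRequest{})
if err != nil {
    log.Fatalf("CreateSession failed: %v", err)
}

// 2. Active: stream tasks and results with the session ID in the metadata
streamCtx := metadata.AppendToOutgoingContext(ctx, "sessionid", session.Id)
runClientStream(streamCtx) // see ClientStream under Communication Patterns

// 3. Idle handling is automatic; 4. Closure releases queues and unbinds workers
if _, err := queueClient.CloseSession(ctx, &pb.CloseSessionRequest{Id: session.Id}); err != nil {
    log.Printf("CloseSession failed: %v", err)
}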

Queue Implementations

ALPCRUN.CH supports multiple queue implementations:

Priority Queue (default):

// Heap-based priority queue
// Tasks sorted by priority (higher first)
// O(log n) enqueue/dequeue
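
A minimal sketch of this heap-based approach using Go's container/heap; the taskItem type and its fields are illustrative, not the actual internal types.

// Illustrative heap-backed priority queue (not the actual internal type)
type taskItem struct {
    Priority int32
    Payload  []byte
}

// taskHeap implements heap.Interface; higher priority is dequeued first
type taskHeap []taskItem

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].Priority > h[j].Priority }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(taskItem)) }
func (h *taskHeap) Pop() any {
    old := *h
    item := old[len(old)-1]
    *h = old[:len(old)-1]
    return item
}

// Enqueue and Dequeue are both O(log n)
func Enqueue(h *taskHeap, t taskItem) { heap.Push(h, t) }
func Dequeue(h *taskHeap) taskItem    { return heap.Pop(h).(taskItem) }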

Regular Queue:

// Channel-based FIFO queue
// Simple and fast for single priority
// O(1) operations

Lock-Free Queue:

// CAS-based lock-free queue
// Better for high contention
// Lower latency variance

Speed Queue (testing only):

// Unbuffered queue for benchmarking
// Not suitable for production

Scheduler Logic

The scheduler binds workers to sessions based on:

  1. Session Priority: Higher priority sessions scheduled first
  2. Task Availability: Sessions with pending tasks
  3. Worker Limits: Respect min/max workers per session
  4. Worker Capacity: Available (unbound) workers
  5. Preemption: Optionally preempt lower priority sessions

Scheduling Algorithm:

1. Sort sessions by priority (descending)
2. For each session with pending tasks:
   a. Check if session needs more workers
   b. Find available workers
   c. Send BIND instruction to worker
   d. Add worker to session's worker list
3. Check for idle sessions
   a. Send UNBIND to idle workers
   b. Remove worker from session's worker list
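
A simplified Go sketch of one scheduler tick following these steps; the Scheduler, session, and registry types and their methods are illustrative assumptions, not the actual implementation.

// Illustrative scheduler tick (types and helpers are assumptions)
func (s *Scheduler) tick() {
    sessions := s.sessions.Snapshot()

    // 1. Higher-priority sessions are considered first
    sort.Slice(sessions, func(i, j int) bool {
        return sessions[i].Priority() > sessions[j].Priority()
    })

    // 2. Bind idle workers to sessions with pending tasks,
    //    respecting each session's worker limits
    for _, sess := range sessions {
        for sess.PendingTasks() > 0 && len(sess.Workers) < sess.MaxWorkers() {
            w, ok := s.registry.NextIdleWorker()
            if !ok {
                break // no unbound workers left
            }
            s.sendInstruction(w, BIND, sess.ID())
            sess.Workers = append(sess.Workers, w.ID)
        }
    }

    // 3. Unbind workers from sessions that have gone idle
    for _, sess := range sessions {
        if !sess.Idle() {
            continue
        }
        for _, id := range sess.Workers {
            s.sendInstruction(s.registry.Get(id), UNBIND, sess.ID())
        }
        sess.Workers = nil
    }
}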

Worker Registry

Tracks all connected workers:

type WorkerInfo struct {
    ID           string
    Node         string  // Kubernetes node
    Pod          string  // Pod name
    BoundSession string  // Empty if idle
    LastSeen     time.Time
    Stream       WorkerUpdateStream
}

Worker States:

  - Idle: Connected but not bound to session
  - Bound: Processing tasks for a session
  - Disconnected: Stream closed or error
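
A short sketch of how the registry might use LastSeen to detect the Disconnected state, based on the workerTimeout setting described below; the Registry type and its fields are illustrative.

// Illustrative liveness check: workers that have not reported within
// workerTimeout are unbound and removed from the registry
func (r *Registry) markStaleWorkers(workerTimeout time.Duration) {
    r.mu.Lock()
    defer r.mu.Unlock()

    for id, w := range r.workers {
        if time.Since(w.LastSeen) > workerTimeout {
            if w.BoundSession != "" {
                r.unbind(w.BoundSession, id)
            }
            delete(r.workers, id)
        }
    }
}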

Configuration

Key configuration parameters:

appid: "my-app"              # Application identifier
loglevel: "info"             # Logging level
bufferSize: 1000             # Queue buffer size
queueType: "priority"        # Queue implementation
schedulerInterval: "1s"      # Scheduler tick rate
workerTimeout: "60s"         # Worker disconnect timeout

Environment variables (ALPCRUNCH_ prefix):

ALPCRUNCH_APPID=my-app
ALPCRUNCH_LOGLEVEL=info
ALPCRUNCH_BUFFERSIZE=1000
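
A minimal sketch of the override order this implies (file value first, then an ALPCRUNCH_-prefixed variable); the lookupSetting helper is an assumption, not the project's actual configuration loader.

// Illustrative helper: an ALPCRUNCH_-prefixed environment variable
// overrides the value read from the configuration file
func lookupSetting(fileValue, key string) string {
    if v, ok := os.LookupEnv("ALPCRUNCH_" + strings.ToUpper(key)); ok {
        return v
    }
    return fileValue
}

// Example: ALPCRUNCH_BUFFERSIZE overrides bufferSize from the file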

Central Cache

The Central Cache provides centralized storage for session-shared data.

Internal Architecture

┌─────────────────────────────────────┐
│       Central Cache Process         │
│                                     │
│  ┌───────────────────────────────┐  │
│  │   In-Memory Storage           │  │
│  │                               │  │
│  │   map[sessionID]SharedData    │  │
│  │   - Payload (bytes)           │  │
│  │   - UpdateLevel (version)     │  │
│  │   - TTL (expiration)          │  │
│  │   - Timestamp                 │  │
│  └───────────────────────────────┘  │
│                                     │
│  ┌───────────────────────────────┐  │
│  │   Cleanup Goroutine           │  │
│  │   - Periodic expiration check │  │
│  │   - Remove stale entries      │  │
│  └───────────────────────────────┘  │
└─────────────────────────────────────┘

Data Structure

type SharedData struct {
    SessionId   string
    UpdateLevel uint32        // Version number
    Payload     []byte        // Actual data
    Ttl         time.Duration // Time to live
    Timestamp   time.Time
}

Operations

SetSharedData:

  - Creates new entry or fails if exists
  - Sets initial update level
  - Starts TTL countdown

UpdateSharedData:

  - Updates existing entry
  - Increments update level
  - Resets TTL

GetSharedData:

  - Retrieves data for session and update level
  - Returns error if not found or version mismatch
  - Does not reset TTL

DeleteSharedData:

  - Removes entry from storage
  - Called by client on session cleanup
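
An illustrative sketch of these semantics over a mutex-protected map; the store type stands in for the real implementation, which also enforces maxTTL and handles the gRPC layer.

// Illustrative store with the operation semantics described above
type store struct {
    mu      sync.Mutex
    entries map[string]*SharedData
}

func (s *store) Set(d *SharedData) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, exists := s.entries[d.SessionId]; exists {
        return errors.New("shared data already exists for session")
    }
    d.Timestamp = time.Now() // start the TTL countdown
    s.entries[d.SessionId] = d
    return nil
}

func (s *store) Update(sessionID string, payload []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    d, ok := s.entries[sessionID]
    if !ok {
        return errors.New("not found")
    }
    d.UpdateLevel++          // new version
    d.Payload = payload
    d.Timestamp = time.Now() // reset TTL
    return nil
}

func (s *store) Get(sessionID string, level uint32) (*SharedData, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    d, ok := s.entries[sessionID]
    if !ok || d.UpdateLevel != level {
        return nil, errors.New("not found or version mismatch")
    }
    return d, nil // TTL is not reset on reads
}

func (s *store) Delete(sessionID string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.entries, sessionID)
}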

TTL Management

Cleanup Process:

Every cleanupInterval (default 1s):
1. Iterate through all entries
2. Check if TTL expired
3. Remove expired entries
4. Log cleanup statistics
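
A sketch of this loop over the same illustrative store type (the logging detail is an assumption).

// Illustrative cleanup goroutine: drops entries whose TTL has expired
func (s *store) runCleanup(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        removed := 0
        s.mu.Lock()
        for id, d := range s.entries {
            if time.Since(d.Timestamp) > d.Ttl {
                delete(s.entries, id)
                removed++
            }
        }
        s.mu.Unlock()
        log.Printf("cache cleanup: removed %d expired entries", removed)
    }
}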

Configuration:

maxTTL: "600s"           # Maximum allowed TTL
cleanupInterval: "1s"    # Cleanup frequency

Concurrency

Thread-safe implementation:

  - Mutex-protected map access
  - Atomic update level increments
  - Safe for concurrent clients

Node Cache

The Node Cache is a local pull-through cache that proxies requests to the Central Cache.

Architecture

┌──────────────────────────────────────┐
│      Node Cache Process              │
│                                      │
│  ┌────────────────────────────────┐  │
│  │   Local Cache (LRU)            │  │
│  │   - Limited size               │  │
│  │   - Update level tracking      │  │
│  │   - TTL inheritance            │  │
│  └────────┬───────────────────────┘  │
│           │                          │
│           │  Cache Miss              │
│           ▼                          │
│  ┌────────────────────────────────┐  │
│  │   Central Cache Proxy          │  │
│  │   - Forward on miss            │  │
│  │   - Cache response locally     │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘

Caching Strategy

Get Request Flow:

1. Worker requests shared data from node cache
2. Node cache checks local cache:
   a. Hit: Return cached data immediately
   b. Miss: Forward request to central cache
3. Central cache returns data
4. Node cache stores locally and returns to worker

Update Level Handling:

  - Cache stores data per (sessionID, updateLevel)
  - If worker requests higher update level, always forward to central cache
  - Old versions can coexist in cache

Eviction Policy:

  - LRU (Least Recently Used)
  - Size-based limits
  - TTL-based expiration
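
A sketch of the pull-through Get path combining the flow, update-level handling, and eviction behavior described above; the NodeCache type, its LRU, and the central-cache client are illustrative.

// Illustrative pull-through read path
func (c *NodeCache) Get(ctx context.Context, sessionID string, level uint32) (*SharedData, error) {
    key := cacheKey(sessionID, level) // cached per (sessionID, updateLevel)

    // Local hit: return immediately; older versions may still be cached
    if d, ok := c.lru.Get(key); ok {
        return d, nil
    }

    // Miss (including requests for a newer update level): forward to the
    // central cache and store the response locally with its TTL
    d, err := c.central.GetSharedData(ctx, sessionID, level)
    if err != nil {
        return nil, err
    }
    c.lru.Add(key, d)
    return d, nil
}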

Configuration

cacheSize: "1GB"            # Maximum cache size
centralCacheAddr: "central-cache:2337"
tls: true
caCert: "/certs/ca.crt"

Deployment Pattern

Typical Kubernetes deployment:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: node-cache
spec:
  selector:
    matchLabels:
      app: node-cache
  template:
    spec:
      containers:
      - name: node-cache
        image: alpcrun/node-cache:latest
        ports:
        - containerPort: 3337
        env:
        - name: ALPCRUNCH_CENTRAL_CACHE_ADDR
          value: "central-cache:2337"

Workers connect to the node cache at localhost:3337 or via a Kubernetes service.

Workers

Workers are user-defined services that process tasks.

Worker Lifecycle

┌──────────────────────────────────────────────┐
│         Worker Process Lifecycle             │
│                                              │
│  1. Connect to Queue Manager                │
│     - WorkerUpdate stream                   │
│     - Receive worker ID                     │
│                                              │
│  2. Idle State                              │
│     - Send periodic state updates           │
│     - Wait for BIND instruction             │
│                                              │
│  3. Bind to Session                         │
│     - Receive BIND from queue manager       │
│     - Open WorkerStream to session          │
│                                              │
│  4. Process Tasks                           │
│     - Receive task batches                  │
│     - Fetch shared data if needed           │
│     - Execute computation                   │
│     - Send result batches                   │
│                                              │
│  5. Unbind from Session                     │
│     - Session closes or idle timeout        │
│     - Receive UNBIND instruction            │
│     - Return to idle state                  │
│                                              │
│  6. Shutdown (optional)                     │
│     - Close WorkerUpdate stream             │
│     - Graceful termination                  │
└──────────────────────────────────────────────┘

Worker Implementation Pattern

func main() {
    // ctx carries the required metadata headers (see Worker Metadata below)
    updateStream, err := queueClient.WorkerUpdate(ctx)
    if err != nil {
        log.Fatalf("failed to open WorkerUpdate stream: %v", err)
    }

    // The queue manager assigns the worker ID via the response headers
    mdFromServer, err := updateStream.Header()
    if err != nil {
        log.Fatalf("failed to read stream header: %v", err)
    }
    workerID := mdFromServer.Get("workerid")[0]
    log.Printf("registered as worker %s", workerID)

    // Idle loop: wait for instructions from the queue manager
    for {
        inst, err := updateStream.Recv()
        if err != nil {
            log.Fatalf("WorkerUpdate stream closed: %v", err)
        }

        if inst.Type == pb.Instruction_BIND {
            // Bound: process the session until it closes
            processSession(ctx, inst.SessionId)
        }
    }
}

func processSession(ctx context.Context, sessionID string) {
    // Open the worker stream with the session ID in the metadata
    stream, err := queueClient.WorkerStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
    if err != nil {
        return
    }

    // Process batches until the session closes or the worker is unbound
    for {
        batch, err := stream.Recv()
        if err != nil {
            break // Session closed
        }

        // Process tasks
        results := processBatch(batch)

        // Send results
        if err := stream.Send(results); err != nil {
            break
        }
    }
}

Worker Metadata

Required gRPC metadata headers:

md := metadata.New(map[string]string{
    "api_key": os.Getenv("ALPCRUNCH_API_KEY"),
    "node":    os.Getenv("ALPCRUNCH_WORKER_NODE"),
    "pod":     os.Getenv("ALPCRUNCH_WORKER_POD"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

Worker Scaling

Workers are fully stateless and can be scaled horizontally:

Kubernetes Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 100  # Scale as needed
  template:
    spec:
      containers:
      - name: worker
        image: myorg/my-worker:latest
        env:
        - name: ALPCRUNCH_WORKER_NODE
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: ALPCRUNCH_WORKER_POD
          valueFrom:
            fieldRef:
              fieldPath: metadata.name

Auto-scaling:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: worker
  minReplicas: 10
  maxReplicas: 1000
  metrics:
  - type: External
    external:
      metric:
        name: alpcrun_queue_depth
      target:
        type: AverageValue
        averageValue: "100"

Communication Patterns

gRPC Streaming

All services use bidirectional streaming for efficiency:

Client Stream:

rpc ClientStream(stream Batch) returns (stream Batch)

Benefits:

  - Single connection for all tasks/results
  - Natural flow control via stream backpressure
  - Reduced connection overhead
  - Lower latency
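
A client-side sketch of using this stream, with sends and receives multiplexed over one connection; the queueClient, taskBatches, and handleResults names are assumptions.

// Illustrative bidirectional streaming from the client side
stream, err := queueClient.ClientStream(
    metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
if err != nil {
    log.Fatalf("failed to open ClientStream: %v", err)
}

// Send task batches; Send blocks when the stream applies backpressure
go func() {
    for _, batch := range taskBatches {
        if err := stream.Send(batch); err != nil {
            return
        }
    }
    stream.CloseSend()
}()

// Receive result batches on the same connection
for {
    results, err := stream.Recv()
    if err != nil {
        break // stream closed
    }
    handleResults(results)
}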

Worker Stream:

rpc WorkerStream(stream Batch) returns (stream Batch)

Same benefits as client stream, plus:

  - Session affinity via metadata
  - Automatic task distribution
  - Result collection

Metadata Propagation

gRPC metadata is used for:

Authentication:

api_key: secret-key

Session Context:

sessionid: 01HZXY...

Worker Identity:

workerid: worker-123
node: node-1
pod: worker-pod-abc

Next Steps