Components

This page provides detailed information about each component in the ALPCRUN.CH architecture.

Queue Manager

The Queue Manager is the central coordinator of the ALPCRUN.CH system.

Internal Architecture

┌─────────────────────────────────────────────┐
│          Queue Manager Process              │
│                                             │
│  ┌──────────────────────────────────────┐   │
│  │     Session Manager                  │   │
│  │  ┌────────┐  ┌────────┐  ┌────────┐  │   │
│  │  │Session1│  │Session2│  │SessionN│  │   │
│  │  └───┬────┘  └───┬────┘  └───┬────┘  │   │
│  └──────┼───────────┼───────────┼───────┘   │
│         │           │           │           │
│  ┌──────▼───────────▼───────────▼────────┐  │
│  │         Queue Manager Core            │  │
│  │  - Task queues (per session)          │  │
│  │  - Result queues (per session)        │  │
│  │  - Dead letter queues (per session)   │  │
│  └──────┬──────────────────────┬─────────┘  │
│         │                      │            │
│  ┌──────▼───────┐  ┌───────────▼────────┐   │
│  │  Scheduler   │  │  Worker Registry   │   │
│  └──────────────┘  └────────────────────┘   │
└─────────────────────────────────────────────┘

Session Management

Session Lifecycle:

  1. Creation: CreateSession RPC
     - Generates unique session ID (ULID)
     - Allocates queues based on configuration
     - Sets initial idle timeout
     - Returns session to client

  2. Active: Client and workers streaming
     - Tasks enqueued from client stream
     - Workers bound by scheduler
     - Results dequeued by client stream
     - Idle timer reset on activity

  3. Idle: No activity within idle timeout
     - Workers unbound if configured
     - Session kept alive for reconnection
     - Can be updated via UpdateSession RPC

  4. Closure: CloseSession RPC or cleanup
     - Workers unbound
     - Queues drained (if cleanup enabled)
     - Resources released
     - Session removed from registry
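
The lifecycle maps directly onto the session RPCs. The Go sketch below walks a client through it; the generated client type (pb.QueueClient) and the request/response message shapes are assumptions for illustration, only the RPC names come from this page.

// Illustrative client-side walk through the session lifecycle.
func runSession(ctx context.Context, qc pb.QueueClient) error {
    // 1. Creation: CreateSession returns the new session with its ULID.
    sess, err := qc.CreateSession(ctx, &pb.CreateSessionRequest{})
    if err != nil {
        return err
    }

    // 2. Active: open the client stream with the session ID in the metadata
    //    and exchange task/result batches until the work is done.
    stream, err := qc.ClientStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sess.GetSessionId()))
    if err != nil {
        return err
    }
    _ = stream // enqueue task batches / dequeue result batches here

    // 3./4. Closure: an explicit CloseSession unbinds workers and releases
    //    queues; otherwise the idle timeout eventually triggers cleanup.
    _, err = qc.CloseSession(ctx, &pb.CloseSessionRequest{SessionId: sess.GetSessionId()})
    return err
}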

Queue Implementations

ALPCRUN.CH supports multiple queue implementations:

Priority Queue (default):

// Heap-based priority queue
// Tasks sorted by priority (higher first)
// O(log n) enqueue/dequeue

Regular Queue:

// Channel-based FIFO queue
// Simple and fast for single priority
// O(1) operations

Lock-Free Queue:

// CAS-based lock-free queue
// Better for high contention
// Lower latency variance

Speed Queue (testing only):

// Unbuffered queue for benchmarking
// Not suitable for production
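
As a concrete illustration of the default heap-based implementation, a priority queue over a hypothetical Task type can be built on container/heap; the real queue's types and field names will differ.

package queue

import "container/heap"

// Task is a stand-in for the actual task type.
type Task struct {
    ID       string
    Priority uint32
    Payload  []byte
}

// taskHeap orders tasks so that higher priority is dequeued first (max-heap).
type taskHeap []*Task

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].Priority > h[j].Priority }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(*Task)) }
func (h *taskHeap) Pop() any {
    old := *h
    n := len(old)
    t := old[n-1]
    *h = old[:n-1]
    return t
}

// PriorityQueue wraps the heap; Enqueue and Dequeue are both O(log n).
type PriorityQueue struct{ h taskHeap }

func (q *PriorityQueue) Enqueue(t *Task) { heap.Push(&q.h, t) }

func (q *PriorityQueue) Dequeue() *Task {
    if q.h.Len() == 0 {
        return nil
    }
    return heap.Pop(&q.h).(*Task)
}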

Scheduler Logic

The scheduler binds workers to sessions based on:

  1. Session Priority: Higher priority sessions scheduled first
  2. Task Availability: Sessions with pending tasks
  3. Worker Limits: Respect min/max workers per session
  4. Worker Capacity: Available (unbound) workers
  5. Preemption: Optionally preempt lower priority sessions

Scheduling Algorithm:

1. Sort sessions by priority (descending)
2. For each session with pending tasks:
   a. Check if session needs more workers
   b. Find available workers
   c. Send BIND instruction to worker
   d. Add worker to session's worker list
3. Check for idle sessions
   a. Send UNBIND to idle workers
   b. Remove worker from session's worker list
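
A simplified, self-contained version of this pass in Go is sketched below; the types and the idle criterion are illustrative, and the real scheduler additionally sends BIND/UNBIND instructions over the worker update streams.

package scheduler

import "sort"

// Minimal illustrative types; the real scheduler's data structures differ.
type Worker struct {
    ID    string
    Bound bool
}

type Session struct {
    ID           string
    Priority     int
    PendingTasks int
    MaxWorkers   int
    Workers      []*Worker
}

// tick performs one scheduling pass: bind idle workers to the
// highest-priority sessions that still have pending tasks, then release
// workers from sessions that have gone idle.
func tick(sessions []*Session, idle []*Worker) {
    // 1. Sort sessions by priority (descending).
    sort.Slice(sessions, func(i, j int) bool {
        return sessions[i].Priority > sessions[j].Priority
    })

    for _, s := range sessions {
        if s.PendingTasks == 0 {
            continue // 2. only sessions with pending tasks receive workers
        }
        // 2a-2d. Bind available workers until the per-session limit is hit;
        // here the real scheduler sends a BIND instruction to each worker.
        for len(s.Workers) < s.MaxWorkers && len(idle) > 0 {
            w := idle[0]
            idle = idle[1:]
            w.Bound = true
            s.Workers = append(s.Workers, w)
        }
    }

    // 3. Release workers from idle sessions (approximated here as sessions
    // without pending tasks); the real scheduler sends UNBIND instructions.
    for _, s := range sessions {
        if s.PendingTasks == 0 && len(s.Workers) > 0 {
            for _, w := range s.Workers {
                w.Bound = false
            }
            s.Workers = nil
        }
    }
}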

Worker Registry

Tracks all connected workers.

Worker States:

- Idle: Connected but not bound to a session
- Bound: Processing tasks for a session
- Disconnected: Stream closed or error
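
For illustration, these states can be modelled as a small enum on the registry side; the type and constant names below are assumptions, not the actual implementation.

// Illustrative worker state type; names mirror the states listed above.
type WorkerState int

const (
    WorkerIdle         WorkerState = iota // connected, not bound to a session
    WorkerBound                           // processing tasks for a session
    WorkerDisconnected                    // stream closed or errored
)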

Configuration

Key configuration parameters:

appid: "my-app"              # Application identifier
loglevel: "info"             # Logging level
bufferSize: 1000             # Queue buffer size
queueType: "priority"        # Queue implementation
schedulerInterval: "1s"      # Scheduler tick rate
workerTimeout: "60s"         # Worker disconnect timeout

Environment variables (ALPCRUNCH_ prefix):

ALPCRUNCH_APPID=my-app
ALPCRUNCH_LOGLEVEL=info
ALPCRUNCH_BUFFERSIZE=1000
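
A minimal sketch of the precedence, assuming environment variables with the ALPCRUNCH_ prefix override the corresponding file values (the project's actual loader may differ):

package config

import (
    "os"
    "strings"
)

// getSetting returns the ALPCRUNCH_-prefixed environment variable for key
// if it is set, otherwise the value parsed from the YAML file.
func getSetting(key, fileValue string) string {
    if v, ok := os.LookupEnv("ALPCRUNCH_" + strings.ToUpper(key)); ok {
        return v
    }
    return fileValue
}

// Example (illustrative): getSetting("appid", cfg.AppID) yields the value of
// ALPCRUNCH_APPID when set, otherwise the value from the file.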

Central Cache

The Central Cache provides centralized storage for session-shared data. It is accessed by the local node caches.

Data Structure

type SharedData struct {
    SessionId   string
    UpdateLevel uint32        // Version number
    Payload     []byte        // Actual data
    Ttl         time.Duration // Time to live
    Timestamp   time.Time
}

Operations

SetSharedData:

- Creates new entry or fails if one already exists
- Sets initial update level
- Starts TTL countdown

UpdateSharedData:

- Updates existing entry
- Increments update level
- Resets TTL

GetSharedData:

- Retrieves data for session and update level
- Returns error if not found or on version mismatch
- Does not reset TTL

DeleteSharedData:

- Removes entry from storage
- Called by client on session cleanup
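
A minimal in-memory sketch of these semantics, assuming a mutex-guarded map keyed by session ID (the real store may differ):

package cache

import (
    "errors"
    "sync"
    "time"
)

// SharedData mirrors the structure shown above.
type SharedData struct {
    SessionId   string
    UpdateLevel uint32
    Payload     []byte
    Ttl         time.Duration
    Timestamp   time.Time
}

// Store is an illustrative in-memory central cache keyed by session ID.
type Store struct {
    mu      sync.Mutex
    entries map[string]*SharedData
}

var (
    ErrExists   = errors.New("shared data already exists")
    ErrNotFound = errors.New("shared data not found or update level mismatch")
)

func NewStore() *Store {
    return &Store{entries: make(map[string]*SharedData)}
}

// SetSharedData creates a new entry or fails if one already exists.
func (s *Store) SetSharedData(d *SharedData) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.entries[d.SessionId]; ok {
        return ErrExists
    }
    d.Timestamp = time.Now() // TTL countdown starts now
    s.entries[d.SessionId] = d
    return nil
}

// UpdateSharedData replaces the payload, bumps the update level and resets the TTL.
func (s *Store) UpdateSharedData(sessionID string, payload []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    e, ok := s.entries[sessionID]
    if !ok {
        return ErrNotFound
    }
    e.Payload = payload
    e.UpdateLevel++
    e.Timestamp = time.Now()
    return nil
}

// GetSharedData returns the entry if the requested update level matches;
// it does not touch the TTL.
func (s *Store) GetSharedData(sessionID string, updateLevel uint32) (*SharedData, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    e, ok := s.entries[sessionID]
    if !ok || e.UpdateLevel != updateLevel {
        return nil, ErrNotFound
    }
    return e, nil
}

// DeleteSharedData removes the entry.
func (s *Store) DeleteSharedData(sessionID string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.entries, sessionID)
}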

TTL Management

Cleanup Process:

Every cleanupInterval (default 1s):
1. Iterate through all entries
2. Check if TTL expired
3. Remove expired entries
4. Log cleanup statistics
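
Building on the Store sketch above, the sweep could be implemented as a ticker loop; the interval handling and logging details are illustrative.

// runCleanup sweeps expired entries every interval (default 1s) until ctx
// is cancelled. Requires "context" and "log" in addition to the imports of
// the Store sketch above.
func (s *Store) runCleanup(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            removed := 0
            s.mu.Lock()
            for id, e := range s.entries {
                // An entry expires once its TTL has elapsed since the last write.
                if time.Since(e.Timestamp) > e.Ttl {
                    delete(s.entries, id)
                    removed++
                }
            }
            s.mu.Unlock()
            if removed > 0 {
                log.Printf("cache cleanup: removed %d expired entries", removed)
            }
        }
    }
}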

Node Cache

The Node Cache is a local pull-through cache that transparently proxies worker requests to the Central Cache.

Architecture

┌──────────────────────────────────────┐
│      Node Cache Process              │
│                                      │
│  ┌────────────────────────────────┐  │
│  │   Local Cache (LRU)            │  │
│  │   - Limited size               │  │
│  │   - Update level tracking      │  │
│  │   - TTL inheritance            │  │
│  └────────┬───────────────────────┘  │
│           │                          │
│           │  Cache Miss              │
│           ▼                          │
│  ┌────────────────────────────────┐  │
│  │   Central Cache Proxy          │  │
│  │   - Forward on miss            │  │
│  │   - Cache response locally     │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘

Caching Strategy

Get Request Flow:

1. Worker requests shared data from local node cache
2. Node cache checks local cache:
   a. Hit: Return cached data immediately
   b. Miss: Forward request to central cache
3. Central cache returns data
4. Node cache stores locally and returns to worker

Update Level Handling:

- Cache stores data per (sessionID, updateLevel)
- If a worker requests a higher update level, always forward to the central cache
- Old versions can coexist in the cache

Eviction Policy:

- LRU (Least Recently Used)
- Size-based limits
- TTL-based expiration
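
An illustrative pull-through lookup keyed by (sessionID, updateLevel); the central-cache client interface is a stand-in and the LRU bookkeeping is omitted for brevity.

package nodecache

import "sync"

// key mirrors the (sessionID, updateLevel) pair the cache stores data under.
type key struct {
    sessionID   string
    updateLevel uint32
}

// CentralClient is a stand-in for the gRPC client to the central cache.
type CentralClient interface {
    GetSharedData(sessionID string, updateLevel uint32) ([]byte, error)
}

type NodeCache struct {
    mu      sync.Mutex
    local   map[key][]byte
    central CentralClient
}

func NewNodeCache(central CentralClient) *NodeCache {
    return &NodeCache{local: make(map[key][]byte), central: central}
}

// Get serves from the local cache on a hit and pulls the entry through
// from the central cache on a miss, storing the response locally.
func (c *NodeCache) Get(sessionID string, updateLevel uint32) ([]byte, error) {
    k := key{sessionID, updateLevel}

    c.mu.Lock()
    if data, ok := c.local[k]; ok {
        c.mu.Unlock()
        return data, nil // hit: return immediately
    }
    c.mu.Unlock()

    // Miss: forward to the central cache, then cache the response locally.
    data, err := c.central.GetSharedData(sessionID, updateLevel)
    if err != nil {
        return nil, err
    }
    c.mu.Lock()
    c.local[k] = data
    c.mu.Unlock()
    return data, nil
}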

Workers connect to the node cache at localhost:3337, or via the Kubernetes service nodecache:3337, which maps to the node cache daemon running on the local node.

Workers

Workers are the user-defined service instances that process tasks. They can be written in any language that supports gRPC.

Worker Lifecycle

┌─────────────────────────────────────────────┐
│         Worker Process Lifecycle            │
│                                             │
│  1. Connect to Queue Manager                │
│     - WorkerUpdate stream                   │
│     - Receive worker ID                     │
│                                             │
│  2. Idle State                              │
│     - Send periodic state updates           │
│     - Wait for BIND instruction             │
│                                             │
│  3. Bind to Session                         │
│     - Receive BIND from queue manager       │
│     - Open WorkerStream to session          │
│                                             │
│  4. Process Tasks                           │
│     - Receive task batches                  │
│     - Fetch shared data if needed           │
│     - Execute computation                   │
│     - Send result batches                   │
│                                             │
│  5. Unbind from Session                     │
│     - Session closes or idle timeout        │
│     - Receive UNBIND instruction            │
│     - Return to idle state                  │
│                                             │
│  6. Shutdown (optional)                     │
│     - Close WorkerUpdate stream             │
│     - Graceful termination                  │
└─────────────────────────────────────────────┘

Worker Implementation Pattern

Go example (error handling kept minimal; queueClient, ctx, pb, and processBatch are assumed to be set up elsewhere):

func main() {
    // Connect and obtain the worker ID assigned by the queue manager.
    updateStream, err := queueClient.WorkerUpdate(ctx)
    if err != nil {
        log.Fatalf("WorkerUpdate: %v", err)
    }
    mdFromServer, err := updateStream.Header()
    if err != nil {
        log.Fatalf("reading header metadata: %v", err)
    }
    workerID := mdFromServer.Get("workerid")[0]
    log.Printf("registered as worker %s", workerID)

    // Idle loop: wait for instructions from the queue manager.
    for {
        inst, err := updateStream.Recv()
        if err != nil {
            log.Fatalf("update stream closed: %v", err)
        }

        if inst.Type == pb.Instruction_BIND {
            // Bound to a session: open a worker stream and process it.
            processSession(inst.SessionId)
        }
    }
}

func processSession(sessionID string) {
    // Open the worker stream with the session ID in the metadata.
    stream, err := queueClient.WorkerStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
    if err != nil {
        log.Printf("WorkerStream: %v", err)
        return
    }

    // Process task batches until the session closes.
    for {
        batch, err := stream.Recv()
        if err != nil {
            break // session closed or stream error
        }

        // Execute the user-defined computation on the batch.
        results := processBatch(batch)

        // Send the result batch back to the queue manager.
        if err := stream.Send(results); err != nil {
            break
        }
    }
}

Worker Metadata

Required gRPC metadata headers:

md := metadata.New(map[string]string{
    "api_key": os.Getenv("METALCORE_API_KEY"),
    "node":    os.Getenv("METALCORE_WORKER_NODE"),
    "pod":     os.Getenv("METALCORE_WORKER_POD"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

Worker Scaling

Workers are fully stateless and can be scaled automatically based on node utilization and queue depth:

Auto-scaling:

spec:
  minReplicas: 10
  maxReplicas: 1000
  metrics:
  - type: External
    external:
      metric:
        name: alpcrun_queue_depth
      target:
        type: AverageValue
        averageValue: "100"

Communication Patterns

gRPC Streaming

All services use bidirectional streaming for efficiency:

Client Stream:

// sends task batches and receives result batches
rpc ClientStream(stream Batch) returns (stream Batch)

Benefits:

- Single connection for all tasks/results
- Natural flow control via stream backpressure
- Reduced connection overhead
- Lower latency
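
For illustration, the client side of this stream might look like the sketch below; pb.QueueClient and pb.Batch stand in for the generated gRPC types and are assumptions.

// Illustrative use of the bidirectional ClientStream.
func sendAndReceive(ctx context.Context, qc pb.QueueClient, sessionID string, batches []*pb.Batch) error {
    stream, err := qc.ClientStream(
        metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID))
    if err != nil {
        return err
    }

    // Send task batches concurrently with receiving results; backpressure
    // on Send/Recv gives natural flow control over the single connection.
    go func() {
        for _, b := range batches {
            if err := stream.Send(b); err != nil {
                return
            }
        }
        _ = stream.CloseSend()
    }()

    for {
        results, err := stream.Recv()
        if err != nil {
            return err // io.EOF once the server has sent all results
        }
        _ = results // hand the result batch to the application here
    }
}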

Worker Stream:

// receives task batches to process and sends back result batches
rpc WorkerStream(stream Batch) returns (stream Batch)

Same benefits as the client stream, plus:

- Session affinity via metadata
- Automatic task distribution
- Result collection

Metadata Propagation

gRPC metadata used for:

Authentication:

api_key: secret-key

Session Context:

sessionid: 01HZXY...

Worker Identity:

workerid: worker-123
node: node-1
pod: worker-pod-abc

Next Steps