Writing Services (Workers)

This guide shows you how to write worker services that process tasks from ALPCRUN.CH.

Overview

A worker service in ALPCRUN.CH:

  1. Connects to the Queue Manager
  2. Registers as an available worker
  3. Waits for BIND instructions
  4. Opens a stream to the assigned session
  5. Receives task batches
  6. Processes tasks (the actual computation)
  7. Sends result batches back
  8. Repeats for new sessions

Basic Worker Structure

Step 1: Import Required Packages

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/grpc/metadata"
    "google.golang.org/protobuf/proto"
)

Step 2: Connect and Register

func main() {
    ctx := context.Background()

    // Get worker identity from environment
    workerNode := os.Getenv("METALCORE_WORKER_NODE")
    workerPod := os.Getenv("METALCORE_WORKER_POD")

    if workerNode == "" || workerPod == "" {
        log.Fatal("METALCORE_WORKER_NODE and METALCORE_WORKER_POD must be set")
    }

    // Connect to queue manager
    queueConn, queueClient, err := grpccon.ConnectToQueue(
        "queue-manager.alpcrun.svc.cluster.local:1337",
        "/certs/ca.crt",
        true)
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer queueConn.Close()

    // Wait for connection
    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        log.Fatalf("Connection not ready: %v", err)
    }

    log.Printf("Worker connected: node=%s, pod=%s", workerNode, workerPod)

    // Start worker loop
    runWorker(ctx, queueClient, workerNode, workerPod)
}

Step 3: Worker Registration and Main Loop

func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
    // Create metadata with worker identity
    md := metadata.New(map[string]string{
        "api_key": os.Getenv("METALCORE_API_KEY"),
        "node":    node,
        "pod":     pod,
    })
    ctx = metadata.NewOutgoingContext(ctx, md)

    // Open WorkerUpdate stream
    updateStream, err := client.WorkerUpdate(ctx)
    if err != nil {
        log.Fatalf("Failed to create update stream: %v", err)
    }

    // Get assigned worker ID from response header
    mdFromServer, err := updateStream.Header()
    if err != nil {
        log.Fatalf("Failed to get worker ID: %v", err)
    }
    ids := mdFromServer.Get("workerid")
    if len(ids) == 0 {
        log.Fatal("No workerid in response header")
    }
    workerID := ids[0]

    log.Printf("Worker registered with ID: %s", workerID)

    // Send initial state
    if err := updateStream.Send(&pb.WorkerState{
        WorkerId: workerID,
        Status:   pb.WorkerState_IDLE,
    }); err != nil {
        log.Fatalf("Failed to send initial state: %v", err)
    }

    // Main worker loop
    for {
        // Wait for instruction from queue manager
        instruction, err := updateStream.Recv()
        if err != nil {
            // A Recv error means the stream is broken; retrying on the same
            // stream would fail forever. Return so the caller can reconnect.
            log.Printf("Update stream error: %v", err)
            return
        }

        switch instruction.Type {
        case pb.Instruction_BIND:
            log.Printf("Received BIND to session: %s", instruction.SessionId)
            processSession(ctx, client, instruction.SessionId, workerID)

        case pb.Instruction_UNBIND:
            log.Printf("Received UNBIND from session: %s", instruction.SessionId)
            // Return to idle state
            if err := updateStream.Send(&pb.WorkerState{
                WorkerId: workerID,
                Status:   pb.WorkerState_IDLE,
            }); err != nil {
                log.Printf("Failed to report idle state: %v", err)
                return
            }
        }
    }
}
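
With the Recv error path above returning instead of retrying on a dead stream, a thin supervision loop can re-register the worker after a disconnect. This is a sketch, not part of the ALPCRUN.CH API: the backoff values are illustrative, and the log.Fatalf calls inside runWorker still fail fast and rely on the platform (e.g., Kubernetes) to restart the pod.

func runWorkerForever(ctx context.Context, client pb.QueueClient, node, pod string) {
    backoff := time.Second
    for {
        runWorker(ctx, client, node, pod)
        log.Printf("Worker loop exited; re-registering in %v", backoff)
        time.Sleep(backoff)
        if backoff < 30*time.Second {
            backoff *= 2 // exponential backoff, capped at 30s
        }
    }
}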

Step 4: Process Session

func processSession(ctx context.Context, client pb.QueueClient, sessionID, workerID string) {
    // Add session ID to context
    ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)

    // Open WorkerStream for this session
    stream, err := client.WorkerStream(ctx)
    if err != nil {
        log.Printf("Failed to open worker stream: %v", err)
        return
    }

    log.Printf("Processing session: %s", sessionID)

    // Process batches until stream closes
    for {
        // Receive task batch
        taskBatch, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                log.Println("Session stream closed")
            } else {
                log.Printf("Stream error: %v", err)
            }
            break
        }

        log.Printf("Received batch with %d tasks", len(taskBatch.Messages))

        // Process the batch
        resultBatch := processBatch(ctx, client, taskBatch)

        // Send results back
        if err := stream.Send(resultBatch); err != nil {
            log.Printf("Failed to send results: %v", err)
            break
        }

        log.Printf("Sent batch with %d results", len(resultBatch.Messages))
    }

    log.Printf("Finished processing session: %s", sessionID)
}

Step 5: Process Tasks

func processBatch(ctx context.Context, client pb.QueueClient, taskBatch *pb.Batch) *pb.Batch {
    // Fetch shared data if needed
    var sharedData []byte
    if taskBatch.GetUseSharedData() {
        sharedData = fetchSharedData(ctx, taskBatch.SessionId, taskBatch.RequiredSharedDataUpdateLevel)
    }

    // Process each task
    results := make([]*pb.Message, 0, len(taskBatch.Messages))

    for _, task := range taskBatch.Messages {
        // Process the task
        result := processTask(task, sharedData)

        // Add to results
        results = append(results, result)
    }

    // Create result batch
    resultBatch := &pb.Batch{
        Id:        taskBatch.Id,  // Use same batch ID
        SessionId: taskBatch.SessionId,
        Messages:  results,
        Dead:      proto.Bool(false),  // Not a dead letter
    }

    return resultBatch
}

func processTask(task *pb.Message, sharedData []byte) *pb.Message {
    // Parse task payload (MyTaskType is a placeholder for your task schema)
    var taskData MyTaskType
    if err := json.Unmarshal(task.Payload, &taskData); err != nil {
        return &pb.Message{
            Id:      task.Id,
            Payload: nil,
            Error:   proto.String(fmt.Sprintf("Parse error: %v", err)),
        }
    }

    // Do the actual computation
    result, err := doComputation(taskData, sharedData)
    if err != nil {
        return &pb.Message{
            Id:      task.Id,
            Payload: nil,
            Error:   proto.String(err.Error()),
        }
    }

    // Serialize result
    resultPayload, err := json.Marshal(result)
    if err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Serialize error: %v", err)),
        }
    }

    return &pb.Message{
        Id:        task.Id,
        Payload:   resultPayload,
        RelatedId: proto.String(task.Id),  // Link result to task
    }
}

Step 6: Fetch Shared Data

func fetchSharedData(ctx context.Context, sessionID string, updateLevel *uint32) []byte {
    // Connect to node cache (local)
    cacheConn, cacheClient, err := grpccon.ConnectToCache(
        "localhost:3337",  // Local node cache
        "",
        false)
    if err != nil {
        log.Printf("Failed to connect to cache: %v", err)
        return nil
    }
    defer cacheConn.Close()

    // Fetch shared data
    sharedData, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
        SessionId:   sessionID,
        UpdateLevel: updateLevel,
    })
    if err != nil {
        log.Printf("Failed to get shared data: %v", err)
        return nil
    }

    return sharedData.Payload
}
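
Note that fetchSharedData dials the node cache on every call. Since the cache is local and shared data may be fetched once per batch, reusing one process-wide connection avoids the repeated dial (see also the connection-pooling note under Best Practices). A minimal sketch, assuming the client returned by grpccon.ConnectToCache is safe for concurrent use; the pb.CacheClient type name is an assumption:

var (
    cacheOnce       sync.Once
    cachedClient    pb.CacheClient // assumed client type returned by grpccon.ConnectToCache
    cacheConnectErr error
)

func getCacheClient() (pb.CacheClient, error) {
    cacheOnce.Do(func() {
        // Dial once and keep the connection for the lifetime of the worker;
        // it is intentionally never closed.
        _, cachedClient, cacheConnectErr = grpccon.ConnectToCache("localhost:3337", "", false)
    })
    return cachedClient, cacheConnectErr
}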

Advanced Worker Patterns

GPU-Accelerated Workers

For GPU workloads, ensure proper device management:

import "github.com/NVIDIA/go-nvml/pkg/nvml"

func initGPU() {
    ret := nvml.Init()
    if ret != nvml.SUCCESS {
        log.Fatalf("Failed to initialize NVML: %v", nvml.ErrorString(ret))
    }
}

func processTaskOnGPU(task *pb.Message) *pb.Message {
    // Select GPU device
    device, ret := nvml.DeviceGetHandleByIndex(0)
    if ret != nvml.SUCCESS {
        return errorResult(task.Id, "GPU not available")
    }

    // Run computation on GPU
    result := runCudaKernel(task.Payload)

    return &pb.Message{
        Id:      task.Id,
        Payload: result,
    }
}
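
The errorResult helper used above is not defined elsewhere in this guide; a minimal version could look like this:

func errorResult(id string, msg string) *pb.Message {
    return &pb.Message{
        Id:    id,
        Error: proto.String(msg),
    }
}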

Kubernetes GPU Worker Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-worker
spec:
  replicas: 10
  selector:
    matchLabels:
      app: gpu-worker
  template:
    metadata:
      labels:
        app: gpu-worker
    spec:
      containers:
      - name: worker
        image: myorg/gpu-worker:latest
        resources:
          limits:
            nvidia.com/gpu: 1  # Request 1 GPU

Multi-Threaded Workers

Process multiple tasks concurrently:

func processBatchConcurrent(ctx context.Context, taskBatch *pb.Batch, numWorkers int) *pb.Batch {
    tasks := taskBatch.Messages
    results := make([]*pb.Message, len(tasks))

    // Create worker pool
    taskChan := make(chan struct {
        index int
        task  *pb.Message
    }, len(tasks))
    resultChan := make(chan struct {
        index  int
        result *pb.Message
    }, len(tasks))

    // Start worker goroutines
    for i := 0; i < numWorkers; i++ {
        go func() {
            for item := range taskChan {
                result := processTask(item.task, nil)
                resultChan <- struct {
                    index  int
                    result *pb.Message
                }{item.index, result}
            }
        }()
    }

    // Send tasks to workers
    go func() {
        for i, task := range tasks {
            taskChan <- struct {
                index int
                task  *pb.Message
            }{i, task}
        }
        close(taskChan)
    }()

    // Collect results
    for i := 0; i < len(tasks); i++ {
        item := <-resultChan
        results[item.index] = item.result
    }

    return &pb.Batch{
        Id:        taskBatch.Id,
        SessionId: taskBatch.SessionId,
        Messages:  results,
    }
}

Stateful Workers

Maintain state across tasks (e.g., loaded models):

type StatefulWorker struct {
    model      *MLModel
    cache      map[string]interface{}
    mu         sync.RWMutex
}

func (w *StatefulWorker) processSession(ctx context.Context, client pb.QueueClient, sessionID string) {
    // Load model once per session
    w.loadModel()

    stream, err := client.WorkerStream(ctx)
    if err != nil {
        log.Printf("Failed to open worker stream: %v", err)
        return
    }

    for {
        taskBatch, err := stream.Recv()
        if err != nil {
            break
        }

        // Use cached model for all tasks
        results := w.processBatchWithModel(taskBatch)
        if err := stream.Send(results); err != nil {
            break
        }
    }

    // Clean up
    w.unloadModel()
}

func (w *StatefulWorker) loadModel() {
    log.Println("Loading ML model...")
    w.model = LoadMLModel("/models/my-model.onnx")
    log.Println("Model loaded")
}

func (w *StatefulWorker) processBatchWithModel(batch *pb.Batch) *pb.Batch {
    results := make([]*pb.Message, len(batch.Messages))

    for i, task := range batch.Messages {
        // Use pre-loaded model
        prediction := w.model.Predict(task.Payload)
        results[i] = &pb.Message{
            Id:      task.Id,
            Payload: prediction,
        }
    }

    return &pb.Batch{
        Id:        batch.Id,
        SessionId: batch.SessionId,
        Messages:  results,
    }
}

Error Handling and Retries

Properly handle task failures:

func processTaskSafely(task *pb.Message) (result *pb.Message) {
    // Catch panics and convert them into an error result so the task
    // still gets a reply (a bare recover would let the function return nil)
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Task %s panicked: %v", task.Id, r)
            result = &pb.Message{
                Id:    task.Id,
                Error: proto.String(fmt.Sprintf("panic: %v", r)),
            }
        }
    }()

    // Timeout protection
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    resultChan := make(chan *pb.Message, 1)
    errChan := make(chan error, 1)

    go func() {
        result, err := doComputation(task.Payload)
        if err != nil {
            errChan <- err
            return
        }
        resultChan <- result
    }()

    select {
    case result := <-resultChan:
        return result

    case err := <-errChan:
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(err.Error()),
        }

    case <-ctx.Done():
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String("Task timeout"),
        }
    }
}
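
The snippet above handles failures and timeouts; for transient failures it can be worth retrying before reporting an error. A sketch with exponential backoff (maxAttempts and the backoff values are illustrative, and doComputation is the same placeholder used above):

func processTaskWithRetry(task *pb.Message, maxAttempts int) *pb.Message {
    backoff := 100 * time.Millisecond
    var lastErr error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        result, err := doComputation(task.Payload)
        if err == nil {
            return result
        }
        lastErr = err
        log.Printf("Task %s attempt %d/%d failed: %v", task.Id, attempt, maxAttempts, err)
        time.Sleep(backoff)
        backoff *= 2 // double the wait between attempts
    }
    // Give up: report the last error so the task still gets a reply
    return &pb.Message{
        Id:    task.Id,
        Error: proto.String(fmt.Sprintf("failed after %d attempts: %v", maxAttempts, lastErr)),
    }
}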

Worker Configuration

Environment Variables

Required:

METALCORE_API_KEY=secret-key
METALCORE_WORKER_NODE=$(hostname)
METALCORE_WORKER_POD=worker-pod-123

Optional:

METALCORE_QUEUE_MANAGER_ADDR=queue-manager:1337
METALCORE_CACHE_ADDR=localhost:3337
METALCORE_LOG_LEVEL=info
METALCORE_NUM_THREADS=8
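
The optional variables can fall back to sensible defaults at startup. A small helper using only the standard library:

func getenvDefault(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

// Example usage at startup:
func loadAddrs() (queueAddr, cacheAddr string) {
    queueAddr = getenvDefault("METALCORE_QUEUE_MANAGER_ADDR", "queue-manager:1337")
    cacheAddr = getenvDefault("METALCORE_CACHE_ADDR", "localhost:3337")
    return
}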

Command-Line Flags

var (
    queueAddr  = flag.String("queue", "queue-manager:1337", "Queue manager address")
    cacheAddr  = flag.String("cache", "localhost:3337", "Cache address")
    numThreads = flag.Int("threads", runtime.NumCPU(), "Number of worker threads")
    logLevel   = flag.String("log", "info", "Log level")
)

func main() {
    flag.Parse()

    // Use configuration
    log.Printf("Starting worker with %d threads", *numThreads)
}

Deployment Patterns

CPU Workers

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cpu-worker
spec:
  replicas: 100
  selector:
    matchLabels:
      app: cpu-worker
  template:
    metadata:
      labels:
        app: cpu-worker
    spec:
      containers:
      - name: worker
        image: myorg/worker:latest
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"
          limits:
            cpu: "2"
            memory: "4Gi"
        env:
        - name: METALCORE_API_KEY
          valueFrom:
            secretKeyRef:
              name: alpcrun-secrets
              key: api-key
        - name: METALCORE_WORKER_NODE
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: METALCORE_WORKER_POD
          valueFrom:
            fieldRef:
              fieldPath: metadata.name

GPU Workers

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-worker
spec:
  replicas: 10
  selector:
    matchLabels:
      app: gpu-worker
  template:
    metadata:
      labels:
        app: gpu-worker
    spec:
      nodeSelector:
        accelerator: nvidia-tesla-v100
      containers:
      - name: worker
        image: myorg/gpu-worker:latest
        resources:
          limits:
            nvidia.com/gpu: 1
        env:
        - name: NVIDIA_VISIBLE_DEVICES
          value: "all"
        - name: NVIDIA_DRIVER_CAPABILITIES
          value: "compute,utility"

Heterogeneous Workers

Different worker types for different workloads:

# Light workers (cheap tasks)
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: light-worker
spec:
  replicas: 200
  selector:
    matchLabels:
      app: light-worker
  template:
    metadata:
      labels:
        app: light-worker
    spec:
      nodeSelector:
        worker-type: light
      containers:
      - name: worker
        image: myorg/worker:latest
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
---
# Heavy workers (expensive tasks)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: heavy-worker
spec:
  replicas: 20
  selector:
    matchLabels:
      app: heavy-worker
  template:
    metadata:
      labels:
        app: heavy-worker
    spec:
      nodeSelector:
        worker-type: heavy
      containers:
      - name: worker
        image: myorg/worker:latest
        resources:
          requests:
            cpu: "8"
            memory: "32Gi"

Workers can filter sessions based on tags in session attributes.
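
For example, a worker could inspect the BIND instruction and decline sessions it is not equipped for. The sketch below is hypothetical: the Attributes accessor on the instruction and the "worker-type" tag are assumptions about how session attributes are exposed, not confirmed API.

// Hypothetical: decide whether this worker should serve a session,
// assuming BIND instructions expose session attributes as a string map
func canHandle(instruction *pb.Instruction, myWorkerType string) bool {
    attrs := instruction.GetAttributes() // assumed accessor, not confirmed API
    tag, ok := attrs["worker-type"]
    return !ok || tag == myWorkerType
}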

Best Practices

  1. Resource Management:
     - Clean up resources after each session
     - Implement proper shutdown handlers
     - Monitor memory usage

  2. Error Handling:
     - Always return a result or error for each task
     - Use timeouts for long-running computations
     - Catch and log panics

  3. Performance:
     - Use connection pooling for cache connections
     - Process multiple tasks concurrently when possible
     - Minimize serialization/deserialization overhead

  4. Logging:
     - Log worker ID, session ID, and batch ID
     - Use structured logging for easy parsing
     - Include timing information

  5. Health Checks:
     - Implement readiness/liveness probes (see the sketch after this list)
     - Monitor connection health
     - Gracefully handle reconnections
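
For the readiness/liveness items above, a minimal probe endpoint using only the standard library (the port and paths are illustrative; point the Kubernetes probes at them):

import (
    "net/http"
    "sync/atomic"
)

var ready atomic.Bool // set to true once registered with the queue manager

func startHealthServer(addr string) {
    mux := http.NewServeMux()
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        // Liveness: the process is up and serving
        w.WriteHeader(http.StatusOK)
    })
    mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
        // Readiness: only accept traffic once the worker is registered
        if ready.Load() {
            w.WriteHeader(http.StatusOK)
            return
        }
        w.WriteHeader(http.StatusServiceUnavailable)
    })
    go func() {
        if err := http.ListenAndServe(addr, mux); err != nil {
            log.Printf("Health server error: %v", err)
        }
    }()
}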

Complete Example

See Worker Implementation Example for a complete, production-ready worker.

Next Steps