Skip to content

Worker Implementation Example

A complete example of a worker service that processes tasks from ALPCRUN.CH.

Overview

This example demonstrates:

  • Worker registration and lifecycle
  • Binding to sessions
  • Processing task batches
  • Handling shared data
  • Error handling

Complete Worker Code

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/grpc/metadata"
    "google.golang.org/protobuf/proto"
)

// Task structure (matches client)
type ComputeTask struct {
    ID    string  `json:"id"`
    Value float64 `json:"value"`
}

// Result structure
type ComputeResult struct {
    ID     string  `json:"id"`
    Result float64 `json:"result"`
}

func main() {
    // Get worker identity from environment
    workerNode := os.Getenv("METALCORE_WORKER_NODE")
    workerPod := os.Getenv("METALCORE_WORKER_POD")
    queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
    caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
    useTLS := caCert != ""

    if workerNode == "" {
        workerNode = "local-node"
    }
    if workerPod == "" {
        workerPod = fmt.Sprintf("worker-%d", os.Getpid())
    }

    log.Printf("Starting worker: node=%s, pod=%s", workerNode, workerPod)
    log.Printf("Connecting to queue manager at %s (TLS: %v)", queueAddr, useTLS)

    // Connect to queue manager
    ctx := context.Background()
    queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, caCert, useTLS)
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer queueConn.Close()

    // Wait for ready
    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        log.Fatalf("Connection not ready: %v", err)
    }

    log.Println("Connected successfully")

    // Run worker loop
    runWorker(ctx, queueClient, workerNode, workerPod)
}

func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
    // Add worker metadata
    md := metadata.New(map[string]string{
        "api_key": os.Getenv("METALCORE_API_KEY"),
        "node":    node,
        "pod":     pod,
    })
    ctx = metadata.NewOutgoingContext(ctx, md)

    // Open WorkerUpdate stream for instructions
    updateStream, err := client.WorkerUpdate(ctx)
    if err != nil {
        log.Fatalf("Failed to create update stream: %v", err)
    }

    // Get assigned worker ID from response header
    mdFromServer, err := updateStream.Header()
    if err != nil {
        log.Fatalf("Failed to get worker ID: %v", err)
    }
    workerID := mdFromServer.Get("workerid")[0]

    log.Printf("Worker registered with ID: %s", workerID)

    // Send initial IDLE state
    err = updateStream.Send(&pb.WorkerState{
        WorkerId: workerID,
        Status:   pb.WorkerState_IDLE,
    })
    if err != nil {
        log.Fatalf("Failed to send initial state: %v", err)
    }

    log.Println("Worker is idle, waiting for instructions...")

    // Main worker loop - wait for BIND/UNBIND instructions
    for {
        instruction, err := updateStream.Recv()
        if err == io.EOF {
            log.Println("Update stream closed by server")
            break
        }
        if err != nil {
            log.Printf("Update stream error: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }

        switch instruction.Type {
        case pb.Instruction_BIND:
            log.Printf("Received BIND instruction for session: %s", instruction.SessionId)

            // Process the session
            processSession(ctx, client, instruction.SessionId, workerID)

            // Return to idle state
            updateStream.Send(&pb.WorkerState{
                WorkerId: workerID,
                Status:   pb.WorkerState_IDLE,
            })

            log.Println("Returned to idle state")

        case pb.Instruction_UNBIND:
            log.Printf("Received UNBIND instruction for session: %s", instruction.SessionId)
            // Worker is already idle or will become idle
        }
    }
}

func processSession(ctx context.Context, client pb.QueueClient, sessionID, workerID string) {
    log.Printf("Processing session: %s", sessionID)

    // Add session ID to context metadata
    ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)

    // Open WorkerStream for this session
    stream, err := client.WorkerStream(ctx)
    if err != nil {
        log.Printf("Failed to open worker stream: %v", err)
        return
    }

    tasksProcessed := 0
    startTime := time.Now()

    // Process batches until stream closes
    for {
        // Receive task batch
        taskBatch, err := stream.Recv()
        if err == io.EOF {
            log.Println("Worker stream closed (session ended)")
            break
        }
        if err != nil {
            log.Printf("Worker stream error: %v", err)
            break
        }

        log.Printf("Received batch %s with %d tasks", taskBatch.Id, len(taskBatch.Messages))

        // Process the batch
        resultBatch := processBatch(ctx, client, taskBatch, workerID)

        // Send results back
        if err := stream.Send(resultBatch); err != nil {
            log.Printf("Failed to send results: %v", err)
            break
        }

        tasksProcessed += len(taskBatch.Messages)
        log.Printf("Sent batch %s with %d results (total: %d tasks)",
            resultBatch.Id, len(resultBatch.Messages), tasksProcessed)
    }

    duration := time.Since(startTime)
    throughput := float64(tasksProcessed) / duration.Seconds()

    log.Printf("Session complete: %d tasks in %.2fs (%.2f tasks/sec)",
        tasksProcessed, duration.Seconds(), throughput)
}

func processBatch(ctx context.Context, client pb.QueueClient, taskBatch *pb.Batch, workerID string) *pb.Batch {
    // Check if we need to fetch shared data
    var sharedData []byte
    if taskBatch.GetUseSharedData() {
        log.Printf("Batch requires shared data (update level %d)",
            taskBatch.GetRequiredSharedDataUpdateLevel())
        sharedData = fetchSharedData(ctx, taskBatch)
    }

    // Process each task in the batch
    results := make([]*pb.Message, 0, len(taskBatch.Messages))

    for _, task := range taskBatch.Messages {
        result := processTask(task, sharedData)
        results = append(results, result)
    }

    // Create result batch
    resultBatch := &pb.Batch{
        Id:        taskBatch.Id,
        SessionId: taskBatch.SessionId,
        Messages:  results,
        Dead:      proto.Bool(false),
    }

    return resultBatch
}

func processTask(task *pb.Message, sharedData []byte) *pb.Message {
    // Parse task
    var computeTask ComputeTask
    if err := json.Unmarshal(task.Payload, &computeTask); err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Failed to parse task: %v", err)),
        }
    }

    // Simulate computation
    // In a real worker, this would be your actual computation
    result := computeTask.Value * 2.0

    // If we have shared data, factor it in
    if len(sharedData) > 0 {
        var multiplier float64
        if err := json.Unmarshal(sharedData, &multiplier); err == nil {
            result *= multiplier
        }
    }

    // Create result
    computeResult := ComputeResult{
        ID:     computeTask.ID,
        Result: result,
    }

    resultPayload, err := json.Marshal(computeResult)
    if err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Failed to marshal result: %v", err)),
        }
    }

    return &pb.Message{
        Id:        task.Id,
        Payload:   resultPayload,
        RelatedId: proto.String(task.Id),
    }
}

func fetchSharedData(ctx context.Context, taskBatch *pb.Batch) []byte {
    cacheAddr := getEnvOrDefault("METALCORE_CACHE_ADDR", "localhost:3337")
    caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
    useTLS := caCert != ""

    // Connect to node cache (local)
    cacheConn, cacheClient, err := grpccon.ConnectToCache(cacheAddr, caCert, useTLS)
    if err != nil {
        log.Printf("Failed to connect to cache: %v", err)
        return nil
    }
    defer cacheConn.Close()

    // Fetch shared data
    sharedData, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
        SessionId:   taskBatch.SessionId,
        UpdateLevel: taskBatch.RequiredSharedDataUpdateLevel,
    })
    if err != nil {
        log.Printf("Failed to get shared data: %v", err)
        return nil
    }

    log.Printf("Fetched shared data (update level %d): %d bytes",
        *sharedData.UpdateLevel, len(sharedData.Payload))

    return sharedData.Payload
}

func getEnvOrDefault(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

Running the Worker

Prerequisites

  1. ALPCRUN.CH services running
  2. Go 1.21+ installed
  3. API key configured

Setup

# Required
export METALCORE_API_KEY="your-secret-key"

# Recommended
export METALCORE_WORKER_NODE=$(hostname)
export METALCORE_WORKER_POD="worker-$(hostname)-$$"

# Optional
export METALCORE_QUEUE_MANAGER_ADDR="queue-manager:1337"
export METALCORE_CACHE_ADDR="localhost:3337"
export METALCORE_CA_CERT="/path/to/ca.crt"

Build and Run

# Build
go build -o worker main.go

# Run
./worker

Expected Output

2025/01/15 10:30:00 Starting worker: node=my-node, pod=worker-12345
2025/01/15 10:30:00 Connecting to queue manager at localhost:1337 (TLS: false)
2025/01/15 10:30:00 Connected successfully
2025/01/15 10:30:00 Worker registered with ID: w-abc123
2025/01/15 10:30:00 Worker is idle, waiting for instructions...
2025/01/15 10:30:15 Received BIND instruction for session: 01HZXY...
2025/01/15 10:30:15 Processing session: 01HZXY...
2025/01/15 10:30:15 Received batch batch-1 with 10 tasks
2025/01/15 10:30:15 Sent batch batch-1 with 10 results (total: 10 tasks)
2025/01/15 10:30:16 Received batch batch-2 with 10 tasks
2025/01/15 10:30:16 Sent batch batch-2 with 10 results (total: 20 tasks)
...
2025/01/15 10:30:25 Worker stream closed (session ended)
2025/01/15 10:30:25 Session complete: 100 tasks in 10.00s (10.00 tasks/sec)
2025/01/15 10:30:25 Returned to idle state
2025/01/15 10:30:25 Worker is idle, waiting for instructions...

Docker Deployment

Dockerfile

FROM golang:1.21 AS builder

WORKDIR /app
COPY . .

RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o /worker main.go

FROM gcr.io/distroless/static-debian11

COPY --from=builder /worker /worker

ENTRYPOINT ["/worker"]

Build and Push

docker build -t myorg/worker:latest .
docker push myorg/worker:latest

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 10
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      containers:
      - name: worker
        image: myorg/worker:latest
        env:
        - name: METALCORE_API_KEY
          valueFrom:
            secretKeyRef:
              name: alpcrun-secrets
              key: api-key
        - name: METALCORE_WORKER_NODE
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: METALCORE_WORKER_POD
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: METALCORE_QUEUE_MANAGER_ADDR
          value: "queue-manager:1337"
        - name: METALCORE_CACHE_ADDR
          value: "localhost:3337"
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"
          limits:
            cpu: "2"
            memory: "4Gi"

Advanced Patterns

Multi-threaded Worker

Process multiple tasks concurrently:

func processBatchConcurrent(ctx context.Context, taskBatch *pb.Batch, numThreads int) *pb.Batch {
    tasks := taskBatch.Messages
    results := make([]*pb.Message, len(tasks))

    // Create worker pool
    taskChan := make(chan struct {
        index int
        task  *pb.Message
    }, len(tasks))

    resultChan := make(chan struct {
        index  int
        result *pb.Message
    }, len(tasks))

    // Start worker goroutines
    for i := 0; i < numThreads; i++ {
        go func() {
            for item := range taskChan {
                result := processTask(item.task, nil)
                resultChan <- struct {
                    index  int
                    result *pb.Message
                }{item.index, result}
            }
        }()
    }

    // Send tasks
    go func() {
        for i, task := range tasks {
            taskChan <- struct {
                index int
                task  *pb.Message
            }{i, task}
        }
        close(taskChan)
    }()

    // Collect results
    for i := 0; i < len(tasks); i++ {
        item := <-resultChan
        results[item.index] = item.result
    }

    return &pb.Batch{
        Id:       taskBatch.Id,
        SessionId: taskBatch.SessionId,
        Messages: results,
    }
}

Graceful Shutdown

func main() {
    // ... setup ...

    // Handle signals
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-sigChan
        log.Println("Shutdown signal received, cleaning up...")
        // Close connections gracefully
        queueConn.Close()
        os.Exit(0)
    }()

    runWorker(ctx, queueClient, workerNode, workerPod)
}

Next Steps