Writing Services (Workers)¶
This guide shows you how to write worker services that process tasks from ALPCRUN.CH.
Overview¶
A worker service in ALPCRUN.CH:
- Connects to the Queue Manager
- Registers as an available worker
- Waits for BIND instructions
- Opens a stream to the assigned session
- Receives task batches
- Processes tasks (the actual computation)
- Sends result batches back
- Repeats for new sessions
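Condensed into a single loop, this lifecycle looks roughly like the sketch below; the steps in the rest of this guide develop each of these calls into full, working code.
// Condensed lifecycle sketch; the steps below implement each piece in full.
func workerLoop(ctx context.Context, client pb.QueueClient, workerID string) {
    updateStream, err := client.WorkerUpdate(ctx) // register with the queue manager
    if err != nil {
        log.Fatalf("Failed to register: %v", err)
    }
    for {
        instruction, err := updateStream.Recv() // wait for BIND/UNBIND instructions
        if err != nil {
            return
        }
        if instruction.Type == pb.Instruction_BIND {
            // Open a stream to the assigned session, receive task batches,
            // process them, and send result batches back (Steps 4-5).
            processSession(ctx, client, instruction.SessionId, workerID)
        }
    }
}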
Basic Worker Structure¶
Step 1: Import Required Packages¶
package main

import (
    "context"
    "encoding/json" // used in Step 5
    "fmt"           // used in Step 5
    "io"            // used in Step 4
    "log"
    "os"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/grpc/metadata"
    "google.golang.org/protobuf/proto" // proto.String/proto.Bool in Step 5
)
Step 2: Connect and Register¶
func main() {
ctx := context.Background()
// Get worker identity from environment
workerNode := os.Getenv("METALCORE_WORKER_NODE")
workerPod := os.Getenv("METALCORE_WORKER_POD")
if workerNode == "" || workerPod == "" {
log.Fatal("METALCORE_WORKER_NODE and METALCORE_WORKER_POD must be set")
}
// Connect to queue manager
queueConn, queueClient, err := grpccon.ConnectToQueue(
"queue-manager.alpcrun.svc.cluster.local:1337",
"/certs/ca.crt",
true)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer queueConn.Close()
// Wait for connection
if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
log.Fatalf("Connection not ready: %v", err)
}
log.Printf("Worker connected: node=%s, pod=%s", workerNode, workerPod)
// Start worker loop
runWorker(ctx, queueClient, workerNode, workerPod)
}
Step 3: Worker Registration and Main Loop¶
func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
// Create metadata with worker identity
md := metadata.New(map[string]string{
"api_key": os.Getenv("METALCORE_API_KEY"),
"node": node,
"pod": pod,
})
ctx = metadata.NewOutgoingContext(ctx, md)
// Open WorkerUpdate stream
updateStream, err := client.WorkerUpdate(ctx)
if err != nil {
log.Fatalf("Failed to create update stream: %v", err)
}
    // Get assigned worker ID from response header
    mdFromServer, err := updateStream.Header()
    if err != nil {
        log.Fatalf("Failed to get worker ID: %v", err)
    }
    workerIDs := mdFromServer.Get("workerid")
    if len(workerIDs) == 0 {
        log.Fatal("Queue manager did not return a worker ID in the response header")
    }
    workerID := workerIDs[0]
    log.Printf("Worker registered with ID: %s", workerID)
    // Send initial state
    if err := updateStream.Send(&pb.WorkerState{
        WorkerId: workerID,
        Status:   pb.WorkerState_IDLE,
    }); err != nil {
        log.Fatalf("Failed to send initial worker state: %v", err)
    }
// Main worker loop
for {
        // Wait for instruction from queue manager
        instruction, err := updateStream.Recv()
        if err != nil {
            // The update stream is broken; further Recv calls would keep failing.
            // In production, re-dial the queue manager and re-register here.
            log.Printf("Update stream error: %v", err)
            return
        }
switch instruction.Type {
case pb.Instruction_BIND:
log.Printf("Received BIND to session: %s", instruction.SessionId)
processSession(ctx, client, instruction.SessionId, workerID)
case pb.Instruction_UNBIND:
log.Printf("Received UNBIND from session: %s", instruction.SessionId)
// Return to idle state
updateStream.Send(&pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
})
}
}
}
Step 4: Process Session¶
func processSession(ctx context.Context, client pb.QueueClient, sessionID, workerID string) {
// Add session ID to context
ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)
// Open WorkerStream for this session
stream, err := client.WorkerStream(ctx)
if err != nil {
log.Printf("Failed to open worker stream: %v", err)
return
}
log.Printf("Processing session: %s", sessionID)
// Process batches until stream closes
for {
// Receive task batch
taskBatch, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Println("Session stream closed")
} else {
log.Printf("Stream error: %v", err)
}
break
}
log.Printf("Received batch with %d tasks", len(taskBatch.Messages))
// Process the batch
resultBatch := processBatch(ctx, client, taskBatch)
// Send results back
if err := stream.Send(resultBatch); err != nil {
log.Printf("Failed to send results: %v", err)
break
}
log.Printf("Sent batch with %d results", len(resultBatch.Messages))
}
log.Printf("Finished processing session: %s", sessionID)
}
Step 5: Process Tasks¶
func processBatch(ctx context.Context, client pb.QueueClient, taskBatch *pb.Batch) *pb.Batch {
// Fetch shared data if needed
var sharedData []byte
if taskBatch.GetUseSharedData() {
sharedData = fetchSharedData(ctx, taskBatch.SessionId, taskBatch.RequiredSharedDataUpdateLevel)
}
// Process each task
results := make([]*pb.Message, 0, len(taskBatch.Messages))
for _, task := range taskBatch.Messages {
// Process the task
result := processTask(task, sharedData)
// Add to results
results = append(results, result)
}
// Create result batch
resultBatch := &pb.Batch{
Id: taskBatch.Id, // Use same batch ID
SessionId: taskBatch.SessionId,
Messages: results,
Dead: proto.Bool(false), // Not a dead letter
}
return resultBatch
}
func processTask(task *pb.Message, sharedData []byte) *pb.Message {
// Parse task payload
var taskData MyTaskType
if err := json.Unmarshal(task.Payload, &taskData); err != nil {
return &pb.Message{
Id: task.Id,
Payload: nil,
Error: proto.String(fmt.Sprintf("Parse error: %v", err)),
}
}
// Do the actual computation
result, err := doComputation(taskData, sharedData)
if err != nil {
return &pb.Message{
Id: task.Id,
Payload: nil,
Error: proto.String(err.Error()),
}
}
    // Serialize result
    resultPayload, err := json.Marshal(result)
    if err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Failed to serialize result: %v", err)),
        }
    }
    return &pb.Message{
        Id:        task.Id,
        Payload:   resultPayload,
        RelatedId: proto.String(task.Id), // Link result to task
    }
}
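MyTaskType and doComputation above are application-specific placeholders rather than part of the ALPCRUN.CH API. For illustration only, they might look like the following (the field names and the computation are invented):
// Hypothetical task and result types; define whatever shape your workload needs.
type MyTaskType struct {
    InputID    string    `json:"input_id"`
    Parameters []float64 `json:"parameters"`
}

type MyResultType struct {
    InputID string  `json:"input_id"`
    Value   float64 `json:"value"`
}

// doComputation is where the real work happens; the body below is a stand-in.
func doComputation(task MyTaskType, sharedData []byte) (MyResultType, error) {
    if len(task.Parameters) == 0 {
        return MyResultType{}, fmt.Errorf("task %s has no parameters", task.InputID)
    }
    sum := 0.0
    for _, p := range task.Parameters {
        sum += p
    }
    _ = sharedData // shared data might hold a model or lookup table; unused in this sketch
    return MyResultType{InputID: task.InputID, Value: sum}, nil
}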
Step 6: Fetch Shared Data¶
func fetchSharedData(ctx context.Context, sessionID string, updateLevel *uint32) []byte {
// Connect to node cache (local)
cacheConn, cacheClient, err := grpccon.ConnectToCache(
"localhost:3337", // Local node cache
"",
false)
if err != nil {
log.Printf("Failed to connect to cache: %v", err)
return nil
}
defer cacheConn.Close()
// Fetch shared data
sharedData, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: updateLevel,
})
if err != nil {
log.Printf("Failed to get shared data: %v", err)
return nil
}
return sharedData.Payload
}
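Dialing the node cache for every batch works, but it pays the connection cost each time. As a sketch, the same grpccon helper can be wrapped so the connection is opened once and reused; the fetcher-closure structure below is an illustration, not part of the ALPCRUN.CH API:
// newSharedDataFetcher dials the local node cache once and returns a fetch
// function that reuses that connection, plus a close function for shutdown.
func newSharedDataFetcher(cacheAddr string) (func(ctx context.Context, sessionID string, updateLevel *uint32) []byte, func() error, error) {
    cacheConn, cacheClient, err := grpccon.ConnectToCache(cacheAddr, "", false)
    if err != nil {
        return nil, nil, err
    }
    fetch := func(ctx context.Context, sessionID string, updateLevel *uint32) []byte {
        sharedData, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
            SessionId:   sessionID,
            UpdateLevel: updateLevel,
        })
        if err != nil {
            log.Printf("Failed to get shared data: %v", err)
            return nil
        }
        return sharedData.Payload
    }
    return fetch, cacheConn.Close, nil
}
Create the fetcher once at startup, pass the fetch function into processBatch, and call the returned close function during shutdown.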
Advanced Worker Patterns¶
GPU-Accelerated Workers¶
For GPU workloads, ensure proper device management:
import "github.com/NVIDIA/go-nvml/pkg/nvml"
func initGPU() {
    ret := nvml.Init()
    if ret != nvml.SUCCESS {
        log.Fatalf("Failed to initialize NVML: %v", nvml.ErrorString(ret))
    }
    // Pair this with nvml.Shutdown() in your shutdown path.
}

func processTaskOnGPU(task *pb.Message) *pb.Message {
    // Check that GPU 0 is present before launching work
    device, ret := nvml.DeviceGetHandleByIndex(0)
    if ret != nvml.SUCCESS {
        return errorResult(task.Id, "GPU not available")
    }
    _ = device // in a real worker, the device handle selects where the kernel runs

    // Run computation on GPU (runCudaKernel and errorResult are application-specific helpers)
    result := runCudaKernel(task.Payload)
    return &pb.Message{
        Id:      task.Id,
        Payload: result,
    }
}
Kubernetes GPU Worker Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: gpu-worker
spec:
replicas: 10
template:
spec:
containers:
- name: worker
image: myorg/gpu-worker:latest
resources:
limits:
nvidia.com/gpu: 1 # Request 1 GPU
Multi-Threaded Workers¶
Process multiple tasks concurrently:
func processBatchConcurrent(ctx context.Context, taskBatch *pb.Batch, numWorkers int) *pb.Batch {
tasks := taskBatch.Messages
results := make([]*pb.Message, len(tasks))
// Create worker pool
taskChan := make(chan struct {
index int
task *pb.Message
}, len(tasks))
resultChan := make(chan struct {
index int
result *pb.Message
}, len(tasks))
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
go func() {
for item := range taskChan {
result := processTask(item.task, nil)
resultChan <- struct {
index int
result *pb.Message
}{item.index, result}
}
}()
}
// Send tasks to workers
go func() {
for i, task := range tasks {
taskChan <- struct {
index int
task *pb.Message
}{i, task}
}
close(taskChan)
}()
// Collect results
for i := 0; i < len(tasks); i++ {
item := <-resultChan
results[item.index] = item.result
}
return &pb.Batch{
Id: taskBatch.Id,
SessionId: taskBatch.SessionId,
Messages: results,
}
}
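The same fan-out can be written more compactly with a bounded worker pool from the errgroup package; this sketch assumes the golang.org/x/sync/errgroup module is available in your project:
import "golang.org/x/sync/errgroup"

func processBatchPooled(taskBatch *pb.Batch, numWorkers int) *pb.Batch {
    results := make([]*pb.Message, len(taskBatch.Messages))

    var g errgroup.Group
    g.SetLimit(numWorkers) // at most numWorkers tasks run at once

    for i, task := range taskBatch.Messages {
        i, task := i, task // capture loop variables (needed before Go 1.22)
        g.Go(func() error {
            // Each goroutine writes to its own index, so no locking is required.
            results[i] = processTask(task, nil)
            return nil
        })
    }
    // processTask encodes failures in the result message, so Wait reports no error here.
    _ = g.Wait()

    return &pb.Batch{
        Id:        taskBatch.Id,
        SessionId: taskBatch.SessionId,
        Messages:  results,
    }
}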
Stateful Workers¶
Maintain state across tasks (e.g., loaded models):
type StatefulWorker struct {
model *MLModel
cache map[string]interface{}
mu sync.RWMutex
}
func (w *StatefulWorker) processSession(ctx context.Context, client pb.QueueClient, sessionID string) {
    // Load model once per session
    w.loadModel()
    defer w.unloadModel() // clean up even if the stream fails

    // Attach the session ID, as in the basic worker above
    ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)
    stream, err := client.WorkerStream(ctx)
    if err != nil {
        log.Printf("Failed to open worker stream: %v", err)
        return
    }
    for {
        taskBatch, err := stream.Recv()
        if err != nil {
            break
        }
        // Use cached model for all tasks
        results := w.processBatchWithModel(taskBatch)
        if err := stream.Send(results); err != nil {
            log.Printf("Failed to send results: %v", err)
            break
        }
    }
}
func (w *StatefulWorker) loadModel() {
log.Println("Loading ML model...")
w.model = LoadMLModel("/models/my-model.onnx")
log.Println("Model loaded")
}
func (w *StatefulWorker) processBatchWithModel(batch *pb.Batch) *pb.Batch {
results := make([]*pb.Message, len(batch.Messages))
for i, task := range batch.Messages {
// Use pre-loaded model
prediction := w.model.Predict(task.Payload)
results[i] = &pb.Message{
Id: task.Id,
Payload: prediction,
}
}
return &pb.Batch{
Id: batch.Id,
SessionId: batch.SessionId,
Messages: results,
}
}
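unloadModel is referenced above but not defined; a minimal placeholder (what cleanup actually means depends on your model runtime) could be:
func (w *StatefulWorker) unloadModel() {
    w.mu.Lock()
    defer w.mu.Unlock()
    // Release whatever the model holds (GPU memory, file handles, ...).
    w.model = nil
    log.Println("Model unloaded")
}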
Error Handling and Retries¶
Properly handle task failures:
func processTaskSafely(task *pb.Message) *pb.Message {
    // Bound how long a single task may run
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    resultChan := make(chan *pb.Message, 1)

    go func() {
        // Catch panics inside the goroutine that runs the task, so one bad
        // task cannot crash the whole worker.
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Task %s panicked: %v", task.Id, r)
                resultChan <- &pb.Message{
                    Id:    task.Id,
                    Error: proto.String(fmt.Sprintf("Task panicked: %v", r)),
                }
            }
        }()
        // processTask (Step 5) already converts errors into result messages
        resultChan <- processTask(task, nil)
    }()

    select {
    case result := <-resultChan:
        return result
    case <-ctx.Done():
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String("Task timeout"),
        }
    }
}
Worker Configuration¶
Environment Variables¶
Required:
METALCORE_API_KEY=secret-key
METALCORE_WORKER_NODE=$(hostname)
METALCORE_WORKER_POD=worker-pod-123
Optional:
METALCORE_QUEUE_MANAGER_ADDR=queue-manager:1337
METALCORE_CACHE_ADDR=localhost:3337
METALCORE_LOG_LEVEL=info
METALCORE_NUM_THREADS=8
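A small helper can apply the defaults for the optional settings; getenvDefault and loadConfig below are illustrative helpers (using os, runtime, and strconv), not part of the ALPCRUN.CH API:
// getenvDefault returns the environment variable's value, or a fallback if unset.
func getenvDefault(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func loadConfig() (queueAddr, cacheAddr, logLevel string, numThreads int) {
    queueAddr = getenvDefault("METALCORE_QUEUE_MANAGER_ADDR", "queue-manager:1337")
    cacheAddr = getenvDefault("METALCORE_CACHE_ADDR", "localhost:3337")
    logLevel = getenvDefault("METALCORE_LOG_LEVEL", "info")

    numThreads = runtime.NumCPU()
    if v := os.Getenv("METALCORE_NUM_THREADS"); v != "" {
        if n, err := strconv.Atoi(v); err == nil && n > 0 {
            numThreads = n
        }
    }
    return queueAddr, cacheAddr, logLevel, numThreads
}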
Command-Line Flags¶
var (
queueAddr = flag.String("queue", "queue-manager:1337", "Queue manager address")
cacheAddr = flag.String("cache", "localhost:3337", "Cache address")
numThreads = flag.Int("threads", runtime.NumCPU(), "Number of worker threads")
logLevel = flag.String("log", "info", "Log level")
)
func main() {
flag.Parse()
// Use configuration
log.Printf("Starting worker with %d threads", *numThreads)
}
Deployment Patterns¶
CPU Workers¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: cpu-worker
spec:
replicas: 100
template:
spec:
containers:
- name: worker
image: myorg/worker:latest
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
env:
- name: METALCORE_API_KEY
valueFrom:
secretKeyRef:
name: alpcrun-secrets
key: api-key
- name: METALCORE_WORKER_NODE
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: METALCORE_WORKER_POD
valueFrom:
fieldRef:
fieldPath: metadata.name
GPU Workers¶
apiVersion: apps/v1
kind: Deployment
metadata:
name: gpu-worker
spec:
replicas: 10
template:
spec:
nodeSelector:
accelerator: nvidia-tesla-v100
containers:
- name: worker
image: myorg/gpu-worker:latest
resources:
limits:
nvidia.com/gpu: 1
env:
- name: NVIDIA_VISIBLE_DEVICES
value: "all"
- name: NVIDIA_DRIVER_CAPABILITIES
value: "compute,utility"
Heterogeneous Workers¶
Different worker types for different workloads:
# Light workers (cheap tasks)
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: light-worker
spec:
replicas: 200
template:
spec:
nodeSelector:
worker-type: light
containers:
- name: worker
resources:
requests:
cpu: "500m"
memory: "512Mi"
---
# Heavy workers (expensive tasks)
apiVersion: apps/v1
kind: Deployment
metadata:
name: heavy-worker
spec:
replicas: 20
template:
spec:
nodeSelector:
worker-type: heavy
containers:
- name: worker
resources:
requests:
cpu: "8"
memory: "32Gi"
Workers can filter sessions based on tags in session attributes.
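How a worker obtains those attributes depends on the generated API; as a sketch only, assuming the tags arrive as a plain map of strings, the filtering decision could look like this:
// acceptSession decides whether this worker should handle a session, given the
// session's attribute tags. The "worker-type" tag is an example, not a reserved name.
func acceptSession(attrs map[string]string, myWorkerType string) bool {
    want, ok := attrs["worker-type"]
    if !ok {
        return true // untagged sessions can run on any worker
    }
    return want == myWorkerType
}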
Best Practices¶
- Resource Management:
  - Clean up resources after each session
  - Implement proper shutdown handlers
  - Monitor memory usage
- Error Handling:
  - Always return a result or error for each task
  - Use timeouts for long-running computations
  - Catch and log panics
- Performance:
  - Use connection pooling for cache connections
  - Process multiple tasks concurrently when possible
  - Minimize serialization/deserialization overhead
- Logging:
  - Log worker ID, session ID, and batch ID
  - Use structured logging for easy parsing
  - Include timing information
- Health Checks:
  - Implement readiness/liveness probes (see the sketch after this list)
  - Monitor connection health
  - Gracefully handle reconnections
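A sketch of how a worker's main function could wire in graceful shutdown and a probe endpoint, using only the standard library (the /healthz path and port 8080 are arbitrary choices, not an ALPCRUN.CH convention):
import (
    "context"
    "log"
    "net/http"
    "os/signal"
    "syscall"
)

func main() {
    // Cancel the root context on SIGTERM/SIGINT so streams can wind down cleanly.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
    defer stop()

    // Readiness/liveness endpoint for Kubernetes probes.
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    go func() {
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Printf("Health endpoint stopped: %v", err)
        }
    }()

    // ... connect, register, and run the worker loop with ctx as shown above ...

    <-ctx.Done()
    log.Println("Shutting down worker")
}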
Complete Example¶
See Worker Implementation Example for a complete, production-ready worker.
Next Steps¶
- Submitting Workloads: Learn workload patterns
- Using Shared Data: Efficient data distribution
- API Reference: Complete API documentation
- Examples: More code examples