Worker Implementation Example¶
A complete example of a worker service that processes tasks from ALPCRUN.CH.
Overview¶
This example demonstrates:
- Worker registration and lifecycle
- Binding to sessions
- Processing task batches
- Handling shared data
- Error handling
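At a high level, the worker dials the queue manager, opens a WorkerUpdate stream (receiving its worker ID in a response header), reports an IDLE state, and waits for BIND/UNBIND instructions. On BIND it opens a WorkerStream for the session, receives task batches, processes them (fetching shared data from the node cache when a batch requires it), streams the results back, and returns to IDLE when the session ends. The sample computation simply doubles each task's value (optionally scaled by a shared-data multiplier), so a task payload of {"id":"t1","value":21} comes back as {"id":"t1","result":42}.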
Complete Worker Code¶
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"time"
pb "github.com/limelabs/metalcore-neo/api/v1"
"github.com/limelabs/metalcore-neo/pkg/grpccon"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
// Task structure (matches client)
type ComputeTask struct {
ID string `json:"id"`
Value float64 `json:"value"`
}
// Result structure
type ComputeResult struct {
ID string `json:"id"`
Result float64 `json:"result"`
}
func main() {
// Get worker identity from environment
workerNode := os.Getenv("METALCORE_WORKER_NODE")
workerPod := os.Getenv("METALCORE_WORKER_POD")
queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
useTLS := caCert != ""
if workerNode == "" {
workerNode = "local-node"
}
if workerPod == "" {
workerPod = fmt.Sprintf("worker-%d", os.Getpid())
}
log.Printf("Starting worker: node=%s, pod=%s", workerNode, workerPod)
log.Printf("Connecting to queue manager at %s (TLS: %v)", queueAddr, useTLS)
// Connect to queue manager
ctx := context.Background()
queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, caCert, useTLS)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer queueConn.Close()
// Wait for ready
if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
log.Fatalf("Connection not ready: %v", err)
}
log.Println("Connected successfully")
// Run worker loop
runWorker(ctx, queueClient, workerNode, workerPod)
}
func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
// Add worker metadata
md := metadata.New(map[string]string{
"api_key": os.Getenv("METALCORE_API_KEY"),
"node": node,
"pod": pod,
})
ctx = metadata.NewOutgoingContext(ctx, md)
// Open WorkerUpdate stream for instructions
updateStream, err := client.WorkerUpdate(ctx)
if err != nil {
log.Fatalf("Failed to create update stream: %v", err)
}
// Get assigned worker ID from response header
mdFromServer, err := updateStream.Header()
if err != nil {
log.Fatalf("Failed to get worker ID: %v", err)
}
workerIDs := mdFromServer.Get("workerid")
if len(workerIDs) == 0 {
log.Fatal("Server did not return a workerid header")
}
workerID := workerIDs[0]
log.Printf("Worker registered with ID: %s", workerID)
// Send initial IDLE state
err = updateStream.Send(&pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
})
if err != nil {
log.Fatalf("Failed to send initial state: %v", err)
}
log.Println("Worker is idle, waiting for instructions...")
// Main worker loop - wait for BIND/UNBIND instructions
for {
instruction, err := updateStream.Recv()
if err == io.EOF {
log.Println("Update stream closed by server")
break
}
if err != nil {
log.Printf("Update stream error: %v", err)
time.Sleep(5 * time.Second)
continue
}
switch instruction.Type {
case pb.Instruction_BIND:
log.Printf("Received BIND instruction for session: %s", instruction.SessionId)
// Process the session
processSession(ctx, client, instruction.SessionId, workerID)
// Return to idle state
updateStream.Send(&pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
})
log.Println("Returned to idle state")
case pb.Instruction_UNBIND:
log.Printf("Received UNBIND instruction for session: %s", instruction.SessionId)
// Worker is already idle or will become idle
}
}
}
func processSession(ctx context.Context, client pb.QueueClient, sessionID, workerID string) {
log.Printf("Processing session: %s", sessionID)
// Add session ID to context metadata
ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)
// Open WorkerStream for this session
stream, err := client.WorkerStream(ctx)
if err != nil {
log.Printf("Failed to open worker stream: %v", err)
return
}
tasksProcessed := 0
startTime := time.Now()
// Process batches until stream closes
for {
// Receive task batch
taskBatch, err := stream.Recv()
if err == io.EOF {
log.Println("Worker stream closed (session ended)")
break
}
if err != nil {
log.Printf("Worker stream error: %v", err)
break
}
log.Printf("Received batch %s with %d tasks", taskBatch.Id, len(taskBatch.Messages))
// Process the batch
resultBatch := processBatch(ctx, client, taskBatch, workerID)
// Send results back
if err := stream.Send(resultBatch); err != nil {
log.Printf("Failed to send results: %v", err)
break
}
tasksProcessed += len(taskBatch.Messages)
log.Printf("Sent batch %s with %d results (total: %d tasks)",
resultBatch.Id, len(resultBatch.Messages), tasksProcessed)
}
duration := time.Since(startTime)
throughput := float64(tasksProcessed) / duration.Seconds()
log.Printf("Session complete: %d tasks in %.2fs (%.2f tasks/sec)",
tasksProcessed, duration.Seconds(), throughput)
}
func processBatch(ctx context.Context, client pb.QueueClient, taskBatch *pb.Batch, workerID string) *pb.Batch {
// Check if we need to fetch shared data
var sharedData []byte
if taskBatch.GetUseSharedData() {
log.Printf("Batch requires shared data (update level %d)",
taskBatch.GetRequiredSharedDataUpdateLevel())
sharedData = fetchSharedData(ctx, taskBatch)
}
// Process each task in the batch
results := make([]*pb.Message, 0, len(taskBatch.Messages))
for _, task := range taskBatch.Messages {
result := processTask(task, sharedData)
results = append(results, result)
}
// Create result batch
resultBatch := &pb.Batch{
Id: taskBatch.Id,
SessionId: taskBatch.SessionId,
Messages: results,
Dead: proto.Bool(false),
}
return resultBatch
}
func processTask(task *pb.Message, sharedData []byte) *pb.Message {
// Parse task
var computeTask ComputeTask
if err := json.Unmarshal(task.Payload, &computeTask); err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Failed to parse task: %v", err)),
}
}
// Simulate computation
// In a real worker, this would be your actual computation
result := computeTask.Value * 2.0
// If we have shared data, factor it in
if len(sharedData) > 0 {
var multiplier float64
if err := json.Unmarshal(sharedData, &multiplier); err == nil {
result *= multiplier
}
}
// Create result
computeResult := ComputeResult{
ID: computeTask.ID,
Result: result,
}
resultPayload, err := json.Marshal(computeResult)
if err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Failed to marshal result: %v", err)),
}
}
return &pb.Message{
Id: task.Id,
Payload: resultPayload,
RelatedId: proto.String(task.Id),
}
}
func fetchSharedData(ctx context.Context, taskBatch *pb.Batch) []byte {
cacheAddr := getEnvOrDefault("METALCORE_CACHE_ADDR", "localhost:3337")
caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
useTLS := caCert != ""
// Connect to node cache (local)
cacheConn, cacheClient, err := grpccon.ConnectToCache(cacheAddr, caCert, useTLS)
if err != nil {
log.Printf("Failed to connect to cache: %v", err)
return nil
}
defer cacheConn.Close()
// Fetch shared data
sharedData, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
SessionId: taskBatch.SessionId,
UpdateLevel: taskBatch.RequiredSharedDataUpdateLevel,
})
if err != nil {
log.Printf("Failed to get shared data: %v", err)
return nil
}
log.Printf("Fetched shared data (update level %d): %d bytes",
*sharedData.UpdateLevel, len(sharedData.Payload))
return sharedData.Payload
}
func getEnvOrDefault(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
Running the Worker¶
Prerequisites¶
- ALPCRUN.CH services running
- Go 1.21+ installed (see the go.mod sketch below)
- API key configured
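The worker imports packages from the metalcore-neo module. If you build it as a standalone program, a go.mod along these lines is assumed (module path and versions are placeholders; pin whatever you actually use):
module example.com/alpcrun-worker

go 1.21

require (
    github.com/limelabs/metalcore-neo v0.0.0 // placeholder version
    google.golang.org/grpc v1.60.0 // placeholder version
    google.golang.org/protobuf v1.32.0 // placeholder version
)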
Setup¶
# Required
export METALCORE_API_KEY="your-secret-key"
# Recommended
export METALCORE_WORKER_NODE=$(hostname)
export METALCORE_WORKER_POD="worker-$(hostname)-$$"
# Optional
export METALCORE_QUEUE_MANAGER_ADDR="queue-manager:1337"
export METALCORE_CACHE_ADDR="localhost:3337"
export METALCORE_CA_CERT="/path/to/ca.crt"
Build and Run¶
# Build
go build -o worker main.go
# Run
./worker
Expected Output¶
2025/01/15 10:30:00 Starting worker: node=my-node, pod=worker-12345
2025/01/15 10:30:00 Connecting to queue manager at localhost:1337 (TLS: false)
2025/01/15 10:30:00 Connected successfully
2025/01/15 10:30:00 Worker registered with ID: w-abc123
2025/01/15 10:30:00 Worker is idle, waiting for instructions...
2025/01/15 10:30:15 Received BIND instruction for session: 01HZXY...
2025/01/15 10:30:15 Processing session: 01HZXY...
2025/01/15 10:30:15 Received batch batch-1 with 10 tasks
2025/01/15 10:30:15 Sent batch batch-1 with 10 results (total: 10 tasks)
2025/01/15 10:30:16 Received batch batch-2 with 10 tasks
2025/01/15 10:30:16 Sent batch batch-2 with 10 results (total: 20 tasks)
...
2025/01/15 10:30:25 Worker stream closed (session ended)
2025/01/15 10:30:25 Session complete: 100 tasks in 10.00s (10.00 tasks/sec)
2025/01/15 10:30:25 Returned to idle state
2025/01/15 10:30:25 Worker is idle, waiting for instructions...
Docker Deployment¶
Dockerfile¶
FROM golang:1.21 AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /worker main.go
FROM gcr.io/distroless/static-debian11
COPY --from=builder /worker /worker
ENTRYPOINT ["/worker"]
Build and Push¶
docker build -t myorg/worker:latest .
docker push myorg/worker:latest
Kubernetes Deployment¶
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 10
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      containers:
        - name: worker
          image: myorg/worker:latest
          env:
            - name: METALCORE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: alpcrun-secrets
                  key: api-key
            - name: METALCORE_WORKER_NODE
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: METALCORE_WORKER_POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: METALCORE_QUEUE_MANAGER_ADDR
              value: "queue-manager:1337"
            - name: METALCORE_CACHE_ADDR
              value: "localhost:3337"
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
Advanced Patterns¶
Multi-threaded Worker¶
Process multiple tasks concurrently:
func processBatchConcurrent(ctx context.Context, taskBatch *pb.Batch, numThreads int) *pb.Batch {
tasks := taskBatch.Messages
results := make([]*pb.Message, len(tasks))
// Create worker pool
taskChan := make(chan struct {
index int
task *pb.Message
}, len(tasks))
resultChan := make(chan struct {
index int
result *pb.Message
}, len(tasks))
// Start worker goroutines
for i := 0; i < numThreads; i++ {
go func() {
for item := range taskChan {
result := processTask(item.task, nil)
resultChan <- struct {
index int
result *pb.Message
}{item.index, result}
}
}()
}
// Send tasks
go func() {
for i, task := range tasks {
taskChan <- struct {
index int
task *pb.Message
}{i, task}
}
close(taskChan)
}()
// Collect results
for i := 0; i < len(tasks); i++ {
item := <-resultChan
results[item.index] = item.result
}
return &pb.Batch{
Id: taskBatch.Id,
SessionId: taskBatch.SessionId,
Messages: results,
}
}
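A call site could pass the number of available CPUs as the thread count (the "runtime" import is assumed):
resultBatch := processBatchConcurrent(ctx, taskBatch, runtime.NumCPU())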
Graceful Shutdown¶
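Trap SIGINT/SIGTERM so connections are closed before the process exits; the snippet assumes "os/signal" and "syscall" are added to the import block of the complete worker above: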
func main() {
// ... setup ...
// Handle signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Shutdown signal received, cleaning up...")
// Close connections gracefully
queueConn.Close()
os.Exit(0)
}()
runWorker(ctx, queueClient, workerNode, workerPod)
}
Next Steps¶
- Simple Client Example: Implement a client
- Monte Carlo Example: Real-world application
- Writing Services Guide: Advanced worker patterns
- Architecture Overview: Understand the system