Error Handling

Best practices for handling errors in ALPCRUN.CH applications.

Overview

ALPCRUN.CH provides multiple mechanisms for error handling:

  1. gRPC Status Codes: Standard error responses
  2. Dead Letter Queues: Failed task collection
  3. Message Errors: Per-task error messages
  4. Stream Errors: Connection and stream failures

gRPC Error Codes

Common Error Codes

Code               Description           Typical Cause
OK                 Success               No error
INVALID_ARGUMENT   Invalid request       Bad parameters, validation failure
NOT_FOUND          Resource not found    Session doesn't exist
ALREADY_EXISTS     Resource exists       Duplicate session creation
PERMISSION_DENIED  Access denied         Invalid API key
UNAVAILABLE        Service unavailable   Network issue, service down
UNAUTHENTICATED    Not authenticated     Missing API key
DEADLINE_EXCEEDED  Timeout               Operation took too long

Handling gRPC Errors

import (
    "log"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

session, err := queueClient.CreateSession(ctx, &pb.Session{...})
if err != nil {
    st, ok := status.FromError(err)
    if !ok {
        log.Fatalf("Unknown error: %v", err)
    }

    switch st.Code() {
    case codes.Unauthenticated:
        log.Fatal("Authentication failed: check METALCORE_API_KEY")

    case codes.InvalidArgument:
        log.Fatalf("Invalid request: %s", st.Message())

    case codes.Unavailable:
        log.Printf("Service unavailable, retrying...")
        time.Sleep(5 * time.Second)
        // Retry logic here

    default:
        log.Fatalf("Error: %s (code: %s)", st.Message(), st.Code())
    }
}

Retry Logic

func createSessionWithRetry(ctx context.Context, client pb.QueueClient, session *pb.Session, maxRetries int) (*pb.Session, error) {
    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        result, err := client.CreateSession(ctx, session)
        if err == nil {
            return result, nil
        }

        st, ok := status.FromError(err)
        if !ok {
            return nil, err
        }

        // Retry only on transient errors
        if st.Code() == codes.Unavailable || st.Code() == codes.DeadlineExceeded {
            backoff := time.Duration(1<<uint(attempt)) * time.Second
            log.Printf("Attempt %d failed, retrying in %v", attempt+1, backoff)
            time.Sleep(backoff)
            lastErr = err
            continue
        }

        // Non-retryable error
        return nil, err
    }

    return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
}

Dead Letter Queues

Accessing Dead Letters

func processDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) error {
    stream, err := client.ClientStreamDeadLetters(ctx, session)
    if err != nil {
        return fmt.Errorf("failed to open dead letter stream: %w", err)
    }

    deadCount := 0

    for {
        batch, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("error reading dead letters: %w", err)
        }

        for _, msg := range batch.Messages {
            deadCount++
            log.Printf("Failed task %s: %s", msg.Id, msg.GetError())

            // Optionally log details
            logDeadLetter(msg)
        }
    }

    log.Printf("Total dead letters: %d", deadCount)
    return nil
}

func logDeadLetter(msg *pb.Message) {
    // Parse original task for context
    var task MyTask
    if err := json.Unmarshal(msg.Payload, &task); err == nil {
        log.Printf("  Task details: %+v", task)
    }

    log.Printf("  Error: %s", msg.GetError())
    log.Printf("  Timestamp: %v", msg.Timestamp.AsTime())
}

Retry Failed Tasks

func retryDeadLetters(ctx context.Context, queueClient pb.QueueClient, session *pb.Session, maxRetries int) {
    deadStream, err := queueClient.ClientStreamDeadLetters(ctx, session)
    if err != nil {
        log.Printf("Failed to open dead letter stream: %v", err)
        return
    }

    taskStream, err := queueClient.ClientStream(ctx)
    if err != nil {
        log.Printf("Failed to open task stream: %v", err)
        return
    }

    retryCount := make(map[string]int)

    for {
        batch, err := deadStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Error reading dead letters: %v", err)
            break
        }

        for _, msg := range batch.Messages {
            // Check retry count
            count := retryCount[msg.Id]
            if count >= maxRetries {
                log.Printf("Task %s failed after %d retries, giving up", msg.Id, maxRetries)
                continue
            }

            // Check if error is retryable
            if !isRetryableError(msg.GetError()) {
                log.Printf("Task %s has non-retryable error: %s", msg.Id, msg.GetError())
                continue
            }

            // Retry task
            log.Printf("Retrying task %s (attempt %d)", msg.Id, count+1)
            retryCount[msg.Id] = count + 1

            retryBatch := &pb.Batch{
                Id:        fmt.Sprintf("%s-retry-%d", msg.Id, count+1),
                SessionId: session.Id,
                Messages:  []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
            }

            if err := taskStream.Send(retryBatch); err != nil {
                log.Printf("Failed to send retry: %v", err)
            }
        }
    }
}

func isRetryableError(errMsg string) bool {
    retryablePatterns := []string{
        "timeout",
        "connection",
        "temporary",
        "unavailable",
    }

    errLower := strings.ToLower(errMsg)
    for _, pattern := range retryablePatterns {
        if strings.Contains(errLower, pattern) {
            return true
        }
    }

    return false
}

Worker Error Handling

Catching Panics

func processTaskSafely(task *pb.Message) (result *pb.Message) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Task %s panicked: %v", task.Id, r)
            log.Printf("Stack trace: %s", debug.Stack())

            // Return an error result instead of nil so the failure is reported
            result = &pb.Message{
                Id:    task.Id,
                Error: proto.String(fmt.Sprintf("panic: %v", r)),
            }
        }
    }()

    return processTask(task)
}

Timeout Protection

func processTaskWithTimeout(task *pb.Message, timeout time.Duration) *pb.Message {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    resultChan := make(chan *pb.Message, 1)
    errChan := make(chan error, 1)

    go func() {
        result, err := doComputation(task)
        if err != nil {
            errChan <- err
            return
        }
        resultChan <- result
    }()

    select {
    case result := <-resultChan:
        return result

    case err := <-errChan:
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(err.Error()),
        }

    case <-ctx.Done():
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Task timeout after %v", timeout)),
        }
    }
}

Error Result Messages

func processTask(task *pb.Message) *pb.Message {
    // Parse task
    var computeTask ComputeTask
    if err := json.Unmarshal(task.Payload, &computeTask); err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Parse error: %v", err)),
        }
    }

    // Validate task
    if err := validateTask(computeTask); err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Validation error: %v", err)),
        }
    }

    // Process task
    result, err := compute(computeTask)
    if err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Computation error: %v", err)),
        }
    }

    // Serialize result
    resultPayload, err := json.Marshal(result)
    if err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(fmt.Sprintf("Serialization error: %v", err)),
        }
    }

    return &pb.Message{
        Id:        task.Id,
        Payload:   resultPayload,
        RelatedId: proto.String(task.Id),
    }
}

Stream Error Handling

Reconnection Logic

func runWorkerWithReconnect(ctx context.Context, queueAddr string) {
    for {
        err := runWorker(ctx, queueAddr)
        if err == nil {
            break
        }

        log.Printf("Worker error: %v", err)

        // Stop reconnecting once the context has been cancelled
        if ctx.Err() != nil {
            return
        }

        log.Println("Reconnecting in 5 seconds...")
        time.Sleep(5 * time.Second)
    }
}

func runWorker(ctx context.Context, queueAddr string) error {
    // Connect
    queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
    if err != nil {
        return fmt.Errorf("connection failed: %w", err)
    }
    defer queueConn.Close()

    // Run worker loop
    return workerLoop(ctx, queueClient)
}

Stream Health Checks

func monitorStream(stream grpc.ClientStream) <-chan error {
    errChan := make(chan error, 1)

    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // Check the stream's context; a non-nil error means the stream is dead
                if err := stream.Context().Err(); err != nil {
                    errChan <- fmt.Errorf("stream unhealthy: %w", err)
                    return
                }

            case <-stream.Context().Done():
                errChan <- stream.Context().Err()
                return
            }
        }
    }()

    return errChan
}
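
A usage sketch for combining the monitor with a receive loop: run the blocking receive in its own goroutine and return as soon as either side reports a failure. The runWithMonitor function and the recvLoop closure are hypothetical helpers shown for illustration, not part of the ALPCRUN.CH API.

// runWithMonitor is a hypothetical helper: recvLoop stands in for your
// worker's Recv loop and should return when the stream fails.
func runWithMonitor(stream grpc.ClientStream, recvLoop func() error) error {
    healthErr := monitorStream(stream)
    recvErr := make(chan error, 1)

    // Run the blocking receive loop concurrently so health failures are noticed
    go func() { recvErr <- recvLoop() }()

    select {
    case err := <-healthErr:
        return fmt.Errorf("stream monitor: %w", err)
    case err := <-recvErr:
        return err
    }
}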

Client Error Handling

Connection Failures

func connectWithRetry(addr, caCert string, useTLS bool, maxRetries int) (
    *grpc.ClientConn, pb.QueueClient, error) {

    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        conn, client, err := grpccon.ConnectToQueue(addr, caCert, useTLS)
        if err == nil {
            // Verify the connection is ready before handing it back
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            readyErr := grpccon.BlockUntilReady(ctx, conn, 10*time.Second)
            cancel()

            if readyErr == nil {
                return conn, client, nil
            }

            conn.Close()
            lastErr = readyErr
        } else {
            lastErr = err
        }

        backoff := time.Duration(1<<uint(attempt)) * time.Second
        log.Printf("Connection attempt %d failed, retrying in %v", attempt+1, backoff)
        time.Sleep(backoff)
    }

    return nil, nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}

Session Errors

func submitWithErrorHandling(ctx context.Context, stream pb.Queue_ClientStreamClient,
    session *pb.Session, tasks []*pb.Message) error {

    for _, task := range tasks {
        batch := &pb.Batch{
            SessionId: session.Id,
            Messages:  []*pb.Message{task},
        }

        err := stream.Send(batch)
        if err != nil {
            st, ok := status.FromError(err)
            if !ok {
                return fmt.Errorf("send failed: %w", err)
            }

            switch st.Code() {
            case codes.NotFound:
                return fmt.Errorf("session not found: %s", session.Id)

            case codes.InvalidArgument:
                log.Printf("Invalid task %s: %s", task.Id, st.Message())
                continue  // Skip this task

            case codes.ResourceExhausted:
                log.Println("Queue full, backing off...")
                time.Sleep(1 * time.Second)
                // Retry this task
                // ... retry logic ...

            default:
                return fmt.Errorf("send error: %w", err)
            }
        }
    }

    return nil
}

Monitoring and Alerting

Error Rate Monitoring

# High error rate alert
rate(grpc_server_handled_total{grpc_code!="OK"}[5m]) > 10

# Dead letter growth
rate(alpcrun_queue_deadletter_depth[5m]) > 5
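
To feed alerts like these, workers can export their own error counters. Below is a minimal sketch using the Prometheus Go client (github.com/prometheus/client_golang); the metric name alpcrun_worker_task_errors_total and the :9100 listen address are illustrative assumptions, not part of ALPCRUN.CH.

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Counts tasks that came back with a per-task error message (name is illustrative)
var taskErrors = promauto.NewCounter(prometheus.CounterOpts{
    Name: "alpcrun_worker_task_errors_total",
    Help: "Number of tasks that produced an error result.",
})

func recordResult(result *pb.Message) {
    if result.GetError() != "" {
        taskErrors.Inc()
    }
}

func serveMetrics() {
    // Expose /metrics for Prometheus to scrape (port is an example choice)
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        if err := http.ListenAndServe(":9100", nil); err != nil {
            log.Printf("metrics server stopped: %v", err)
        }
    }()
}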

Logging Best Practices

import "log/slog"

// Structured error logging
slog.Error("Task processing failed",
    "task_id", task.Id,
    "session_id", sessionID,
    "worker_id", workerID,
    "error", err,
    "retries", retryCount)

// Context-aware logging
logger := slog.With(
    "session_id", sessionID,
    "worker_id", workerID)

logger.Error("Failed to process batch",
    "batch_id", batch.Id,
    "error", err)

Best Practices

  1. Always check errors: Never ignore return values
  2. Use dead letter queues: Don't silently drop failed tasks
  3. Implement retries: Transient failures are common
  4. Set timeouts: Prevent hanging operations
  5. Log errors: Include context for debugging
  6. Monitor metrics: Track error rates
  7. Graceful degradation: Handle partial failures (see the sketch after this list)
  8. Test error paths: Verify error handling works
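
For point 7, a minimal sketch of graceful degradation: tally per-task errors across the received batches and only fail the run when the error rate crosses a threshold. The summarize helper and the 10% threshold are illustrative assumptions.

func summarize(batches []*pb.Batch) error {
    var total, failed int

    for _, batch := range batches {
        for _, msg := range batch.Messages {
            total++
            if msg.GetError() != "" {
                failed++
                log.Printf("Partial failure for task %s: %s", msg.Id, msg.GetError())
            }
        }
    }

    if total == 0 {
        return fmt.Errorf("no results received")
    }

    // Tolerate isolated failures; fail the run only above the threshold
    if failed*10 > total {
        return fmt.Errorf("degraded run: %d of %d tasks failed", failed, total)
    }

    log.Printf("Run usable: %d of %d tasks succeeded", total-failed, total)
    return nil
}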

Next Steps