Error Handling¶
Best practices for handling errors in ALPCRUN.CH applications.
Overview¶
ALPCRUN.CH provides multiple mechanisms for error handling:
- gRPC Status Codes: Standard error responses
- Dead Letter Queues: Failed task collection
- Message Errors: Per-task error messages
- Stream Errors: Connection and stream failures
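The per-task message errors listed above surface on result messages as an Error field. The sketch below shows one way a client might check it while draining a result stream; it is a minimal sketch in which the checkResults name is hypothetical, and the stream shape is assumed to match the dead-letter stream shown later (Recv returning *pb.Batch).
// checkResults drains a result stream and logs per-task errors.
// The function name and stream shape are assumptions for illustration:
// any stream whose Recv returns *pb.Batch fits this signature.
func checkResults(stream interface{ Recv() (*pb.Batch, error) }) error {
    for {
        batch, err := stream.Recv()
        if err == io.EOF {
            return nil // stream closed cleanly
        }
        if err != nil {
            return fmt.Errorf("error reading results: %w", err)
        }
        for _, msg := range batch.Messages {
            if msg.GetError() != "" {
                // Per-task failure: the worker set Error instead of a payload
                log.Printf("task %s failed: %s", msg.Id, msg.GetError())
            }
        }
    }
}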
gRPC Error Codes¶
Common Error Codes¶
| Code | Description | Typical Cause |
|---|---|---|
| OK | Success | No error |
| INVALID_ARGUMENT | Invalid request | Bad parameters, validation failure |
| NOT_FOUND | Resource not found | Session doesn't exist |
| ALREADY_EXISTS | Resource exists | Duplicate session creation |
| PERMISSION_DENIED | Access denied | Invalid API key |
| UNAVAILABLE | Service unavailable | Network issue, service down |
| UNAUTHENTICATED | Not authenticated | Missing API key |
| DEADLINE_EXCEEDED | Timeout | Operation took too long |
Handling gRPC Errors¶
import (
    "log"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)
session, err := queueClient.CreateSession(ctx, &pb.Session{...})
if err != nil {
st, ok := status.FromError(err)
if !ok {
log.Fatalf("Unknown error: %v", err)
}
switch st.Code() {
case codes.Unauthenticated:
log.Fatal("Authentication failed: check METALCORE_API_KEY")
case codes.InvalidArgument:
log.Fatalf("Invalid request: %s", st.Message())
case codes.Unavailable:
log.Printf("Service unavailable, retrying...")
time.Sleep(5 * time.Second)
// Retry logic here
default:
log.Fatalf("Error: %s (code: %s)", st.Message(), st.Code())
}
}
Retry Logic¶
func createSessionWithRetry(ctx context.Context, client pb.QueueClient, session *pb.Session, maxRetries int) (*pb.Session, error) {
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
result, err := client.CreateSession(ctx, session)
if err == nil {
return result, nil
}
st, ok := status.FromError(err)
if !ok {
return nil, err
}
// Retry only on transient errors
if st.Code() == codes.Unavailable || st.Code() == codes.DeadlineExceeded {
backoff := time.Duration(1<<uint(attempt)) * time.Second
log.Printf("Attempt %d failed, retrying in %v", attempt+1, backoff)
time.Sleep(backoff)
lastErr = err
continue
}
// Non-retryable error
return nil, err
}
return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
}
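A call site for the helper above might look like the following; the five-attempt limit is an arbitrary example value, and the session fields are elided as in the earlier snippet.
// Usage sketch for createSessionWithRetry; the attempt count is arbitrary.
session, err := createSessionWithRetry(ctx, queueClient, &pb.Session{ /* session fields */ }, 5)
if err != nil {
    log.Fatalf("could not create session after retries: %v", err)
}
log.Printf("session %s ready", session.Id)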
Dead Letter Queues¶
Accessing Dead Letters¶
func processDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) error {
stream, err := client.ClientStreamDeadLetters(ctx, session)
if err != nil {
return fmt.Errorf("failed to open dead letter stream: %w", err)
}
deadCount := 0
for {
batch, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error reading dead letters: %w", err)
}
for _, msg := range batch.Messages {
deadCount++
log.Printf("Failed task %s: %s", msg.Id, msg.GetError())
// Optionally log details
logDeadLetter(msg)
}
}
log.Printf("Total dead letters: %d", deadCount)
return nil
}
func logDeadLetter(msg *pb.Message) {
// Parse original task for context
var task MyTask
if err := json.Unmarshal(msg.Payload, &task); err == nil {
log.Printf(" Task details: %+v", task)
}
log.Printf(" Error: %s", msg.GetError())
log.Printf(" Timestamp: %v", msg.Timestamp.AsTime())
}
Retry Failed Tasks¶
func retryDeadLetters(ctx context.Context, queueClient pb.QueueClient, session *pb.Session, maxRetries int) {
deadStream, err := queueClient.ClientStreamDeadLetters(ctx, session)
if err != nil {
    log.Printf("Failed to open dead letter stream: %v", err)
    return
}
taskStream, err := queueClient.ClientStream(ctx)
if err != nil {
    log.Printf("Failed to open task stream: %v", err)
    return
}
retryCount := make(map[string]int)
for {
batch, err := deadStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Error reading dead letters: %v", err)
break
}
for _, msg := range batch.Messages {
// Check retry count
count := retryCount[msg.Id]
if count >= maxRetries {
log.Printf("Task %s failed after %d retries, giving up", msg.Id, maxRetries)
continue
}
// Check if error is retryable
if !isRetryableError(msg.GetError()) {
log.Printf("Task %s has non-retryable error: %s", msg.Id, msg.GetError())
continue
}
// Retry task
log.Printf("Retrying task %s (attempt %d)", msg.Id, count+1)
retryCount[msg.Id] = count + 1
retryBatch := &pb.Batch{
Id: fmt.Sprintf("%s-retry-%d", msg.Id, count+1),
SessionId: session.Id,
Messages: []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
}
if err := taskStream.Send(retryBatch); err != nil {
log.Printf("Failed to send retry: %v", err)
}
}
}
}
func isRetryableError(errMsg string) bool {
retryablePatterns := []string{
"timeout",
"connection",
"temporary",
"unavailable",
}
errLower := strings.ToLower(errMsg)
for _, pattern := range retryablePatterns {
if strings.Contains(errLower, pattern) {
return true
}
}
return false
}
Worker Error Handling¶
Catching Panics¶
func processTaskSafely(task *pb.Message) (result *pb.Message) {
    // Named return lets the recover handler substitute an error result,
    // so a panicking task is reported instead of silently returning nil.
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Task %s panicked: %v", task.Id, r)
            log.Printf("Stack trace: %s", debug.Stack())
            result = &pb.Message{
                Id:    task.Id,
                Error: proto.String(fmt.Sprintf("panic: %v", r)),
            }
        }
    }()
    return processTask(task)
}
Timeout Protection¶
func processTaskWithTimeout(task *pb.Message, timeout time.Duration) *pb.Message {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
resultChan := make(chan *pb.Message, 1)
errChan := make(chan error, 1)
go func() {
result, err := doComputation(task)
if err != nil {
errChan <- err
return
}
resultChan <- result
}()
select {
case result := <-resultChan:
return result
case err := <-errChan:
return &pb.Message{
Id: task.Id,
Error: proto.String(err.Error()),
}
case <-ctx.Done():
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Task timeout after %v", timeout)),
}
}
}
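One caveat worth noting: the panic guard shown earlier does not protect this code path, because recover only catches panics raised on its own goroutine. Below is a hedged variant of the computation goroutine above that converts panics into errors; doComputation is the same assumed helper.
go func() {
    // recover must run on this goroutine: a panic inside doComputation
    // would otherwise crash the whole worker process, bypassing
    // processTaskSafely entirely.
    defer func() {
        if r := recover(); r != nil {
            errChan <- fmt.Errorf("panic in computation: %v", r)
        }
    }()
    result, err := doComputation(task)
    if err != nil {
        errChan <- err
        return
    }
    resultChan <- result
}()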
Error Result Messages¶
func processTask(task *pb.Message) *pb.Message {
// Parse task
var computeTask ComputeTask
if err := json.Unmarshal(task.Payload, &computeTask); err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Parse error: %v", err)),
}
}
// Validate task
if err := validateTask(computeTask); err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Validation error: %v", err)),
}
}
// Process task
result, err := compute(computeTask)
if err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Computation error: %v", err)),
}
}
// Serialize result
resultPayload, err := json.Marshal(result)
if err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(fmt.Sprintf("Serialization error: %v", err)),
}
}
return &pb.Message{
Id: task.Id,
Payload: resultPayload,
RelatedId: proto.String(task.Id),
}
}
Stream Error Handling¶
Reconnection Logic¶
func runWorkerWithReconnect(ctx context.Context, queueAddr string) {
    for {
        err := runWorker(ctx, queueAddr)
        if err == nil {
            return
        }
        log.Printf("Worker error: %v", err)
        if ctx.Err() != nil {
            // Context cancelled: stop reconnecting and shut down
            return
        }
        log.Println("Reconnecting in 5 seconds...")
        time.Sleep(5 * time.Second)
    }
}
func runWorker(ctx context.Context, queueAddr string) error {
// Connect
queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
if err != nil {
return fmt.Errorf("connection failed: %w", err)
}
defer queueConn.Close()
// Run worker loop
return workerLoop(ctx, queueClient)
}
Stream Health Checks¶
func monitorStream(stream grpc.ClientStream) <-chan error {
errChan := make(chan error, 1)
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Periodically check the stream's context for failure
if err := stream.Context().Err(); err != nil {
errChan <- fmt.Errorf("stream unhealthy: %w", err)
return
}
case <-stream.Context().Done():
errChan <- stream.Context().Err()
return
}
}
}()
return errChan
}
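The returned channel can then be watched next to the worker's context. A minimal usage sketch, assuming the worker's receive loop runs on its own goroutine:
// Usage sketch: react to stream failures reported by monitorStream.
healthErr := monitorStream(stream)
select {
case err := <-healthErr:
    log.Printf("stream failed, reconnecting: %v", err)
    // hand control back to the reconnect loop shown above
case <-ctx.Done():
    log.Println("shutting down")
}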
Client Error Handling¶
Connection Failures¶
func connectWithRetry(addr, caCert string, useTLS bool, maxRetries int) (
    *grpc.ClientConn, pb.QueueClient, error) {
    var lastErr error
    for attempt := 0; attempt < maxRetries; attempt++ {
        conn, client, err := grpccon.ConnectToQueue(addr, caCert, useTLS)
        if err == nil {
            // Verify the connection is ready before handing it back
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            err = grpccon.BlockUntilReady(ctx, conn, 10*time.Second)
            cancel()
            if err == nil {
                return conn, client, nil
            }
            conn.Close()
        }
        lastErr = err
        backoff := time.Duration(1<<uint(attempt)) * time.Second
        log.Printf("Connection attempt %d failed, retrying in %v", attempt+1, backoff)
        time.Sleep(backoff)
    }
    return nil, nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}
Session Errors¶
func submitWithErrorHandling(ctx context.Context, stream pb.Queue_ClientStreamClient,
session *pb.Session, tasks []*pb.Message) error {
for _, task := range tasks {
batch := &pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{task},
}
err := stream.Send(batch)
if err != nil {
st, ok := status.FromError(err)
if !ok {
return fmt.Errorf("send failed: %w", err)
}
switch st.Code() {
case codes.NotFound:
return fmt.Errorf("session not found: %s", session.Id)
case codes.InvalidArgument:
log.Printf("Invalid task %s: %s", task.Id, st.Message())
continue // Skip this task
case codes.ResourceExhausted:
log.Println("Queue full, backing off...")
time.Sleep(1 * time.Second)
// Retry this task
// ... retry logic ...
default:
return fmt.Errorf("send error: %w", err)
}
}
}
return nil
}
Monitoring and Alerting¶
Error Rate Monitoring¶
# High error rate alert
rate(grpc_server_handled_total{grpc_code!="OK"}[5m]) > 10
# Dead letter growth
rate(alpcrun_queue_deadletter_depth[5m]) > 5
Logging Best Practices¶
import "log/slog"
// Structured error logging
slog.Error("Task processing failed",
"task_id", task.Id,
"session_id", sessionID,
"worker_id", workerID,
"error", err,
"retries", retryCount)
// Context-aware logging
logger := slog.With(
"session_id", sessionID,
"worker_id", workerID)
logger.Error("Failed to process batch",
"batch_id", batch.Id,
"error", err)
Best Practices¶
- Always check errors: Never ignore return values
- Use dead letter queues: Don't silently drop failed tasks
- Implement retries: Transient failures are common
- Set timeouts: Prevent hanging operations
- Log errors: Include context for debugging
- Monitor metrics: Track error rates
- Graceful degradation: Handle partial failures
- Test error paths: Verify error handling works
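As a minimal illustration of the timeout and error-checking points above, a per-call deadline can be attached directly to the request context; the 30-second budget is an arbitrary example, and the session fields are elided as before.
// Sketch: bound a single RPC with a deadline so it cannot hang indefinitely.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

session, err := queueClient.CreateSession(ctx, &pb.Session{ /* session fields */ })
if err != nil {
    // DEADLINE_EXCEEDED surfaces here like any other gRPC status code
    log.Fatalf("CreateSession failed: %v", err)
}
log.Printf("created session %s", session.Id)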
Next Steps¶
- Writing Clients: Client implementation patterns
- Writing Services: Worker error handling
- API Reference: Error codes reference
- Configuration: Timeout settings