Writing Clients

This guide shows you how to write client applications that submit workloads to ALPCRUN.CH.

Overview

A client application in ALPCRUN.CH:

  1. Connects to the Queue Manager
  2. Creates a session with configuration
  3. Optionally uploads shared data to the cache
  4. Opens a bidirectional stream
  5. Sends task batches
  6. Receives result batches
  7. Closes the session

Basic Client Structure

Step 1: Import Required Packages

package main

import (
    "context"
    "encoding/json" // used in Step 5 to serialize tasks
    "io"            // used in Steps 6-7 to detect end of stream
    "log"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/durationpb"
)

Step 2: Connect to Queue Manager

func main() {
    ctx := context.Background()

    // Connect to queue manager
    queueConn, queueClient, err := grpccon.ConnectToQueue(
        "queue-manager.alpcrun.svc.cluster.local:1337",
        "/certs/ca.crt",  // CA certificate path
        true)             // TLS enabled
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer queueConn.Close()

    // Wait for connection to be ready
    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        log.Fatalf("Connection not ready: %v", err)
    }

    log.Println("Connected to queue manager")
}

Connection Parameters:

  • Address: Queue manager host:port
  • CA Certificate: Path to CA cert for TLS (empty string if TLS disabled)
  • TLS Enabled: Boolean flag
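
For example, a plaintext connection for a trusted, cluster-internal setup passes an empty CA path and a false TLS flag, as described above (whether your deployment exposes a plaintext port on the same address is an assumption to verify):

// Connect without TLS: empty CA certificate path, TLS flag false.
// Use this only on trusted, cluster-internal networks.
queueConn, queueClient, err := grpccon.ConnectToQueue(
    "queue-manager.alpcrun.svc.cluster.local:1337",
    "",    // no CA certificate
    false) // TLS disabled
if err != nil {
    log.Fatalf("Failed to connect: %v", err)
}
defer queueConn.Close()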

Step 3: Create a Session

session, err := queueClient.CreateSession(ctx, &pb.Session{
    App: &pb.App{
        Id: "monte-carlo-simulation",
    },
    Attributes: &pb.SessionAttributes{
        ClientId:    "",  // Leave empty for auto-generation
        Priority:    5,   // 0-10, higher = more important
        IdleTimeout: durationpb.New(60 * time.Second),
        Tags:        []string{"simulation", "monte-carlo"},

        // Worker configuration
        MinWorkers: proto.Uint32(10),
        MaxWorkers: proto.Uint32(100),

        // Flow control
        MaxInflightBatchesPerWorker: proto.Uint32(5),

        // Cleanup
        CleanupOnClientExit: proto.Bool(true),
    },
})
if err != nil {
    log.Fatalf("Failed to create session: %v", err)
}

log.Printf("Session created: %s", session.Id)

Session Attributes Explained:

  • Priority: Higher priority sessions get workers first (0-10 range)
  • IdleTimeout: How long to keep session alive without activity
  • Tags: Metadata for filtering and monitoring
  • MinWorkers: Minimum workers to bind (scheduling hint)
  • MaxWorkers: Maximum workers to bind (hard limit)
  • MaxInflightBatchesPerWorker: Flow control per worker
  • CleanupOnClientExit: Auto-cleanup queues on disconnect
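
Most attributes are optional. For quick experiments, a session can be created with just an application id, assuming unset attributes fall back to server-side defaults (an assumption worth verifying for your deployment):

// Minimal session: only the application id is set; priority, timeouts,
// and worker bounds are left to server-side defaults.
session, err := queueClient.CreateSession(ctx, &pb.Session{
    App: &pb.App{Id: "monte-carlo-simulation"},
})
if err != nil {
    log.Fatalf("Failed to create session: %v", err)
}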

Step 4: Open Bidirectional Stream

stream, err := queueClient.ClientStream(ctx)
if err != nil {
    log.Fatalf("Failed to create stream: %v", err)
}

log.Println("Stream opened")

Step 5: Send Task Batches

// Define your task structure (example)
type ComputeTask struct {
    TaskID     string
    InputData  []float64
    Iterations int
}

// Create tasks
tasks := []ComputeTask{
    {TaskID: "task-1", InputData: []float64{1.0, 2.0, 3.0}, Iterations: 1000},
    {TaskID: "task-2", InputData: []float64{4.0, 5.0, 6.0}, Iterations: 1000},
    // ... more tasks
}

// Send tasks as batches
for _, task := range tasks {
    // Serialize your task (using protobuf, JSON, or any format)
    taskPayload, err := json.Marshal(task)
    if err != nil {
        log.Printf("Failed to serialize task: %v", err)
        continue
    }

    // Create message
    message := &pb.Message{
        Id:      task.TaskID,
        Payload: taskPayload,
    }

    // Create batch
    batch := &pb.Batch{
        Id:        task.TaskID,  // Batch ID for tracking
        SessionId: session.Id,
        Priority:  0,  // Per-batch priority (optional)
        Messages:  []*pb.Message{message},
    }

    // Send batch
    if err := stream.Send(batch); err != nil {
        log.Printf("Failed to send batch: %v", err)
        break
    }
}

log.Printf("Sent %d tasks", len(tasks))

Batching Strategies:

Single Task per Batch (simple):

for _, task := range tasks {
    batch := &pb.Batch{
        SessionId: session.Id,
        Messages:  []*pb.Message{{Id: task.ID, Payload: task.Data}},
    }
    if err := stream.Send(batch); err != nil {
        log.Printf("Failed to send batch: %v", err)
        break
    }
}

Multiple Tasks per Batch (efficient):

const batchSize = 100
messages := []*pb.Message{}

for i, task := range tasks {
    messages = append(messages, &pb.Message{
        Id:      task.ID,
        Payload: task.Data,
    })

    // Send when the batch is full or this is the last task
    if len(messages) >= batchSize || i == len(tasks)-1 {
        batch := &pb.Batch{Messages: messages, SessionId: session.Id}
        if err := stream.Send(batch); err != nil {
            log.Printf("Failed to send batch: %v", err)
            break
        }
        messages = []*pb.Message{}
    }
}

Step 6: Receive Result Batches

// Receive results in a goroutine
resultChan := make(chan *pb.Batch, 100)
errorChan := make(chan error, 1)

go func() {
    for {
        batch, err := stream.Recv()
        if err != nil {
            errorChan <- err
            return
        }
        resultChan <- batch
    }
}()

// Process results
receivedResults := 0
expectedResults := len(tasks)

receiveLoop:
for receivedResults < expectedResults {
    select {
    case batch := <-resultChan:
        for _, result := range batch.Messages {
            // Deserialize result
            var taskResult ComputeTask
            if err := json.Unmarshal(result.Payload, &taskResult); err != nil {
                log.Printf("Failed to parse result: %v", err)
                continue
            }

            log.Printf("Result received: %s", taskResult.TaskID)
            receivedResults++
        }

    case err := <-errorChan:
        if err == io.EOF {
            log.Println("Stream closed")
        } else {
            log.Printf("Stream error: %v", err)
        }
        // A bare break would only exit the select, not the loop
        break receiveLoop

    case <-time.After(5 * time.Minute):
        log.Println("Timeout waiting for results")
        break receiveLoop
    }
}

log.Printf("Received %d/%d results", receivedResults, expectedResults)

Step 7: Handle Dead Letter Queue (Optional)

// Retrieve failed tasks
deadLetterStream, err := queueClient.ClientStreamDeadLetters(ctx, &pb.Session{Id: session.Id})
if err != nil {
    log.Printf("Failed to open dead letter stream: %v", err)
} else {
    for {
        batch, err := deadLetterStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Dead letter stream error: %v", err)
            break
        }

        for _, msg := range batch.Messages {
            log.Printf("Failed task: %s, Error: %s", msg.Id, msg.Error)

            // Optionally retry
            retryBatch := &pb.Batch{
                Id:        msg.Id + "-retry",
                SessionId: session.Id,
                Messages:  []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
            }
            if err := stream.Send(retryBatch); err != nil {
                log.Printf("Failed to resend task %s: %v", msg.Id, err)
            }
        }
    }
}

Step 8: Close Session

// Close the stream
stream.CloseSend()

// Close the session
_, err = queueClient.CloseSession(ctx, session)
if err != nil {
    log.Printf("Failed to close session: %v", err)
} else {
    log.Println("Session closed successfully")
}

Using Shared Data

When you have data that all workers need (e.g., model parameters, lookup tables):

Upload Shared Data

// Connect to cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
    "central-cache.alpcrun.svc.cluster.local:2337",
    "/certs/ca.crt",
    true)
if err != nil {
    log.Fatalf("Failed to connect to cache: %v", err)
}
defer cacheConn.Close()

// Prepare shared data
sharedData := MySharedData{
    ModelWeights: weights,
    LookupTable:  table,
}
sharedPayload, err := json.Marshal(sharedData)
if err != nil {
    log.Fatalf("Failed to serialize shared data: %v", err)
}

// Upload to cache
updateLevel := uint32(1)
_, err = cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   session.Id,
    UpdateLevel: &updateLevel,
    Payload:     sharedPayload,
    Ttl:         durationpb.New(10 * time.Minute),
})
if err != nil {
    log.Fatalf("Failed to set shared data: %v", err)
}

log.Println("Shared data uploaded")

Send Tasks with Shared Data Reference

batch := &pb.Batch{
    Id:                             "batch-1",
    SessionId:                      session.Id,
    UseSharedData:                  proto.Bool(true),
    RequiredSharedDataUpdateLevel:  &updateLevel,
    Messages:                       messages,
}
stream.Send(batch)

Workers will automatically fetch the shared data from the cache.

Update Shared Data

// Prepare updated data
newUpdateLevel := uint32(2)
updatedPayload, err := json.Marshal(updatedSharedData)
if err != nil {
    log.Fatalf("Failed to serialize shared data: %v", err)
}

// Update in cache
_, err = cacheClient.UpdateSharedData(ctx, &pb.SharedData{
    SessionId:   session.Id,
    UpdateLevel: &newUpdateLevel,
    Payload:     updatedPayload,
    Ttl:         durationpb.New(10 * time.Minute),
})
if err != nil {
    log.Fatalf("Failed to update shared data: %v", err)
}

// Send new tasks with updated version
batch := &pb.Batch{
    UseSharedData:                 proto.Bool(true),
    RequiredSharedDataUpdateLevel: &newUpdateLevel,
    // ...
}

Delete Shared Data

// When done, clean up shared data
_, err = cacheClient.DeleteSharedData(ctx, &pb.SharedData{
    SessionId: session.Id,
})
if err != nil {
    log.Printf("Failed to delete shared data: %v", err)
}

Advanced Patterns

Multiple Sessions

Run multiple independent workloads:

sessions := make([]*pb.Session, 0, 10)
for i := 0; i < 10; i++ {
    session, err := queueClient.CreateSession(ctx, &pb.Session{
        App: &pb.App{Id: fmt.Sprintf("app-%d", i)},
    })
    if err != nil {
        log.Printf("Failed to create session %d: %v", i, err)
        continue
    }
    sessions = append(sessions, session)

    // Launch a goroutine to handle each session
    go handleSession(session)
}

Progress Tracking

Track completion percentage:

var (
    totalTasks     = len(tasks)
    completedTasks atomic.Int32
)

// In result processing loop
completedTasks.Add(1)
progress := float64(completedTasks.Load()) / float64(totalTasks) * 100
log.Printf("Progress: %.2f%%", progress)

Dynamic Task Generation

Generate tasks based on results:

for batch := range resultChan {
    for _, result := range batch.Messages {
        // Process result
        processedResult := processResult(result)

        // Generate new tasks based on the result
        if needsMoreWork(processedResult) {
            newTasks := generateTasks(processedResult)
            for _, task := range newTasks {
                sendTask(stream, task)
            }
        }
    }
}

Error Handling and Retries

const maxRetries = 3
retryCount := make(map[string]int)

// In dead letter processing
for _, msg := range batch.Messages {
    retries := retryCount[msg.Id]

    if retries < maxRetries {
        log.Printf("Retrying task %s (attempt %d)", msg.Id, retries+1)
        retryCount[msg.Id]++

        // Resend task
        stream.Send(&pb.Batch{
            SessionId: session.Id,
            Messages:  []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
        })
    } else {
        log.Printf("Task %s failed after %d retries: %s",
            msg.Id, maxRetries, msg.Error)
    }
}

Complete Example

See Simple Client Example for a complete, runnable client application.

Best Practices

  1. Connection Management:
     • Reuse connections across sessions
     • Implement exponential backoff for reconnection (see the first sketch after this list)
     • Use BlockUntilReady to ensure connection health

  2. Batching:
     • Batch multiple tasks for better throughput
     • Balance batch size (50-1000 tasks is typically optimal)
     • Don't make batches too large (memory overhead)

  3. Flow Control:
     • Set MaxInflightBatchesPerWorker appropriately
     • Monitor backpressure on the stream
     • Adjust sending rate based on result rate (see the second sketch after this list)

  4. Session Configuration:
     • Use CleanupOnClientExit: true for automatic cleanup
     • Set reasonable IdleTimeout values
     • Adjust MinWorkers/MaxWorkers based on workload

  5. Error Handling:
     • Always check the dead letter queue
     • Implement retry logic with exponential backoff
     • Log all errors for debugging

  6. Shared Data:
     • Use shared data for large common datasets
     • Keep update levels sequential
     • Set appropriate TTLs (slightly longer than the session duration)
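
Reconnection with exponential backoff might look like the following sketch; the attempt count, the base delay, and the runClient handoff are illustrative assumptions, not part of the ALPCRUN.CH API:

// Retry the initial connection with exponentially growing delays.
// maxAttempts, the base delay, and runClient are assumptions for
// illustration; tune them to your deployment.
const maxAttempts = 5
backoff := time.Second

for attempt := 1; attempt <= maxAttempts; attempt++ {
    queueConn, queueClient, err := grpccon.ConnectToQueue(
        "queue-manager.alpcrun.svc.cluster.local:1337",
        "/certs/ca.crt",
        true)
    if err == nil {
        if readyErr := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); readyErr == nil {
            defer queueConn.Close()
            runClient(ctx, queueClient) // hypothetical: the client logic from the steps above
            return
        }
        queueConn.Close()
    }

    log.Printf("Connection attempt %d failed, retrying in %v", attempt, backoff)
    time.Sleep(backoff)
    backoff *= 2 // double the delay after each failure
}
log.Fatal("Could not connect to queue manager")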
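
For the flow-control advice, one simple way to tie the sending rate to the result rate is a semaphore channel that caps in-flight batches. A sketch, assuming batches is a slice of *pb.Batch prepared as in Step 5 and processBatch is a hypothetical result handler; the capacity of 10 is an arbitrary assumption:

// Cap the number of batches in flight: acquire a slot before sending,
// release one when a result batch arrives. The capacity (10) is an
// illustrative assumption; tune it alongside MaxInflightBatchesPerWorker.
inflight := make(chan struct{}, 10)

// Sender side: blocks while too many batches are outstanding.
go func() {
    for _, batch := range batches {
        inflight <- struct{}{} // acquire a slot
        if err := stream.Send(batch); err != nil {
            log.Printf("Failed to send batch: %v", err)
            return
        }
    }
}()

// Receiver side: frees a slot for every result batch.
for {
    result, err := stream.Recv()
    if err != nil {
        break
    }
    <-inflight           // release a slot
    processBatch(result) // hypothetical result handler
}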

Next Steps