Writing Clients¶
This guide shows you how to write client applications that submit workloads to ALPCRUN.CH.
Overview¶
A client application in ALPCRUN.CH:
- Connects to the Queue Manager
- Creates a session with configuration
- Optionally uploads shared data to the cache
- Opens a bidirectional stream
- Sends task batches
- Receives result batches
- Closes the session
Basic Client Structure¶
Step 1: Import Required Packages¶
package main
import (
"context"
"log"
"time"
pb "github.com/limelabs/metalcore-neo/api/v1"
"github.com/limelabs/metalcore-neo/pkg/grpccon"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)
Step 2: Connect to Queue Manager¶
func main() {
	ctx := context.Background()

	// Connect to the queue manager. ConnectToQueue returns both the raw
	// gRPC connection (used for lifecycle management) and the typed
	// client (used to issue RPCs in the following steps).
	queueConn, queueClient, err := grpccon.ConnectToQueue(
		"queue-manager.alpcrun.svc.cluster.local:1337",
		"/certs/ca.crt", // CA certificate path
		true)            // TLS enabled
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer queueConn.Close()

	// Wait for the connection to be ready before issuing any RPCs.
	if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
		log.Fatalf("Connection not ready: %v", err)
	}
	log.Println("Connected to queue manager")

	// queueClient is used by the later steps of this guide
	// (CreateSession, ClientStream, ...). Referenced here so this
	// snippet compiles on its own — Go rejects unused local variables.
	_ = queueClient
}
Connection Parameters:
- Address: Queue manager host:port
- CA Certificate: Path to CA cert for TLS (empty string if TLS disabled)
- TLS Enabled: Boolean flag
Step 3: Create a Session¶
session, err := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{
Id: "monte-carlo-simulation",
},
Attributes: &pb.SessionAttributes{
ClientId: "", // Leave empty for auto-generation
Priority: 5, // 0-10, higher = more important
IdleTimeout: durationpb.New(60 * time.Second),
Tags: []string{"simulation", "monte-carlo"},
// Worker configuration
MinWorkers: proto.Uint32(10),
MaxWorkers: proto.Uint32(100),
// Flow control
MaxInflightBatchesPerWorker: proto.Uint32(5),
// Cleanup
CleanupOnClientExit: proto.Bool(true),
},
})
if err != nil {
log.Fatalf("Failed to create session: %v", err)
}
log.Printf("Session created: %s", session.Id)
Session Attributes Explained:
- Priority: Higher priority sessions get workers first (0-10 range)
- IdleTimeout: How long to keep session alive without activity
- Tags: Metadata for filtering and monitoring
- MinWorkers: Minimum workers to bind (scheduling hint)
- MaxWorkers: Maximum workers to bind (hard limit)
- MaxInflightBatchesPerWorker: Flow control per worker
- CleanupOnClientExit: Auto-cleanup queues on disconnect
Step 4: Open Bidirectional Stream¶
stream, err := queueClient.ClientStream(ctx)
if err != nil {
log.Fatalf("Failed to create stream: %v", err)
}
log.Println("Stream opened")
Step 5: Send Task Batches¶
// ComputeTask is an example task payload. Define whatever structure your
// application needs — the queue treats payloads as opaque bytes. The same
// type is reused in Step 6 to decode results (the worker is presumably
// expected to echo the same shape — confirm against your service code).
type ComputeTask struct {
	TaskID     string    // unique task identifier; also used as the batch ID when sending
	InputData  []float64 // numeric inputs for the computation
	Iterations int       // number of iterations the worker should run
}
// Create tasks
tasks := []ComputeTask{
{TaskID: "task-1", InputData: []float64{1.0, 2.0, 3.0}, Iterations: 1000},
{TaskID: "task-2", InputData: []float64{4.0, 5.0, 6.0}, Iterations: 1000},
// ... more tasks
}
// Send tasks as batches
for _, task := range tasks {
// Serialize your task (using protobuf, JSON, or any format)
taskPayload, err := json.Marshal(task)
if err != nil {
log.Printf("Failed to serialize task: %v", err)
continue
}
// Create message
message := &pb.Message{
Id: task.TaskID,
Payload: taskPayload,
}
// Create batch
batch := &pb.Batch{
Id: task.TaskID, // Batch ID for tracking
SessionId: session.Id,
Priority: 0, // Per-batch priority (optional)
Messages: []*pb.Message{message},
}
// Send batch
if err := stream.Send(batch); err != nil {
log.Printf("Failed to send batch: %v", err)
break
}
}
log.Printf("Sent %d tasks", len(tasks))
Batching Strategies:
Single Task per Batch (simple):
for _, task := range tasks {
batch := &pb.Batch{
Messages: []*pb.Message{{Id: task.ID, Payload: task.Data}},
}
stream.Send(batch)
}
Multiple Tasks per Batch (efficient):
const batchSize = 100
messages := []*pb.Message{}
for i, task := range tasks {
messages = append(messages, &pb.Message{
Id: task.ID,
Payload: task.Data,
})
// Send when batch is full or last task
if len(messages) >= batchSize || i == len(tasks)-1 {
batch := &pb.Batch{Messages: messages, SessionId: session.Id}
stream.Send(batch)
messages = []*pb.Message{}
}
}
Step 6: Receive Result Batches¶
// Receive results on a dedicated goroutine so the main loop can multiplex
// result batches, stream errors, and a timeout through a single select.
resultChan := make(chan *pb.Batch, 100)
errorChan := make(chan error, 1)
go func() {
	for {
		batch, err := stream.Recv()
		if err != nil {
			// Recv returns io.EOF on clean stream shutdown, or a
			// transport/status error otherwise; either way, stop.
			errorChan <- err
			return
		}
		resultChan <- batch
	}
}()

// Process results until every task has reported back, the stream ends,
// or we give up after a timeout.
receivedResults := 0
expectedResults := len(tasks)

// NOTE: a bare `break` inside a select only exits the select statement,
// not the surrounding for loop — the original version of this loop kept
// spinning after a stream error or timeout. The label makes the break
// actually terminate the loop.
receiveLoop:
for receivedResults < expectedResults {
	select {
	case batch := <-resultChan:
		for _, result := range batch.Messages {
			// Deserialize result
			var taskResult ComputeTask
			if err := json.Unmarshal(result.Payload, &taskResult); err != nil {
				log.Printf("Failed to parse result: %v", err)
				continue
			}
			log.Printf("Result received: %s", taskResult.TaskID)
			receivedResults++
		}
	case err := <-errorChan:
		if err == io.EOF {
			log.Println("Stream closed")
		} else {
			log.Printf("Stream error: %v", err)
		}
		break receiveLoop
	case <-time.After(5 * time.Minute):
		log.Println("Timeout waiting for results")
		break receiveLoop
	}
}
log.Printf("Received %d/%d results", receivedResults, expectedResults)
Step 7: Handle Dead Letter Queue (Optional)¶
// Retrieve failed tasks
deadLetterStream, err := queueClient.ClientStreamDeadLetters(&pb.Session{Id: session.Id})
if err != nil {
log.Printf("Failed to open dead letter stream: %v", err)
} else {
for {
batch, err := deadLetterStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Dead letter stream error: %v", err)
break
}
for _, msg := range batch.Messages {
log.Printf("Failed task: %s, Error: %s", msg.Id, msg.Error)
// Optionally retry
retryBatch := &pb.Batch{
Id: msg.Id + "-retry",
SessionId: session.Id,
Messages: []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
}
stream.Send(retryBatch)
}
}
}
Step 8: Close Session¶
// Close the stream
stream.CloseSend()
// Close the session
_, err = queueClient.CloseSession(ctx, session)
if err != nil {
log.Printf("Failed to close session: %v", err)
} else {
log.Println("Session closed successfully")
}
Using Shared Data¶
When you have data that all workers need (e.g., model parameters, lookup tables):
Upload Shared Data¶
// Connect to cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
"central-cache.alpcrun.svc.cluster.local:2337",
"/certs/ca.crt",
true)
if err != nil {
log.Fatalf("Failed to connect to cache: %v", err)
}
defer cacheConn.Close()
// Prepare shared data
sharedData := MySharedData{
ModelWeights: weights,
LookupTable: table,
}
sharedPayload, _ := json.Marshal(sharedData)
// Upload to cache
updateLevel := uint32(1)
_, err = cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: session.Id,
UpdateLevel: &updateLevel,
Payload: sharedPayload,
Ttl: durationpb.New(10 * time.Minute),
})
if err != nil {
log.Fatalf("Failed to set shared data: %v", err)
}
log.Println("Shared data uploaded")
Send Tasks with Shared Data Reference¶
batch := &pb.Batch{
Id: "batch-1",
SessionId: session.Id,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: messages,
}
stream.Send(batch)
Workers will automatically fetch the shared data from the cache.
Update Shared Data¶
// Prepare updated data
newUpdateLevel := uint32(2)
updatedPayload, _ := json.Marshal(updatedSharedData)
// Update in cache
_, err = cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: session.Id,
UpdateLevel: &newUpdateLevel,
Payload: updatedPayload,
Ttl: durationpb.New(10 * time.Minute),
})
// Send new tasks with updated version
batch := &pb.Batch{
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &newUpdateLevel,
// ...
}
Delete Shared Data¶
// When done, clean up shared data
_, err = cacheClient.DeleteSharedData(ctx, &pb.SharedData{
SessionId: session.Id,
})
Advanced Patterns¶
Multiple Sessions¶
Run multiple independent workloads:
sessions := make([]*pb.Session, 10)
for i := 0; i < 10; i++ {
session, _ := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{Id: fmt.Sprintf("app-%d", i)},
})
sessions[i] = session
// Launch goroutine to handle each session
go handleSession(session)
}
Progress Tracking¶
Track completion percentage:
var (
totalTasks = len(tasks)
completedTasks atomic.Int32
)
// In result processing loop
completedTasks.Add(1)
progress := float64(completedTasks.Load()) / float64(totalTasks) * 100
log.Printf("Progress: %.2f%%", progress)
Dynamic Task Generation¶
Generate tasks based on results:
for {
select {
case batch := <-resultChan:
for _, result := range batch.Messages {
// Process result
processedResult := processResult(result)
// Generate new tasks based on result
if needsMoreWork(processedResult) {
newTasks := generateTasks(processedResult)
for _, task := range newTasks {
sendTask(stream, task)
}
}
}
}
}
Error Handling and Retries¶
const maxRetries = 3
retryCount := make(map[string]int)
// In dead letter processing
for _, msg := range batch.Messages {
retries := retryCount[msg.Id]
if retries < maxRetries {
log.Printf("Retrying task %s (attempt %d)", msg.Id, retries+1)
retryCount[msg.Id]++
// Resend task
stream.Send(&pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{{Id: msg.Id, Payload: msg.Payload}},
})
} else {
log.Printf("Task %s failed after %d retries: %s",
msg.Id, maxRetries, msg.Error)
}
}
Complete Example¶
See Simple Client Example for a complete, runnable client application.
Best Practices¶
- Connection Management:
  - Reuse connections across sessions
  - Implement exponential backoff for reconnection
  - Use `BlockUntilReady` to ensure connection health
- Batching:
  - Batch multiple tasks for better throughput
  - Balance batch size (50-1000 tasks typically optimal)
  - Don't make batches too large (memory overhead)
- Flow Control:
  - Set `MaxInflightBatchesPerWorker` appropriately
  - Monitor backpressure on the stream
  - Adjust sending rate based on result rate
- Session Configuration:
  - Use `CleanupOnClientExit: true` for automatic cleanup
  - Set reasonable `IdleTimeout` values
  - Adjust `MinWorkers`/`MaxWorkers` based on workload
- Error Handling:
  - Always check dead letter queue
  - Implement retry logic with exponential backoff
  - Log all errors for debugging
- Shared Data:
  - Use shared data for large common datasets
  - Keep update levels sequential
  - Set appropriate TTLs (slightly longer than session duration)
Next Steps¶
- Writing Services: Create worker services
- Submitting Workloads: Advanced workload patterns
- API Reference: Complete API documentation
- Examples: More code examples