Skip to content

Simple Client Example

A complete, runnable example of a basic ALPCRUN.CH client.

Overview

This example demonstrates:

  • Connecting to the queue manager
  • Creating a session
  • Submitting tasks
  • Receiving results
  • Proper cleanup

Complete Code

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "os"
    "sync/atomic"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/durationpb"
)

// ComputeTask is the unit of work submitted by this example. Each task is
// JSON-encoded and carried as the payload of a single message.
type ComputeTask struct {
    ID    string  `json:"id"`    // task identifier; sendTasks reuses it as the message and batch ID
    Value float64 `json:"value"` // numeric input carried by the task
}

// ComputeResult is the shape this client expects when JSON-decoding the
// payload of a result message.
type ComputeResult struct {
    ID     string  `json:"id"`     // identifier carried in the result payload
    Result float64 `json:"result"` // numeric value; summed and averaged by displayStats
}

// main is the program entry point. All work happens in run so that its
// deferred cleanup (closing the connection and the session) executes before
// the process exits: log.Fatal calls os.Exit, which skips deferred functions.
func main() {
    if err := run(context.Background()); err != nil {
        log.Fatal(err)
    }
}

// run connects to the queue manager, creates a session, submits the generated
// tasks over a bidirectional stream and collects their results.
//
// It returns an error if any step fails or if fewer results than submitted
// tasks were collected. The error text for each step matches what the example
// logs for that failure, so the output seen by the user is the same.
func run(ctx context.Context) error {
    // Setup
    queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
    caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
    useTLS := caCert != "" // TLS is enabled only when a CA certificate is configured

    log.Printf("Connecting to queue manager at %s (TLS: %v)", queueAddr, useTLS)

    // Connect to queue manager
    queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, caCert, useTLS)
    if err != nil {
        return fmt.Errorf("Failed to connect: %w", err)
    }
    defer queueConn.Close()

    // Wait for connection to be ready
    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        return fmt.Errorf("Connection not ready: %w", err)
    }

    log.Println("Connected successfully")

    // Create session
    session, err := createSession(ctx, queueClient)
    if err != nil {
        return fmt.Errorf("Failed to create session: %w", err)
    }
    defer closeSession(ctx, queueClient, session)

    log.Printf("Session created: %s", session.Id)

    // Open bidirectional stream
    stream, err := queueClient.ClientStream(ctx)
    if err != nil {
        return fmt.Errorf("Failed to open stream: %w", err)
    }

    // Generate tasks
    numTasks := 100
    tasks := generateTasks(numTasks)

    log.Printf("Submitting %d tasks...", len(tasks))

    // Start receiving results in the background before sending, so results
    // can be consumed while tasks are still being submitted. The results
    // channel is buffered for one message per task.
    resultsChan := make(chan *pb.Message, numTasks)
    errorChan := make(chan error, 1)

    go receiveResults(stream, resultsChan, errorChan)

    // Send tasks
    if err := sendTasks(stream, session.Id, tasks); err != nil {
        return fmt.Errorf("Failed to send tasks: %w", err)
    }

    log.Println("All tasks submitted, waiting for results...")

    // Collect and process results
    results := collectResults(resultsChan, errorChan, numTasks)

    // Display statistics (also useful for a partial run)
    displayStats(results)

    // collectResults returns early on a stream error or a closed channel, so
    // only report success when every task produced a result.
    if len(results) != numTasks {
        return fmt.Errorf("received %d of %d results", len(results), numTasks)
    }

    log.Println("All tasks completed successfully!")
    return nil
}

// createSession asks the queue manager to create a session for the
// "simple-client-example" app using this example's fixed attributes, and
// returns the session (or the error) reported by the CreateSession call.
func createSession(ctx context.Context, client pb.QueueClient) (*pb.Session, error) {
    attrs := &pb.SessionAttributes{
        Priority:                    proto.Uint32(5),
        IdleTimeout:                 durationpb.New(60 * time.Second),
        MinWorkers:                  proto.Uint32(1),
        MaxWorkers:                  proto.Uint32(10),
        CleanupOnClientExit:         proto.Bool(true),
        MaxInflightBatchesPerWorker: proto.Uint32(5),
        Tags:                        []string{"example", "simple"},
    }

    request := &pb.Session{
        App:        &pb.App{Id: "simple-client-example"},
        Attributes: attrs,
    }

    return client.CreateSession(ctx, request)
}

// closeSession closes the given session on the queue manager and logs the
// outcome. A failure is only logged, never returned, so the function is safe
// to use in a defer.
func closeSession(ctx context.Context, client pb.QueueClient, session *pb.Session) {
    log.Println("Closing session...")

    if _, err := client.CloseSession(ctx, session); err != nil {
        log.Printf("Error closing session: %v", err)
        return
    }

    log.Println("Session closed")
}

// generateTasks builds count sample tasks. Task i gets the zero-padded ID
// "task-NNNN" and the value i * 1.5.
func generateTasks(count int) []ComputeTask {
    tasks := make([]ComputeTask, count)
    for i := range tasks {
        task := &tasks[i]
        task.ID = fmt.Sprintf("task-%04d", i)
        task.Value = float64(i) * 1.5
    }
    return tasks
}

// sendTasks JSON-encodes each task and sends it on stream as its own
// single-message batch for the given session. The task ID doubles as both the
// message ID and the batch ID. It stops at the first marshal or send failure
// and returns that error wrapped with the task ID.
func sendTasks(stream pb.Queue_ClientStreamClient, sessionID string, tasks []ComputeTask) error {
    for i := range tasks {
        current := tasks[i]

        body, err := json.Marshal(current)
        if err != nil {
            return fmt.Errorf("failed to marshal task %s: %w", current.ID, err)
        }

        // One task per batch for simplicity.
        err = stream.Send(&pb.Batch{
            Id:        current.ID,
            SessionId: sessionID,
            Priority:  0,
            Messages: []*pb.Message{
                {Id: current.ID, Payload: body},
            },
        })
        if err != nil {
            return fmt.Errorf("failed to send task %s: %w", current.ID, err)
        }
    }

    return nil
}

// receiveResults reads batches from stream until Recv fails, forwarding every
// message of every batch to resultsChan.
//
// On io.EOF it simply stops; on any other error it first sends the error to
// errorChan. In both cases resultsChan is closed on the way out, so the
// consumer can detect that no more results will arrive.
func receiveResults(stream pb.Queue_ClientStreamClient, resultsChan chan<- *pb.Message, errorChan chan<- error) {
    // Runs after any errorChan send below, preserving "error first, then close".
    defer close(resultsChan)

    for {
        batch, err := stream.Recv()
        switch {
        case err == io.EOF:
            return
        case err != nil:
            errorChan <- err
            return
        }

        for i := range batch.Messages {
            resultsChan <- batch.Messages[i]
        }
    }
}

// collectResults reads from resultsChan until expected messages have arrived,
// the channel is closed, or an error shows up on errorChan. It returns the
// results whose payloads decoded successfully.
//
// Every message counts towards expected, including one whose payload fails to
// decode; otherwise a single malformed result would leave the loop waiting
// for a message that never comes. The returned slice can therefore be shorter
// than expected.
//
// While waiting, progress is logged every five seconds.
func collectResults(resultsChan <-chan *pb.Message, errorChan <-chan error, expected int) []ComputeResult {
    results := make([]ComputeResult, 0, expected)
    received := 0 // messages seen so far, decodable or not

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for received < expected {
        select {
        case msg, ok := <-resultsChan:
            if !ok {
                // receiveResults queues a stream error before closing the
                // channel, and select picks at random when both cases are
                // ready, so look for a pending error before giving up.
                select {
                case err := <-errorChan:
                    log.Printf("Error receiving results: %v", err)
                default:
                    log.Printf("Results channel closed, received %d/%d", received, expected)
                }
                return results
            }

            received++

            // Parse result
            var result ComputeResult
            if err := json.Unmarshal(msg.Payload, &result); err != nil {
                log.Printf("Failed to parse result %s: %v", msg.Id, err)
                continue
            }

            results = append(results, result)

        case err := <-errorChan:
            log.Printf("Error receiving results: %v", err)
            return results

        case <-ticker.C:
            log.Printf("Progress: %d/%d results received", received, expected)
        }
    }

    return results
}

// displayStats logs the number of results together with the sum and the
// average of their Result values. With no results it logs a short notice
// instead.
func displayStats(results []ComputeResult) {
    count := len(results)
    if count == 0 {
        log.Println("No results to display")
        return
    }

    var total float64
    for i := range results {
        total += results[i].Result
    }
    mean := total / float64(count)

    log.Printf("Statistics:")
    log.Printf("  Total results: %d", count)
    log.Printf("  Sum: %.2f", total)
    log.Printf("  Average: %.2f", mean)
}

// getEnvOrDefault returns the value of the environment variable key, or
// defaultValue when the variable is unset or empty.
func getEnvOrDefault(key, defaultValue string) string {
    value := os.Getenv(key)
    if value == "" {
        return defaultValue
    }
    return value
}

Running the Example

Prerequisites

  1. ALPCRUN.CH services running (queue manager, cache, workers)
  2. Go 1.21+ installed
  3. API key configured

Setup

# Set required environment variables
export METALCORE_API_KEY="your-secret-key"

# Optional: if using TLS
export METALCORE_CA_CERT="/path/to/ca.crt"

# Optional: if queue manager is not on localhost
export METALCORE_QUEUE_MANAGER_ADDR="queue-manager:1337"

Build and Run

# Initialize Go module
go mod init simple-client
go mod tidy

# Run
go run main.go

Expected Output

2025/01/15 10:30:00 Connecting to queue manager at localhost:1337 (TLS: false)
2025/01/15 10:30:00 Connected successfully
2025/01/15 10:30:00 Session created: 01HZXY...
2025/01/15 10:30:00 Submitting 100 tasks...
2025/01/15 10:30:00 All tasks submitted, waiting for results...
2025/01/15 10:30:05 Progress: 47/100 results received
2025/01/15 10:30:10 Progress: 98/100 results received
2025/01/15 10:30:10 Statistics:
2025/01/15 10:30:10   Total results: 100
2025/01/15 10:30:10   Sum: 7425.00
2025/01/15 10:30:10   Average: 74.25
2025/01/15 10:30:10 All tasks completed successfully!
2025/01/15 10:30:10 Closing session...
2025/01/15 10:30:10 Session closed

Variations

Batch Multiple Tasks

Modify sendTasks to send multiple tasks per batch:

// sendTasksBatched JSON-encodes tasks and sends them on stream in batches of
// at most batchSize messages for the given session. A final, smaller batch is
// sent for any remainder. Batches are named "batch-0", "batch-1", ... in
// sending order.
//
// It returns an error if batchSize is less than 1, if a task cannot be
// marshalled, or if sending a batch fails; batches already sent are not
// rolled back.
func sendTasksBatched(stream pb.Queue_ClientStreamClient, sessionID string, tasks []ComputeTask, batchSize int) error {
    // A zero batchSize would divide by zero when deriving the batch ID.
    if batchSize < 1 {
        return fmt.Errorf("invalid batch size %d: must be at least 1", batchSize)
    }

    var messages []*pb.Message

    for i, task := range tasks {
        payload, err := json.Marshal(task)
        if err != nil {
            return fmt.Errorf("failed to marshal task %s: %w", task.ID, err)
        }

        messages = append(messages, &pb.Message{
            Id:      task.ID,
            Payload: payload,
        })

        // Send when batch is full or last task
        if len(messages) >= batchSize || i == len(tasks)-1 {
            batch := &pb.Batch{
                Id:        fmt.Sprintf("batch-%d", i/batchSize),
                SessionId: sessionID,
                Messages:  messages,
            }

            if err := stream.Send(batch); err != nil {
                return fmt.Errorf("failed to send batch %s: %w", batch.Id, err)
            }

            // Start a fresh slice rather than truncating, so the batch that
            // was just handed to Send is never aliased by later appends.
            messages = nil
        }
    }

    return nil
}

Add Error Handling

Check for dead letter messages:

// checkDeadLetters opens the session's dead-letter stream and logs the ID and
// error of every message it delivers. It returns when the stream ends
// (io.EOF) or when opening or reading the stream fails; failures are logged,
// not returned.
func checkDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) {
    stream, err := client.ClientStreamDeadLetters(ctx, session)
    if err != nil {
        log.Printf("Failed to open dead letter stream: %v", err)
        return
    }

    for {
        batch, recvErr := stream.Recv()
        if recvErr != nil {
            // io.EOF is the normal end of the stream and is not reported.
            if recvErr != io.EOF {
                log.Printf("Error reading dead letters: %v", recvErr)
            }
            return
        }

        for _, deadLetter := range batch.Messages {
            log.Printf("Failed task %s: %s", deadLetter.Id, deadLetter.GetError())
        }
    }
}

// Call before closing session
checkDeadLetters(ctx, queueClient, session)

Add Progress Bar

Using a progress library:

import "github.com/schollz/progressbar/v3"

// collectResultsWithProgress behaves like collectResults but shows a progress
// bar instead of periodic log lines. It returns when expected messages have
// arrived, when resultsChan is closed, or when an error arrives on errorChan.
//
// Every message advances the bar and counts towards expected. A message whose
// payload cannot be decoded is logged and left out of the returned slice, so
// the slice can be shorter than expected.
func collectResultsWithProgress(resultsChan <-chan *pb.Message, errorChan <-chan error, expected int) []ComputeResult {
    results := make([]ComputeResult, 0, expected)
    bar := progressbar.Default(int64(expected))

    for received := 0; received < expected; {
        select {
        case msg, ok := <-resultsChan:
            if !ok {
                return results
            }

            bar.Add(1)
            received++

            var result ComputeResult
            if err := json.Unmarshal(msg.Payload, &result); err != nil {
                // Skip the result instead of recording a zero-valued one.
                log.Printf("Failed to parse result %s: %v", msg.Id, err)
                continue
            }
            results = append(results, result)

        case err := <-errorChan:
            log.Printf("Error: %v", err)
            return results
        }
    }

    return results
}

Next Steps