Simple Client Example¶
A complete, runnable example of a basic ALPCRUN.CH client.
Overview¶
This example demonstrates:
- Connecting to the queue manager
- Creating a session
- Submitting tasks
- Receiving results
- Proper cleanup
Complete Code¶
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"sync/atomic"
"time"
pb "github.com/limelabs/metalcore-neo/api/v1"
"github.com/limelabs/metalcore-neo/pkg/grpccon"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)
// ComputeTask is the unit of work submitted to the queue. It is
// JSON-encoded into a pb.Message payload by sendTasks; the json tags
// below define the wire field names workers will see.
type ComputeTask struct {
ID string `json:"id"`
Value float64 `json:"value"`
}
// ComputeResult is the JSON payload workers send back for each task.
// ID mirrors the originating ComputeTask.ID so results can be correlated
// with submissions; Result carries the computed value.
type ComputeResult struct {
ID string `json:"id"`
Result float64 `json:"result"`
}
func main() {
	ctx := context.Background()

	// Resolve connection settings from the environment; TLS is enabled
	// only when a CA certificate path is provided.
	queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
	caCert := getEnvOrDefault("METALCORE_CA_CERT", "")
	useTLS := caCert != ""
	log.Printf("Connecting to queue manager at %s (TLS: %v)", queueAddr, useTLS)

	// Connect to the queue manager.
	// NOTE(review): log.Fatalf exits immediately and skips deferred
	// cleanup (queueConn.Close, closeSession). Acceptable for an example,
	// not for production code.
	queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, caCert, useTLS)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer queueConn.Close()

	// Wait for the underlying connection to become ready.
	if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
		log.Fatalf("Connection not ready: %v", err)
	}
	log.Println("Connected successfully")

	// Create a session and ensure it is closed on normal exit.
	session, err := createSession(ctx, queueClient)
	if err != nil {
		log.Fatalf("Failed to create session: %v", err)
	}
	defer closeSession(ctx, queueClient, session)
	log.Printf("Session created: %s", session.Id)

	// Open the bidirectional task/result stream.
	stream, err := queueClient.ClientStream(ctx)
	if err != nil {
		log.Fatalf("Failed to open stream: %v", err)
	}

	// Generate tasks.
	numTasks := 100
	tasks := generateTasks(numTasks)
	log.Printf("Submitting %d tasks...", len(tasks))

	// Start receiving results in the background. resultsChan is buffered
	// to numTasks so the receiver never blocks on a slow consumer.
	resultsChan := make(chan *pb.Message, numTasks)
	errorChan := make(chan error, 1)
	go receiveResults(stream, resultsChan, errorChan)

	// Send tasks.
	if err := sendTasks(stream, session.Id, tasks); err != nil {
		log.Fatalf("Failed to send tasks: %v", err)
	}
	// Half-close the stream: this tells the server that no more batches
	// are coming while results can still be received. Without CloseSend
	// the server never observes the end of the client's send direction.
	if err := stream.CloseSend(); err != nil {
		log.Printf("Failed to close send side of stream: %v", err)
	}
	log.Println("All tasks submitted, waiting for results...")

	// Collect and process results.
	results := collectResults(resultsChan, errorChan, numTasks)

	// Display statistics.
	displayStats(results)
	log.Println("All tasks completed successfully!")
}
// createSession registers a new session with the queue manager for the
// example app, with a small worker pool (1-10 workers), medium priority,
// a 60s idle timeout, and automatic cleanup when the client exits.
//
// On failure it returns a literal nil session and a wrapped error; the
// original returned whatever the RPC handed back alongside the error,
// which risks callers seeing a partially populated session.
func createSession(ctx context.Context, client pb.QueueClient) (*pb.Session, error) {
	session, err := client.CreateSession(ctx, &pb.Session{
		App: &pb.App{
			Id: "simple-client-example",
		},
		Attributes: &pb.SessionAttributes{
			Priority:                    proto.Uint32(5),
			IdleTimeout:                 durationpb.New(60 * time.Second),
			MinWorkers:                  proto.Uint32(1),
			MaxWorkers:                  proto.Uint32(10),
			CleanupOnClientExit:         proto.Bool(true),
			MaxInflightBatchesPerWorker: proto.Uint32(5),
			Tags:                        []string{"example", "simple"},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("creating session: %w", err)
	}
	return session, nil
}
// closeSession tears down the session on the queue manager, logging the
// outcome. Errors are logged rather than returned because this runs as
// deferred cleanup where no caller can act on them.
func closeSession(ctx context.Context, client pb.QueueClient, session *pb.Session) {
	log.Println("Closing session...")
	if _, err := client.CloseSession(ctx, session); err != nil {
		log.Printf("Error closing session: %v", err)
		return
	}
	log.Println("Session closed")
}
// generateTasks builds count synthetic tasks with sequential IDs
// (task-0000, task-0001, ...) and a value of 1.5 times the index.
func generateTasks(count int) []ComputeTask {
	tasks := make([]ComputeTask, count)
	for i := range tasks {
		tasks[i] = ComputeTask{
			ID:    fmt.Sprintf("task-%04d", i),
			Value: 1.5 * float64(i),
		}
	}
	return tasks
}
// sendTasks streams every task to the queue manager, each wrapped in its
// own single-message batch keyed by the task ID. It stops at the first
// marshal or send failure and returns a wrapped error.
func sendTasks(stream pb.Queue_ClientStreamClient, sessionID string, tasks []ComputeTask) error {
	for _, t := range tasks {
		body, err := json.Marshal(t)
		if err != nil {
			return fmt.Errorf("failed to marshal task %s: %w", t.ID, err)
		}
		// One task per batch keeps the example simple.
		batch := &pb.Batch{
			Id:        t.ID,
			SessionId: sessionID,
			Priority:  0,
			Messages:  []*pb.Message{{Id: t.ID, Payload: body}},
		}
		if err := stream.Send(batch); err != nil {
			return fmt.Errorf("failed to send task %s: %w", t.ID, err)
		}
	}
	return nil
}
// receiveResults pumps batches off the stream, fanning each contained
// message into resultsChan. resultsChan is closed on any terminal
// condition; non-EOF stream errors are reported on errorChan before the
// close takes effect.
func receiveResults(stream pb.Queue_ClientStreamClient, resultsChan chan<- *pb.Message, errorChan chan<- error) {
	defer close(resultsChan)
	for {
		batch, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				errorChan <- err
			}
			return
		}
		for _, msg := range batch.Messages {
			resultsChan <- msg
		}
	}
}
// collectResults drains resultsChan until expected results have been
// parsed, the channel closes, or an error arrives on errorChan. Progress
// is logged every five seconds. It returns whatever was gathered, which
// may be fewer than expected on early termination.
func collectResults(resultsChan <-chan *pb.Message, errorChan <-chan error, expected int) []ComputeResult {
	gathered := make([]ComputeResult, 0, expected)
	progress := time.NewTicker(5 * time.Second)
	defer progress.Stop()
	for count := 0; count < expected; {
		select {
		case msg, open := <-resultsChan:
			if !open {
				log.Printf("Results channel closed, received %d/%d", count, expected)
				return gathered
			}
			var parsed ComputeResult
			// Malformed payloads are logged and skipped, not counted.
			if err := json.Unmarshal(msg.Payload, &parsed); err != nil {
				log.Printf("Failed to parse result %s: %v", msg.Id, err)
				continue
			}
			gathered = append(gathered, parsed)
			count++
		case err := <-errorChan:
			log.Printf("Error receiving results: %v", err)
			return gathered
		case <-progress.C:
			log.Printf("Progress: %d/%d results received", count, expected)
		}
	}
	return gathered
}
// displayStats logs the count, sum, and mean of the collected results.
// An empty slice is reported and skipped to avoid dividing by zero.
func displayStats(results []ComputeResult) {
	n := len(results)
	if n == 0 {
		log.Println("No results to display")
		return
	}
	var total float64
	for i := range results {
		total += results[i].Result
	}
	log.Printf("Statistics:")
	log.Printf(" Total results: %d", n)
	log.Printf(" Sum: %.2f", total)
	log.Printf(" Average: %.2f", total/float64(n))
}
func getEnvOrDefault(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
Running the Example¶
Prerequisites¶
- ALPCRUN.CH services running (queue manager, cache, workers)
- Go 1.21+ installed
- API key configured
Setup¶
# Set required environment variables
export METALCORE_API_KEY="your-secret-key"
# Optional: if using TLS
export METALCORE_CA_CERT="/path/to/ca.crt"
# Optional: if queue manager is not on localhost
export METALCORE_QUEUE_MANAGER_ADDR="queue-manager:1337"
Build and Run¶
# Initialize Go module
go mod init simple-client
go mod tidy
# Run
go run main.go
Expected Output¶
2025/01/15 10:30:00 Connecting to queue manager at localhost:1337 (TLS: false)
2025/01/15 10:30:00 Connected successfully
2025/01/15 10:30:00 Session created: 01HZXY...
2025/01/15 10:30:00 Submitting 100 tasks...
2025/01/15 10:30:00 All tasks submitted, waiting for results...
2025/01/15 10:30:05 Progress: 47/100 results received
2025/01/15 10:30:10 Progress: 100/100 results received
2025/01/15 10:30:10 Statistics:
2025/01/15 10:30:10 Total results: 100
2025/01/15 10:30:10 Sum: 7425.00
2025/01/15 10:30:10 Average: 74.25
2025/01/15 10:30:10 All tasks completed successfully!
2025/01/15 10:30:10 Closing session...
2025/01/15 10:30:10 Session closed
Variations¶
Batch Multiple Tasks¶
Modify sendTasks to send multiple tasks per batch:
// sendTasksBatched groups tasks into batches of up to batchSize messages
// before sending, reducing per-batch overhead compared to sendTasks.
// Batch IDs are derived from the index of the last task in the batch
// (batch-0, batch-1, ...).
func sendTasksBatched(stream pb.Queue_ClientStreamClient, sessionID string, tasks []ComputeTask, batchSize int) error {
	messages := make([]*pb.Message, 0, batchSize)
	for i, task := range tasks {
		payload, err := json.Marshal(task)
		if err != nil {
			// The original discarded this error with a blank identifier,
			// silently sending an empty payload on marshal failure.
			return fmt.Errorf("failed to marshal task %s: %w", task.ID, err)
		}
		messages = append(messages, &pb.Message{
			Id:      task.ID,
			Payload: payload,
		})
		// Flush when the batch is full or this is the last task.
		if len(messages) >= batchSize || i == len(tasks)-1 {
			batch := &pb.Batch{
				Id:        fmt.Sprintf("batch-%d", i/batchSize),
				SessionId: sessionID,
				Messages:  messages,
			}
			if err := stream.Send(batch); err != nil {
				return err
			}
			messages = make([]*pb.Message, 0, batchSize)
		}
	}
	return nil
}
Add Error Handling¶
Check for dead letter messages:
// checkDeadLetters streams and logs every message that permanently
// failed processing for the given session. It returns once the dead
// letter stream is exhausted or errors; all failures are logged, none
// are propagated.
func checkDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) {
	stream, err := client.ClientStreamDeadLetters(ctx, session)
	if err != nil {
		log.Printf("Failed to open dead letter stream: %v", err)
		return
	}
	for {
		batch, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return
		}
		if recvErr != nil {
			log.Printf("Error reading dead letters: %v", recvErr)
			return
		}
		for _, msg := range batch.Messages {
			log.Printf("Failed task %s: %s", msg.Id, msg.GetError())
		}
	}
}
// Call before closing session
checkDeadLetters(ctx, queueClient, session)
Add Progress Bar¶
Using a progress library:
import "github.com/schollz/progressbar/v3"
// collectResultsWithProgress behaves like collectResults but renders a
// terminal progress bar instead of periodic log lines. It returns the
// gathered results, possibly fewer than expected on early termination.
func collectResultsWithProgress(resultsChan <-chan *pb.Message, errorChan <-chan error, expected int) []ComputeResult {
	results := make([]ComputeResult, 0, expected)
	bar := progressbar.Default(int64(expected))
	for received := 0; received < expected; {
		select {
		case msg, ok := <-resultsChan:
			if !ok {
				return results
			}
			var result ComputeResult
			// The original ignored this error, appending a zero-value
			// result for malformed payloads; log and skip instead, to
			// match collectResults.
			if err := json.Unmarshal(msg.Payload, &result); err != nil {
				log.Printf("Failed to parse result %s: %v", msg.Id, err)
				continue
			}
			results = append(results, result)
			bar.Add(1)
			received++
		case err := <-errorChan:
			log.Printf("Error: %v", err)
			return results
		}
	}
	return results
}
Next Steps¶
- Worker Example: Implement a worker service
- Monte Carlo Example: Real-world application
- Writing Clients Guide: Advanced client patterns
- Submitting Workloads Guide: Workload strategies