Skip to content

Submitting Workloads

This guide covers advanced patterns and best practices for submitting workloads to ALPCRUN.CH.

Workload Patterns

Pattern 1: Embarrassingly Parallel

The simplest pattern - independent tasks with no dependencies.

Use Cases: Monte Carlo simulations, parameter sweeps, batch processing

Implementation:

// Build the complete task list before anything is submitted.
tasks := generateTasks(10000)

// Submit every task as its own single-message batch.
for i := range tasks {
    stream.Send(&pb.Batch{
        SessionId: session.Id,
        Messages: []*pb.Message{{
            Id:      tasks[i].ID,
            Payload: tasks[i].Data,
        }},
    })
}

// Collect results; the expected count is the number of tasks sent.
results := collectResults(stream, len(tasks))

Characteristics:

- No shared state needed
- Perfect scalability
- Simple result aggregation

Pattern 2: MapReduce

Two-phase processing: map tasks followed by reduce.

Use Cases: Data aggregation, distributed sorting, word count

Implementation:

// Phase 1: Map — one single-message batch per input chunk.
mapTasks := splitDataIntoChunks(data, 100)
for i := range mapTasks {
    stream.Send(&pb.Batch{
        SessionId: session.Id,
        Priority:  1, // map batches outrank the reduce batches (priority 0)
        Messages:  []*pb.Message{{Id: mapTasks[i].ID, Payload: mapTasks[i].Data}},
    })
}

// The reduce phase needs every map result, so gather them all first.
mapResults := collectResults(stream, len(mapTasks))

// Phase 2: Reduce — one single-message batch per group of map results.
reduceTasks := groupMapResults(mapResults)
for i := range reduceTasks {
    stream.Send(&pb.Batch{
        SessionId: session.Id,
        Priority:  0,
        Messages:  []*pb.Message{{Id: reduceTasks[i].ID, Payload: reduceTasks[i].Data}},
    })
}

// Gather the reduce outputs.
finalResult := collectResults(stream, len(reduceTasks))

Optimization: Use two separate sessions for better control:

// Map session with many workers.
// NOTE(review): createSession is a helper not shown in this guide; the call
// shape mirrors the one used in the "Session Priorities" section.
mapSession := createSession(&pb.SessionAttributes{
    Priority:   10,
    MaxWorkers: 1000,
})

// Reduce session with fewer workers
reduceSession := createSession(&pb.SessionAttributes{
    Priority:   5,
    MaxWorkers: 10,
})

Pattern 3: Pipeline

Multi-stage processing where output of one stage feeds the next.

Use Cases: Image processing pipelines, data transformation chains

Implementation:

// Stage 1: Preprocessing
// Every input is submitted up front, tagged with the "preprocess" stage name.
for _, input := range inputs {
    sendTask(stream, "preprocess", input)
}

// Results received so far, keyed by result ID.
stage1Results := make(map[string]*Result)

// As stage 1 completes, feed into stage 2
// NOTE(review): stage 2 tasks are sent on the same stream this loop reads
// from. If stage 2 results also arrive on this channel they would be stored
// in stage1Results and re-submitted to stage 2 — confirm that results can be
// told apart by stage, or use one session per stage as shown below.
for result := range receiveResults(stream) {
    stage1Results[result.Id] = result

    // Send to stage 2
    stage2Task := prepareStage2(result)
    sendTask(stream, "process", stage2Task)
}

// Continue through stages...

Multi-Session Pipeline:

// Create sessions for each stage
preprocess := createSession("preprocess", priority: 10)
process := createSession("process", priority: 8)
postprocess := createSession("postprocess", priority: 6)

// Connect stages
go func() {
    for result := range stage1Results {
        sendTaskToSession(processSession, result)
    }
}()

go func() {
    for result := range stage2Results {
        sendTaskToSession(postprocessSession, result)
    }
}()

Pattern 4: Dynamic Task Generation

Generate new tasks based on results.

Use Cases: Tree search, adaptive sampling, iterative refinement

Implementation:

// Seed initial tasks
for _, seed := range seeds {
    sendTask(stream, seed)
}

// Obtain the results channel once. Calling receiveResults inside the loop
// would request a fresh channel for every single result.
pendingResults := receiveResults(stream)

// totalTasks grows as refinement tasks are added; the loop ends once every
// task submitted so far has reported back.
totalTasks := len(seeds)
completedTasks := 0

for completedTasks < totalTasks {
    result, ok := <-pendingResults
    if !ok {
        // Channel closed before all outstanding tasks reported back.
        break
    }
    completedTasks++

    // Analyze result and potentially generate more work
    if needsRefinement(result) {
        for _, task := range generateRefinementTasks(result) {
            sendTask(stream, task)
            totalTasks++
        }
    }
}

With Depth Limit:

// Task is a unit of work that remembers how many expansion steps separate it
// from a seed task.
type Task struct {
    ID    string
    Data  []byte
    Depth int
}

// maxDepth bounds how many times a result may be expanded into new tasks.
const maxDepth = 5

// Ranging over the channel ends the loop when the channel is closed instead
// of spinning on zero values.
for result := range resultsChan {
    task := tasks[result.Id]

    if task.Depth < maxDepth && needsMore(result) {
        for _, newTask := range expand(result) {
            newTask.Depth = task.Depth + 1
            // Record the child so its depth is found when its own result
            // arrives; otherwise the lookup above yields depth 0 and the
            // limit is never reached.
            tasks[newTask.ID] = newTask
            sendTask(stream, newTask)
        }
    }
}

Pattern 5: Iterative Convergence

Repeat the computation until the convergence criteria are met.

Use Cases: Optimization algorithms, machine learning training, numerical methods

Implementation:

iteration := 0
converged := false

// Run rounds until the results converge or the iteration budget is spent.
for !converged && iteration < maxIterations {
    // Send tasks for this iteration, each stamped with the round number.
    // The index is not needed, so it is discarded (an unused loop variable
    // does not compile in Go).
    for _, task := range tasks {
        task.Iteration = iteration
        sendTask(stream, task)
    }

    // Collect results
    results := collectResults(stream, len(tasks))

    // Check convergence
    converged = checkConvergence(results)

    // Update tasks for next iteration
    tasks = updateTasks(tasks, results)

    iteration++
}

With Shared State Updates:

updateLevel := uint32(0)

// NOTE(review): results must already hold a value before the first
// iteration; this snippet only assigns to it.
for iteration := 0; iteration < maxIterations; iteration++ {
    // Update shared parameters
    updateLevel++
    sharedParams := computeSharedParams(results)
    cacheClient.UpdateSharedData(ctx, &pb.SharedData{
        SessionId: session.Id,
        // Each message gets its own copy of the level. Passing &updateLevel
        // would let the increment in the next iteration change messages
        // that have already been handed to the stream.
        UpdateLevel: proto.Uint32(updateLevel),
        Payload:     sharedParams,
    })

    // Send tasks referencing new shared data
    for _, task := range tasks {
        batch := &pb.Batch{
            SessionId:                     session.Id,
            UseSharedData:                 proto.Bool(true),
            RequiredSharedDataUpdateLevel: proto.Uint32(updateLevel),
            Messages:                      []*pb.Message{task},
        }
        stream.Send(batch)
    }

    results = collectResults(stream, len(tasks))
}

Batching Strategies

Fixed-Size Batching

const batchSize = 100

messages := []*pb.Message{}
for _, task := range tasks {
    messages = append(messages, &pb.Message{
        Id:      task.ID,
        Payload: task.Data,
    })

    if len(messages) >= batchSize {
        stream.Send(&pb.Batch{
            SessionId: session.Id,
            Messages:  messages,
        })
        messages = []*pb.Message{}
    }
}

// Send remaining
if len(messages) > 0 {
    stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
}

Time-Based Batching

const batchInterval = 100 * time.Millisecond

messages := []*pb.Message{}
timer := time.NewTimer(batchInterval)

for {
    select {
    case task := <-taskChan:
        messages = append(messages, &pb.Message{
            Id:      task.ID,
            Payload: task.Data,
        })

    case <-timer.C:
        if len(messages) > 0 {
            stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
            messages = []*pb.Message{}
        }
        timer.Reset(batchInterval)

    case <-doneChan:
        // Flush remaining
        if len(messages) > 0 {
            stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
        }
        return
    }
}

Adaptive Batching

Adjust batch size based on throughput:

// adaptiveBatcher drains taskChan into batches and sends them on stream,
// growing or shrinking the batch size according to how long each batch took
// to assemble and send.
//
// A batch is closed when it reaches the current size or when no task arrives
// for 10ms. The function returns when taskChan is closed (after flushing the
// partial batch) or when a send fails.
func adaptiveBatcher(stream pb.Queue_ClientStreamClient, taskChan <-chan Task) {
    const (
        minBatchSize = 10
        maxBatchSize = 1000
    )
    currentBatchSize := 100

    for {
        messages := make([]*pb.Message, 0, currentBatchSize)
        start := time.Now()
        drained := false

        // Collect messages up to current batch size. The label is required:
        // a bare break inside select only leaves the select, so the idle
        // timeout would never end the collection loop.
    collect:
        for len(messages) < currentBatchSize {
            select {
            case task, ok := <-taskChan:
                if !ok {
                    drained = true
                    break collect
                }
                messages = append(messages, taskToMessage(task))
            case <-time.After(10 * time.Millisecond):
                break collect
            }
        }

        // Send batch, but never an empty one.
        if len(messages) > 0 {
            if err := stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages}); err != nil {
                return
            }
        }
        if drained {
            return
        }
        if len(messages) == 0 {
            // Idle period: nothing was sent, so there is no latency sample.
            continue
        }

        // Adjust batch size based on latency
        latency := time.Since(start)
        if latency < 10*time.Millisecond {
            currentBatchSize = min(currentBatchSize*2, maxBatchSize)
        } else if latency > 100*time.Millisecond {
            currentBatchSize = max(currentBatchSize/2, minBatchSize)
        }
    }
}

Priority Management

Task Priorities

// The three batches below target the same session and differ only in their
// Priority value and in the messages they carry.

// Critical tasks
criticalBatch := &pb.Batch{
    SessionId: session.Id,
    Priority:  10,  // Highest priority
    Messages:  criticalTasks,
}

// Normal tasks
normalBatch := &pb.Batch{
    SessionId: session.Id,
    Priority:  5,   // Medium priority
    Messages:  normalTasks,
}

// Background tasks
backgroundBatch := &pb.Batch{
    SessionId: session.Id,
    Priority:  1,   // Low priority
    Messages:  backgroundTasks,
}

Session Priorities

// NOTE(review): createSession is a helper that is not shown in this guide;
// presumably it wraps the CreateSession call — confirm.

// High priority session (gets workers first)
urgentSession := createSession(&pb.SessionAttributes{
    Priority:    10,
    MinWorkers:  100,
    MaxWorkers:  1000,
})

// Normal priority session
normalSession := createSession(&pb.SessionAttributes{
    Priority:    5,
    MinWorkers:  10,
    MaxWorkers:  100,
})

// Background session (uses spare capacity)
backgroundSession := createSession(&pb.SessionAttributes{
    Priority:    1,
    MinWorkers:  0,
    MaxWorkers:  50,
    Preemptible: true,  // Can be interrupted
})

Flow Control

Rate Limiting

Prevent overwhelming the system:

import "golang.org/x/time/rate"

// Limit to 1000 tasks/second, allowing bursts of up to 100.
limiter := rate.NewLimiter(rate.Limit(1000), 100)

for _, task := range tasks {
    // Wait for rate limiter. Wait fails when ctx is cancelled or its
    // deadline cannot be met; ignoring that would send the remaining tasks
    // unthrottled.
    if err := limiter.Wait(ctx); err != nil {
        break
    }

    batch := &pb.Batch{
        SessionId: session.Id,
        Messages:  []*pb.Message{task},
    }
    // Stop submitting once the stream has failed.
    if err := stream.Send(batch); err != nil {
        break
    }
}

Backpressure Handling

Monitor result rate and adjust sending:

// FlowController tracks how many tasks have been sent and how many have come
// back, so a sender can cap the number of tasks in flight.
type FlowController struct {
    sentTasks     atomic.Int64
    receivedTasks atomic.Int64
    maxInflight   int64
}

// canSend reports whether the number of outstanding tasks (sent minus
// received) is still below maxInflight.
func (fc *FlowController) canSend() bool {
    return fc.sentTasks.Load()-fc.receivedTasks.Load() < fc.maxInflight
}

// Sender goroutine
go func() {
    for _, task := range tasks {
        // Wait until we can send
        for !flowController.canSend() {
            time.Sleep(10 * time.Millisecond)
        }

        // NOTE(review): assumes each task is a *pb.Message, as in the rate
        // limiting example — confirm.
        taskBatch := &pb.Batch{
            SessionId: session.Id,
            Messages:  []*pb.Message{task},
        }
        if err := stream.Send(taskBatch); err != nil {
            return
        }
        // Count messages, not batches, so the unit matches what the
        // receiver subtracts.
        flowController.sentTasks.Add(int64(len(taskBatch.Messages)))
    }
}()

// Receiver goroutine
go func() {
    for {
        result, err := stream.Recv()
        if err != nil {
            // Stream ended or failed; nothing more will arrive.
            return
        }
        flowController.receivedTasks.Add(int64(len(result.Messages)))
        processResults(result)
    }
}()

MaxInflightBatchesPerWorker

Configure per-worker flow control:

// NOTE(review): MaxWorkers is wrapped in proto.Uint32 here but written as a
// plain integer in the other session examples in this guide — confirm which
// form the SessionAttributes message expects.
session := createSession(&pb.SessionAttributes{
    MaxInflightBatchesPerWorker: proto.Uint32(5),  // Limit per worker
    MaxWorkers:                  proto.Uint32(100),
})

// Effective max inflight = 5 * 100 = 500 batches

Result Handling

Ordered Result Collection

Maintain task order:

// ResultCollector buffers results that arrive out of order and releases them
// on orderedChan in index order. It expects result IDs of the form
// "task-<index>".
type ResultCollector struct {
    results     map[string]*pb.Message
    nextIndex   int
    orderedChan chan *pb.Message
}

// collect stores result and then emits every buffered result whose turn has
// come, stopping at the first index that has not arrived yet.
func (rc *ResultCollector) collect(result *pb.Message) {
    rc.results[result.Id] = result

    for {
        key := fmt.Sprintf("task-%d", rc.nextIndex)
        next, ready := rc.results[key]
        if !ready {
            return
        }
        rc.orderedChan <- next
        delete(rc.results, key)
        rc.nextIndex++
    }
}

Streaming Result Processing

Process results as they arrive without buffering:

// Handle each result as soon as its batch arrives; the goroutine ends on the
// first receive error.
go func() {
    for {
        batch, err := stream.Recv()
        if err != nil {
            return
        }

        for i := range batch.Messages {
            msg := batch.Messages[i]

            // Process immediately
            processResult(msg)

            // Stream to output (e.g., file, database)
            writeToOutput(msg)
        }
    }
}()

Result Aggregation

Aggregate results incrementally:

// Aggregator keeps a running sum and count of float64 results. Both fields
// are guarded by mu.
type Aggregator struct {
    sum   float64
    count int64
    mu    sync.Mutex
}

// add decodes the JSON number in result.Payload and folds it into the
// running totals. Payloads that do not decode are skipped so they do not
// count as a zero and skew the mean.
func (a *Aggregator) add(result *pb.Message) {
    var value float64
    if err := json.Unmarshal(result.Payload, &value); err != nil {
        return
    }

    a.mu.Lock()
    a.sum += value
    a.count++
    a.mu.Unlock()
}

// mean returns the average of all values added so far, or 0 when nothing has
// been added yet (instead of the NaN that 0/0 would produce).
func (a *Aggregator) mean() float64 {
    a.mu.Lock()
    defer a.mu.Unlock()
    if a.count == 0 {
        return 0
    }
    return a.sum / float64(a.count)
}

// Use in result processing
go func() {
    for {
        batch := stream.Recv()
        for _, result := range batch.Messages {
            aggregator.add(result)

            // Report progress
            if aggregator.count%1000 == 0 {
                log.Printf("Progress: %d tasks, mean: %.2f",
                    aggregator.count, aggregator.mean())
            }
        }
    }
}()

Error Handling

Retry Failed Tasks

// submitWithRetry sends task as a single-message batch, trying up to
// maxRetries times with a linearly growing pause (1s, 2s, ...) between
// attempts.
//
// It returns nil on the first successful send. Otherwise it returns an error
// that wraps the last send failure. With maxRetries <= 0 no send is
// attempted and an error is returned.
func submitWithRetry(stream pb.Queue_ClientStreamClient, task Task, maxRetries int) error {
    // The batch does not change between attempts, so build it once.
    batch := &pb.Batch{
        SessionId: session.Id,
        Messages:  []*pb.Message{{Id: task.ID, Payload: task.Data}},
    }

    var lastErr error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if lastErr = stream.Send(batch); lastErr == nil {
            return nil
        }
        // Back off only when another attempt follows; sleeping after the
        // final failure just delays the error.
        if attempt < maxRetries {
            time.Sleep(time.Duration(attempt) * time.Second)
        }
    }

    if lastErr == nil {
        return fmt.Errorf("failed after %d retries", maxRetries)
    }
    return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}

Dead Letter Processing

// processDeadLetters opens the dead letter stream for session and handles
// every failed message it delivers: retryable failures go to retryTask, all
// others to logPermanentFailure. It returns when the stream cannot be
// opened, reaches io.EOF, or reports any other error.
func processDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) {
    stream, err := client.ClientStreamDeadLetters(ctx, session)
    if err != nil {
        log.Printf("Failed to open dead letter stream: %v", err)
        return
    }

    for {
        batch, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            return
        case recvErr != nil:
            log.Printf("Dead letter stream error: %v", recvErr)
            return
        }

        for _, msg := range batch.Messages {
            reason := msg.GetError()
            log.Printf("Failed task %s: %s", msg.Id, reason)

            // Decide what to do based on the error text.
            if !isRetryable(reason) {
                logPermanentFailure(msg)
                continue
            }
            retryTask(msg)
        }
    }
}

// isRetryable reports whether errorMsg contains one of the substrings that
// mark a failure as transient. Matching is case-sensitive.
func isRetryable(errorMsg string) bool {
    switch {
    case strings.Contains(errorMsg, "timeout"),
        strings.Contains(errorMsg, "connection refused"),
        strings.Contains(errorMsg, "temporary failure"):
        return true
    default:
        return false
    }
}

Complete Example: Parameter Sweep

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
)

// ParameterSet is one point of the parameter sweep. Each set is
// JSON-encoded and sent as the payload of a single task.
type ParameterSet struct {
    Alpha float64
    Beta  float64
    Gamma float64
}

// SimulationResult is what a result message's payload is JSON-decoded into:
// a parameter set together with a score.
type SimulationResult struct {
    Params ParameterSet
    Score  float64
}

// main runs the parameter sweep and exits non-zero if any step fails.
func main() {
    if err := runSweep(context.Background()); err != nil {
        log.Fatal(err)
    }
}

// runSweep connects to the queue, submits one task per parameter set,
// collects one result per task and logs the best-scoring parameters.
//
// It returns an error, instead of continuing with nil values, when
// connecting, creating the session, opening the stream, sending, receiving
// or decoding fails. Keeping the work out of main lets the deferred cleanup
// run before the process exits.
func runSweep(ctx context.Context) error {
    // Connect
    queueConn, queueClient, err := grpccon.ConnectToQueue("localhost:1337", "", false)
    if err != nil {
        return fmt.Errorf("connecting to queue: %w", err)
    }
    defer queueConn.Close()

    // Create session
    session, err := queueClient.CreateSession(ctx, &pb.Session{
        App: &pb.App{Id: "parameter-sweep"},
        Attributes: &pb.SessionAttributes{
            Priority:   5,
            MinWorkers: 50,
            MaxWorkers: 500,
        },
    })
    if err != nil {
        return fmt.Errorf("creating session: %w", err)
    }
    // Cleanup: close the session on every exit path. It uses the parent
    // context so it still works after the stream context is cancelled.
    defer queueClient.CloseSession(ctx, session)

    // Open stream on its own cancellable context so a failure on either
    // side can unblock the other.
    streamCtx, cancel := context.WithCancel(ctx)
    stream, err := queueClient.ClientStream(streamCtx)
    if err != nil {
        cancel()
        return fmt.Errorf("opening stream: %w", err)
    }

    // Generate parameter combinations
    params := generateParameterCombinations()
    log.Printf("Generated %d parameter sets", len(params))

    // Send tasks in the background while results are collected below.
    var (
        wg      sync.WaitGroup
        sendErr error // written by the sender, read only after wg.Wait
    )
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i, p := range params {
            payload, err := json.Marshal(p)
            if err != nil {
                sendErr = fmt.Errorf("encoding param-%d: %w", i, err)
                cancel()
                return
            }
            batch := &pb.Batch{
                SessionId: session.Id,
                Messages: []*pb.Message{{
                    Id:      fmt.Sprintf("param-%d", i),
                    Payload: payload,
                }},
            }
            if err := stream.Send(batch); err != nil {
                sendErr = fmt.Errorf("sending param-%d: %w", i, err)
                cancel()
                return
            }
        }
    }()
    // Stop the stream and wait for the sender on every exit path so the
    // goroutine never outlives this function.
    defer func() {
        cancel()
        wg.Wait()
    }()

    // Collect results. Count messages rather than batches: a single
    // response batch may carry more than one result.
    results := make([]SimulationResult, 0, len(params))
    for len(results) < len(params) {
        batch, err := stream.Recv()
        if err != nil {
            cancel()
            wg.Wait()
            if sendErr != nil {
                // The receive most likely failed because the sender
                // cancelled the stream; report the root cause.
                return sendErr
            }
            return fmt.Errorf("receiving results: %w", err)
        }

        for _, msg := range batch.Messages {
            var result SimulationResult
            if err := json.Unmarshal(msg.Payload, &result); err != nil {
                return fmt.Errorf("decoding result %s: %w", msg.Id, err)
            }
            results = append(results, result)

            if len(results)%100 == 0 {
                log.Printf("Progress: %d/%d", len(results), len(params))
            }
        }
    }

    // Find best parameters
    best := findBestResult(results)
    log.Printf("Best parameters: %+v, Score: %.4f", best.Params, best.Score)

    return nil
}

// generateParameterCombinations returns the full cross product of the alpha,
// beta and gamma grids, each produced by linspace(0.1, 1.0, 10). Gamma varies
// fastest and alpha slowest.
func generateParameterCombinations() []ParameterSet {
    var (
        alphas = linspace(0.1, 1.0, 10)
        betas  = linspace(0.1, 1.0, 10)
        gammas = linspace(0.1, 1.0, 10)
    )

    combos := make([]ParameterSet, 0, len(alphas)*len(betas)*len(gammas))
    for _, a := range alphas {
        for _, b := range betas {
            for _, g := range gammas {
                combos = append(combos, ParameterSet{Alpha: a, Beta: b, Gamma: g})
            }
        }
    }
    return combos
}

Best Practices

  1. Task Granularity: Balance between too fine (overhead) and too coarse (load imbalance)
     - Aim for tasks that take 1-10 seconds each
     - Adjust based on your workload characteristics

  2. Batch Size: Larger batches = better throughput, smaller batches = lower latency
     - Start with 50-100 tasks per batch
     - Use adaptive batching for variable workloads

  3. Resource Management: Configure sessions appropriately
     - Set realistic worker limits
     - Use priority to manage multiple workloads
     - Enable cleanup for transient workloads

  4. Error Handling: Always handle errors gracefully
     - Monitor dead letter queues
     - Implement retry logic with backoff
     - Log failures for debugging

  5. Monitoring: Track progress and performance
     - Log progress at regular intervals
     - Monitor queue depths via metrics
     - Measure end-to-end latency

Next Steps