Submitting Workloads¶
This guide covers advanced patterns and best practices for submitting workloads to ALPCRUN.CH.
Workload Patterns¶
Pattern 1: Embarrassingly Parallel¶
The simplest pattern - independent tasks with no dependencies.
Use Cases: Monte Carlo simulations, parameter sweeps, batch processing
Implementation:
// Generate all tasks upfront
tasks := generateTasks(10000)
// Send all tasks
for _, task := range tasks {
batch := &pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{{Id: task.ID, Payload: task.Data}},
}
stream.Send(batch)
}
// Collect all results
results := collectResults(stream, len(tasks))
Characteristics:

- No shared state needed
- Near-perfect scalability (no coordination between tasks)
- Simple result aggregation
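The snippets in this guide use a collectResults helper, which is not part of the API; a minimal sketch of one way to implement it, assuming result messages arrive as batches on the same stream used for sending:

// collectResults drains the stream until `expected` result messages
// have been received. Illustrative helper, not an API call.
func collectResults(stream pb.Queue_ClientStreamClient, expected int) []*pb.Message {
	results := make([]*pb.Message, 0, expected)
	for len(results) < expected {
		batch, err := stream.Recv()
		if err != nil {
			break
		}
		results = append(results, batch.Messages...)
	}
	return results
}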
Pattern 2: MapReduce¶
Two-phase processing: map tasks followed by reduce.
Use Cases: Data aggregation, distributed sorting, word count
Implementation:
// Phase 1: Map
mapTasks := splitDataIntoChunks(data, 100)
for _, chunk := range mapTasks {
batch := &pb.Batch{
SessionId: session.Id,
Priority: 1, // Higher priority for map phase
Messages: []*pb.Message{{Id: chunk.ID, Payload: chunk.Data}},
}
stream.Send(batch)
}
// Collect map results
mapResults := collectResults(stream, len(mapTasks))
// Phase 2: Reduce
reduceTasks := groupMapResults(mapResults)
for _, group := range reduceTasks {
batch := &pb.Batch{
SessionId: session.Id,
Priority: 0,
Messages: []*pb.Message{{Id: group.ID, Payload: group.Data}},
}
stream.Send(batch)
}
// Collect final results
finalResult := collectResults(stream, len(reduceTasks))
Optimization: Use two separate sessions for better control:
// Map session with many workers
mapSession := createSession(priority: 10, maxWorkers: 1000)
// Reduce session with fewer workers
reduceSession := createSession(priority: 5, maxWorkers: 10)
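The createSession calls above are pseudocode; a sketch of the equivalent setup using the gRPC client shown later in this guide (queueClient, ctx, the App IDs, and the attribute values are assumptions):

// Map session with many workers
mapSession, _ := queueClient.CreateSession(ctx, &pb.Session{
	App: &pb.App{Id: "mapreduce-map"},
	Attributes: &pb.SessionAttributes{
		Priority:   10,
		MaxWorkers: 1000,
	},
})
// Reduce session with fewer workers
reduceSession, _ := queueClient.CreateSession(ctx, &pb.Session{
	App: &pb.App{Id: "mapreduce-reduce"},
	Attributes: &pb.SessionAttributes{
		Priority:   5,
		MaxWorkers: 10,
	},
})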
Pattern 3: Pipeline¶
Multi-stage processing where output of one stage feeds the next.
Use Cases: Image processing pipelines, data transformation chains
Implementation:
// Stage 1: Preprocessing
for _, input := range inputs {
sendTask(stream, "preprocess", input)
}
stage1Results := make(map[string]*Result)
// As stage 1 completes, feed into stage 2
for result := range receiveResults(stream) {
stage1Results[result.Id] = result
// Send to stage 2
stage2Task := prepareStage2(result)
sendTask(stream, "process", stage2Task)
}
// Continue through stages...
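receiveResults is another helper used in these sketches rather than an API call; one possible implementation pumps the stream into a channel so callers can range over results as they arrive:

// receiveResults forwards every result message from the stream onto a
// channel. Illustrative helper; buffer size is arbitrary.
func receiveResults(stream pb.Queue_ClientStreamClient) <-chan *pb.Message {
	out := make(chan *pb.Message, 100)
	go func() {
		defer close(out)
		for {
			batch, err := stream.Recv()
			if err != nil {
				return
			}
			for _, msg := range batch.Messages {
				out <- msg
			}
		}
	}()
	return out
}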
Multi-Session Pipeline:
// Create sessions for each stage
preprocessSession := createSession("preprocess", priority: 10)
processSession := createSession("process", priority: 8)
postprocessSession := createSession("postprocess", priority: 6)
// Connect stages
go func() {
for result := range stage1Results {
sendTaskToSession(processSession, result)
}
}()
go func() {
for result := range stage2Results {
sendTaskToSession(postprocessSession, result)
}
}()
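sendTaskToSession above simply tags the batch with the target stage's session ID; a minimal sketch, with the stream passed explicitly (whether one client stream may carry batches for several sessions depends on your deployment, so treat this as an assumption):

// sendTaskToSession forwards a stage's output to the next stage by
// re-submitting it under that stage's session ID.
func sendTaskToSession(stream pb.Queue_ClientStreamClient, session *pb.Session, result *pb.Message) {
	stream.Send(&pb.Batch{
		SessionId: session.Id,
		Messages:  []*pb.Message{{Id: result.Id, Payload: result.Payload}},
	})
}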
Pattern 4: Dynamic Task Generation¶
Generate new tasks based on results.
Use Cases: Tree search, adaptive sampling, iterative refinement
Implementation:
// Seed initial tasks
for _, seed := range seeds {
sendTask(stream, seed)
}
totalTasks := len(seeds)
completedTasks := 0
resultsChan := receiveResults(stream)
for completedTasks < totalTasks {
	result := <-resultsChan
completedTasks++
// Analyze result and potentially generate more work
if needsRefinement(result) {
newTasks := generateRefinementTasks(result)
for _, task := range newTasks {
sendTask(stream, task)
totalTasks++
}
}
}
With Depth Limit:
type Task struct {
ID string
Data []byte
Depth int
}
const maxDepth = 5
pending := len(seeds) // tasks submitted but not yet completed
for pending > 0 {
	result := <-resultsChan
	pending--
	task := tasks[result.Id]
	if task.Depth < maxDepth && needsMore(result) {
		newTasks := expand(result)
		for _, newTask := range newTasks {
			newTask.Depth = task.Depth + 1
			tasks[newTask.ID] = newTask // remember depth for later lookups
			sendTask(stream, newTask)
			pending++
		}
	}
}
Pattern 5: Iterative Convergence¶
Repeat the computation until the convergence criteria are met.
Use Cases: Optimization algorithms, machine learning training, numerical methods
Implementation:
iteration := 0
converged := false
for !converged && iteration < maxIterations {
// Send tasks for this iteration
for _, task := range tasks {
task.Iteration = iteration
sendTask(stream, task)
}
// Collect results
results := collectResults(stream, len(tasks))
// Check convergence
converged = checkConvergence(results)
// Update tasks for next iteration
tasks = updateTasks(tasks, results)
iteration++
}
With Shared State Updates:
updateLevel := uint32(0)
var results []*pb.Message // empty on the first iteration
for iteration := 0; iteration < maxIterations; iteration++ {
// Update shared parameters
updateLevel++
sharedParams := computeSharedParams(results)
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: session.Id,
UpdateLevel: &updateLevel,
Payload: sharedParams,
})
// Send tasks referencing new shared data
for _, task := range tasks {
batch := &pb.Batch{
SessionId: session.Id,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: []*pb.Message{task},
}
stream.Send(batch)
}
results = collectResults(stream, len(tasks))
}
Batching Strategies¶
Fixed-Size Batching¶
const batchSize = 100
messages := []*pb.Message{}
for _, task := range tasks {
messages = append(messages, &pb.Message{
Id: task.ID,
Payload: task.Data,
})
if len(messages) >= batchSize {
stream.Send(&pb.Batch{
SessionId: session.Id,
Messages: messages,
})
messages = []*pb.Message{}
}
}
// Send remaining
if len(messages) > 0 {
stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
}
Time-Based Batching¶
const batchInterval = 100 * time.Millisecond
messages := []*pb.Message{}
timer := time.NewTimer(batchInterval)
for {
select {
case task := <-taskChan:
messages = append(messages, &pb.Message{
Id: task.ID,
Payload: task.Data,
})
case <-timer.C:
if len(messages) > 0 {
stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
messages = []*pb.Message{}
}
timer.Reset(batchInterval)
case <-doneChan:
// Flush remaining
if len(messages) > 0 {
stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
}
return
}
}
Adaptive Batching¶
Adjust the batch size dynamically, using per-batch latency as a throughput signal:
func adaptiveBatcher(stream pb.Queue_ClientStreamClient, session *pb.Session, taskChan <-chan Task) {
minBatchSize := 10
maxBatchSize := 1000
currentBatchSize := 100
for {
messages := []*pb.Message{}
start := time.Now()
	// Collect messages up to the current batch size; stop waiting
	// after 10ms so a partial batch can still be sent
collect:
	for len(messages) < currentBatchSize {
		select {
		case task := <-taskChan:
			messages = append(messages, taskToMessage(task))
		case <-time.After(10 * time.Millisecond):
			break collect
		}
	}
	if len(messages) == 0 {
		continue
	}
	// Send batch
	stream.Send(&pb.Batch{SessionId: session.Id, Messages: messages})
// Adjust batch size based on latency
latency := time.Since(start)
if latency < 10*time.Millisecond {
currentBatchSize = min(currentBatchSize*2, maxBatchSize)
} else if latency > 100*time.Millisecond {
currentBatchSize = max(currentBatchSize/2, minBatchSize)
}
}
}
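The sketch relies on the built-in min and max functions, which exist only in Go 1.21 and later; on older toolchains you would add small helpers such as:

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}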
Priority Management¶
Task Priorities¶
// Critical tasks
criticalBatch := &pb.Batch{
SessionId: session.Id,
Priority: 10, // Highest priority
Messages: criticalTasks,
}
// Normal tasks
normalBatch := &pb.Batch{
SessionId: session.Id,
Priority: 5, // Medium priority
Messages: normalTasks,
}
// Background tasks
backgroundBatch := &pb.Batch{
SessionId: session.Id,
Priority: 1, // Low priority
Messages: backgroundTasks,
}
Session Priorities¶
// High priority session (gets workers first)
urgentSession := createSession(&pb.SessionAttributes{
Priority: 10,
MinWorkers: 100,
MaxWorkers: 1000,
})
// Normal priority session
normalSession := createSession(&pb.SessionAttributes{
Priority: 5,
MinWorkers: 10,
MaxWorkers: 100,
})
// Background session (uses spare capacity)
backgroundSession := createSession(&pb.SessionAttributes{
Priority: 1,
MinWorkers: 0,
MaxWorkers: 50,
Preemptible: true, // Can be interrupted
})
Flow Control¶
Rate Limiting¶
Prevent overwhelming the system:
import "golang.org/x/time/rate"
// Limit to 1000 tasks/second
limiter := rate.NewLimiter(rate.Limit(1000), 100)
for _, task := range tasks {
// Wait for rate limiter
limiter.Wait(ctx)
batch := &pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{task},
}
stream.Send(batch)
}
Backpressure Handling¶
Monitor result rate and adjust sending:
type FlowController struct {
sentTasks atomic.Int64
receivedTasks atomic.Int64
maxInflight int64
}
func (fc *FlowController) canSend() bool {
inflight := fc.sentTasks.Load() - fc.receivedTasks.Load()
return inflight < fc.maxInflight
}
// Sender goroutine
go func() {
	for _, task := range tasks {
		// Wait until the in-flight window has room
		for !flowController.canSend() {
			time.Sleep(10 * time.Millisecond)
		}
		batch := &pb.Batch{
			SessionId: session.Id,
			Messages:  []*pb.Message{{Id: task.ID, Payload: task.Data}},
		}
		stream.Send(batch)
		flowController.sentTasks.Add(1)
	}
}()
// Receiver goroutine
go func() {
	for {
		result, err := stream.Recv()
		if err != nil {
			return
		}
		flowController.receivedTasks.Add(int64(len(result.Messages)))
		processResults(result)
	}
}()
MaxInflightBatchesPerWorker¶
Configure per-worker flow control:
session := createSession(&pb.SessionAttributes{
MaxInflightBatchesPerWorker: proto.Uint32(5), // Limit per worker
MaxWorkers: proto.Uint32(100),
})
// Effective max inflight = 5 * 100 = 500 batches
Result Handling¶
Ordered Result Collection¶
Maintain task order:
type ResultCollector struct {
results map[string]*pb.Message
nextIndex int
orderedChan chan *pb.Message
}
func (rc *ResultCollector) collect(result *pb.Message) {
rc.results[result.Id] = result
// Emit results in order
for {
key := fmt.Sprintf("task-%d", rc.nextIndex)
if result, ok := rc.results[key]; ok {
rc.orderedChan <- result
delete(rc.results, key)
rc.nextIndex++
} else {
break
}
}
}
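The collector above assumes message IDs of the form task-N; a sketch of constructing it and feeding it from the stream (newResultCollector and the channel buffer size are illustrative):

func newResultCollector() *ResultCollector {
	return &ResultCollector{
		results:     make(map[string]*pb.Message),
		orderedChan: make(chan *pb.Message, 100),
	}
}

rc := newResultCollector()
go func() {
	for {
		batch, err := stream.Recv()
		if err != nil {
			close(rc.orderedChan)
			return
		}
		for _, msg := range batch.Messages {
			rc.collect(msg)
		}
	}
}()
// Consumers read rc.orderedChan and see results in task order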
Streaming Result Processing¶
Process results as they arrive without buffering:
go func() {
for {
batch, err := stream.Recv()
if err != nil {
break
}
for _, result := range batch.Messages {
// Process immediately
processResult(result)
// Stream to output (e.g., file, database)
writeToOutput(result)
}
}
}()
Result Aggregation¶
Aggregate results incrementally:
type Aggregator struct {
sum float64
count int64
mu sync.Mutex
}
func (a *Aggregator) add(result *pb.Message) {
var value float64
json.Unmarshal(result.Payload, &value)
a.mu.Lock()
a.sum += value
a.count++
a.mu.Unlock()
}
func (a *Aggregator) mean() float64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.sum / float64(a.count)
}
// Use in result processing
go func() {
	for {
		batch, err := stream.Recv()
		if err != nil {
			return
		}
		for _, result := range batch.Messages {
			aggregator.add(result)
			// Report progress
			if aggregator.count%1000 == 0 {
				log.Printf("Progress: %d tasks, mean: %.2f",
					aggregator.count, aggregator.mean())
			}
		}
	}
}()
Error Handling¶
Retry Failed Tasks¶
func submitWithRetry(stream pb.Queue_ClientStreamClient, session *pb.Session, task Task, maxRetries int) error {
attempt := 0
for attempt < maxRetries {
batch := &pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{{Id: task.ID, Payload: task.Data}},
}
if err := stream.Send(batch); err != nil {
attempt++
time.Sleep(time.Duration(attempt) * time.Second)
continue
}
return nil
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
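The linear backoff above is easy to follow; on heavily loaded systems an exponential backoff with jitter usually recovers faster. A sketch that could replace the fixed sleep (the base and cap values are illustrative; it uses math/rand):

func backoffWithJitter(attempt int) time.Duration {
	base := 100 * time.Millisecond
	maxDelay := 10 * time.Second
	d := base * time.Duration(1<<uint(attempt)) // 100ms, 200ms, 400ms, ...
	if d > maxDelay {
		d = maxDelay
	}
	// Add up to 50% random jitter so retries do not synchronize
	return d + time.Duration(rand.Int63n(int64(d)/2))
}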
Dead Letter Processing¶
func processDeadLetters(ctx context.Context, client pb.QueueClient, session *pb.Session) {
stream, err := client.ClientStreamDeadLetters(ctx, session)
if err != nil {
log.Printf("Failed to open dead letter stream: %v", err)
return
}
for {
batch, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Dead letter stream error: %v", err)
break
}
for _, msg := range batch.Messages {
log.Printf("Failed task %s: %s", msg.Id, msg.GetError())
// Analyze error and decide action
if isRetryable(msg.GetError()) {
retryTask(msg)
} else {
logPermanentFailure(msg)
}
}
}
}
func isRetryable(errorMsg string) bool {
// Retryable errors
retryable := []string{
"timeout",
"connection refused",
"temporary failure",
}
for _, pattern := range retryable {
if strings.Contains(errorMsg, pattern) {
return true
}
}
return false
}
Complete Example: Parameter Sweep¶
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"

	pb "github.com/limelabs/metalcore-neo/api/v1"
	"github.com/limelabs/metalcore-neo/pkg/grpccon"
)
type ParameterSet struct {
Alpha float64
Beta float64
Gamma float64
}
type SimulationResult struct {
Params ParameterSet
Score float64
}
func main() {
ctx := context.Background()
// Connect
queueConn, queueClient, _ := grpccon.ConnectToQueue("localhost:1337", "", false)
defer queueConn.Close()
// Create session
session, _ := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{Id: "parameter-sweep"},
Attributes: &pb.SessionAttributes{
Priority: 5,
MinWorkers: 50,
MaxWorkers: 500,
},
})
// Open stream
stream, _ := queueClient.ClientStream(ctx)
// Generate parameter combinations
params := generateParameterCombinations()
log.Printf("Generated %d parameter sets", len(params))
// Send tasks
go func() {
for i, p := range params {
payload, _ := json.Marshal(p)
batch := &pb.Batch{
SessionId: session.Id,
Messages: []*pb.Message{{
Id: fmt.Sprintf("param-%d", i),
Payload: payload,
}},
}
stream.Send(batch)
}
}()
// Collect results
results := make([]SimulationResult, 0, len(params))
var mu sync.Mutex
received := 0
for received < len(params) {
	batch, _ := stream.Recv()
	for _, msg := range batch.Messages {
		var result SimulationResult
		json.Unmarshal(msg.Payload, &result)
		mu.Lock()
		results = append(results, result)
		mu.Unlock()
		received++
		if received%100 == 0 {
			log.Printf("Progress: %d/%d", received, len(params))
		}
	}
}
// Find best parameters
best := findBestResult(results)
log.Printf("Best parameters: %+v, Score: %.4f", best.Params, best.Score)
// Cleanup
queueClient.CloseSession(ctx, session)
}
func generateParameterCombinations() []ParameterSet {
alphas := linspace(0.1, 1.0, 10)
betas := linspace(0.1, 1.0, 10)
gammas := linspace(0.1, 1.0, 10)
params := []ParameterSet{}
for _, alpha := range alphas {
for _, beta := range betas {
for _, gamma := range gammas {
params = append(params, ParameterSet{alpha, beta, gamma})
}
}
}
return params
}
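linspace and findBestResult are omitted above; minimal sketches (findBestResult assumes a higher score is better):

// linspace returns n evenly spaced values from start to end inclusive.
func linspace(start, end float64, n int) []float64 {
	if n == 1 {
		return []float64{start}
	}
	step := (end - start) / float64(n-1)
	out := make([]float64, n)
	for i := range out {
		out[i] = start + float64(i)*step
	}
	return out
}

// findBestResult returns the result with the highest score.
func findBestResult(results []SimulationResult) SimulationResult {
	best := results[0]
	for _, r := range results[1:] {
		if r.Score > best.Score {
			best = r
		}
	}
	return best
}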
Best Practices¶
- Task Granularity: Balance between too fine (overhead) and too coarse (load imbalance)
    - Aim for tasks that take 1-10 seconds each
    - Adjust based on your workload characteristics
- Batch Size: Larger batches give better throughput, smaller batches give lower latency
    - Start with 50-100 tasks per batch
    - Use adaptive batching for variable workloads
- Resource Management: Configure sessions appropriately
    - Set realistic worker limits
    - Use priority to manage multiple workloads
    - Enable cleanup for transient workloads
- Error Handling: Always handle errors gracefully
    - Monitor dead letter queues
    - Implement retry logic with backoff
    - Log failures for debugging
- Monitoring: Track progress and performance
    - Log progress at regular intervals
    - Monitor queue depths via metrics
    - Measure end-to-end latency
Next Steps¶
- Using Shared Data: Optimize data distribution
- Error Handling: Advanced error patterns
- API Reference: Complete API documentation
- Examples: Real-world examples