Skip to content

Monte Carlo Pi Estimation Example

A real-world example of using ALPCRUN.CH for Monte Carlo simulation.

Overview

This example demonstrates:

  • Distributed Monte Carlo computation
  • Large-scale task submission
  • Result aggregation
  • Performance measurement

Algorithm

Estimate π by randomly sampling points in a unit square:

  1. Generate random points (x, y) where 0 ≤ x, y ≤ 1
  2. Count points inside quarter circle (x² + y² ≤ 1)
  3. π ≈ 4 × (points inside circle) / (total points)

Complete Example

Client Code

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math"
    "os"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/durationpb"
)

// Task: simulate N samples
type MonteCarloTask struct {
    ID      string `json:"id"`
    Samples int64  `json:"samples"`
}

// Result: count of points inside circle
type MonteCarloResult struct {
    ID            string `json:"id"`
    Samples       int64  `json:"samples"`
    InsideCircle  int64  `json:"inside_circle"`
}

func main() {
    ctx := context.Background()

    // Configuration
    totalSamples := int64(1_000_000_000)  // 1 billion samples
    samplesPerTask := int64(1_000_000)    // 1 million per task
    numTasks := totalSamples / samplesPerTask

    log.Printf("Monte Carlo Pi Estimation")
    log.Printf("Total samples: %d", totalSamples)
    log.Printf("Samples per task: %d", samplesPerTask)
    log.Printf("Number of tasks: %d", numTasks)

    // Connect
    queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
    queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer queueConn.Close()

    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        log.Fatalf("Connection not ready: %v", err)
    }

    // Create session
    session, err := queueClient.CreateSession(ctx, &pb.Session{
        App: &pb.App{Id: "monte-carlo-pi"},
        Attributes: &pb.SessionAttributes{
            Priority:                    proto.Uint32(8),
            MinWorkers:                  proto.Uint32(50),
            MaxWorkers:                  proto.Uint32(500),
            IdleTimeout:                 durationpb.New(5 * time.Minute),
            CleanupOnClientExit:         proto.Bool(true),
            MaxInflightBatchesPerWorker: proto.Uint32(10),
            Tags:                        []string{"simulation", "monte-carlo"},
        },
    })
    if err != nil {
        log.Fatalf("Failed to create session: %v", err)
    }
    defer queueClient.CloseSession(ctx, session)

    log.Printf("Session created: %s", session.Id)

    // Open stream
    stream, err := queueClient.ClientStream(ctx)
    if err != nil {
        log.Fatalf("Failed to open stream: %v", err)
    }

    // Start receiving results
    resultsChan := make(chan *pb.Message, 1000)
    go receiveResults(stream, resultsChan)

    // Submit tasks
    startTime := time.Now()
    if err := submitTasks(stream, session.Id, numTasks, samplesPerTask); err != nil {
        log.Fatalf("Failed to submit tasks: %v", err)
    }
    submitDuration := time.Since(startTime)

    log.Printf("Tasks submitted in %v", submitDuration)
    log.Println("Collecting results...")

    // Collect and aggregate results
    piEstimate := aggregateResults(resultsChan, int(numTasks))
    totalDuration := time.Since(startTime)

    // Display results
    actualPi := math.Pi
    error := math.Abs(piEstimate - actualPi)
    errorPercent := (error / actualPi) * 100

    log.Println("=====================================")
    log.Printf("Estimated π: %.10f", piEstimate)
    log.Printf("Actual π:    %.10f", actualPi)
    log.Printf("Error:       %.10f (%.4f%%)", error, errorPercent)
    log.Printf("Total time:  %v", totalDuration)
    log.Printf("Throughput:  %.0f samples/sec", float64(totalSamples)/totalDuration.Seconds())
    log.Println("=====================================")
}

func submitTasks(stream pb.Queue_ClientStreamClient, sessionID string, numTasks, samplesPerTask int64) error {
    const batchSize = 100

    messages := []*pb.Message{}

    for i := int64(0); i < numTasks; i++ {
        task := MonteCarloTask{
            ID:      fmt.Sprintf("task-%06d", i),
            Samples: samplesPerTask,
        }

        payload, err := json.Marshal(task)
        if err != nil {
            return fmt.Errorf("failed to marshal task: %w", err)
        }

        messages = append(messages, &pb.Message{
            Id:      task.ID,
            Payload: payload,
        })

        // Send batch when full
        if len(messages) >= batchSize || i == numTasks-1 {
            batch := &pb.Batch{
                Id:        fmt.Sprintf("batch-%06d", i/batchSize),
                SessionId: sessionID,
                Messages:  messages,
            }

            if err := stream.Send(batch); err != nil {
                return fmt.Errorf("failed to send batch: %w", err)
            }

            messages = []*pb.Message{}
        }

        // Progress indicator
        if (i+1)%1000 == 0 {
            fmt.Printf("\rSubmitted: %d/%d tasks", i+1, numTasks)
        }
    }

    fmt.Println()
    return nil
}

func receiveResults(stream pb.Queue_ClientStreamClient, resultsChan chan<- *pb.Message) {
    for {
        batch, err := stream.Recv()
        if err != nil {
            close(resultsChan)
            return
        }

        for _, msg := range batch.Messages {
            resultsChan <- msg
        }
    }
}

func aggregateResults(resultsChan <-chan *pb.Message, expected int) float64 {
    var totalSamples int64
    var totalInsideCircle int64
    received := 0

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for received < expected {
        select {
        case msg, ok := <-resultsChan:
            if !ok {
                goto done
            }

            var result MonteCarloResult
            if err := json.Unmarshal(msg.Payload, &result); err != nil {
                log.Printf("Failed to parse result: %v", err)
                continue
            }

            totalSamples += result.Samples
            totalInsideCircle += result.InsideCircle
            received++

        case <-ticker.C:
            if received > 0 {
                currentPi := 4.0 * float64(totalInsideCircle) / float64(totalSamples)
                progress := float64(received) / float64(expected) * 100
                fmt.Printf("\rProgress: %.1f%% | Current π: %.6f | Results: %d/%d",
                    progress, currentPi, received, expected)
            }
        }
    }

done:
    fmt.Println()

    if totalSamples == 0 {
        return 0
    }

    return 4.0 * float64(totalInsideCircle) / float64(totalSamples)
}

func getEnvOrDefault(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

Worker Code

package main

import (
    "context"
    "encoding/json"
    "io"
    "log"
    "math/rand"
    "os"
    "time"

    pb "github.com/limelabs/metalcore-neo/api/v1"
    "github.com/limelabs/metalcore-neo/pkg/grpccon"
    "google.golang.org/grpc/metadata"
    "google.golang.org/protobuf/proto"
)

type MonteCarloTask struct {
    ID      string `json:"id"`
    Samples int64  `json:"samples"`
}

type MonteCarloResult struct {
    ID           string `json:"id"`
    Samples      int64  `json:"samples"`
    InsideCircle int64  `json:"inside_circle"`
}

func main() {
    workerNode := getEnvOrDefault("METALCORE_WORKER_NODE", "local-node")
    workerPod := getEnvOrDefault("METALCORE_WORKER_POD", "local-pod")
    queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")

    log.Printf("Starting Monte Carlo worker: %s/%s", workerNode, workerPod)

    ctx := context.Background()
    queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer queueConn.Close()

    if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
        log.Fatalf("Connection not ready: %v", err)
    }

    runWorker(ctx, queueClient, workerNode, workerPod)
}

func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
    md := metadata.New(map[string]string{
        "api_key": os.Getenv("METALCORE_API_KEY"),
        "node":    node,
        "pod":     pod,
    })
    ctx = metadata.NewOutgoingContext(ctx, md)

    updateStream, err := client.WorkerUpdate(ctx)
    if err != nil {
        log.Fatalf("Failed to create update stream: %v", err)
    }

    mdFromServer, _ := updateStream.Header()
    workerID := mdFromServer.Get("workerid")[0]

    log.Printf("Worker registered: %s", workerID)

    // Seed random number generator
    rand.Seed(time.Now().UnixNano())

    updateStream.Send(&pb.WorkerState{
        WorkerId: workerID,
        Status:   pb.WorkerState_IDLE,
    })

    for {
        inst, err := updateStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }

        if inst.Type == pb.Instruction_BIND {
            log.Printf("BIND to session: %s", inst.SessionId)
            processSession(ctx, client, inst.SessionId)

            updateStream.Send(&pb.WorkerState{
                WorkerId: workerID,
                Status:   pb.WorkerState_IDLE,
            })
        }
    }
}

func processSession(ctx context.Context, client pb.QueueClient, sessionID string) {
    ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)

    stream, err := client.WorkerStream(ctx)
    if err != nil {
        log.Printf("Failed to open worker stream: %v", err)
        return
    }

    tasksProcessed := 0

    for {
        taskBatch, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Stream error: %v", err)
            break
        }

        resultBatch := processBatch(taskBatch)

        if err := stream.Send(resultBatch); err != nil {
            log.Printf("Failed to send results: %v", err)
            break
        }

        tasksProcessed += len(taskBatch.Messages)
    }

    log.Printf("Session complete: %d tasks processed", tasksProcessed)
}

func processBatch(taskBatch *pb.Batch) *pb.Batch {
    results := make([]*pb.Message, len(taskBatch.Messages))

    for i, task := range taskBatch.Messages {
        results[i] = processTask(task)
    }

    return &pb.Batch{
        Id:        taskBatch.Id,
        SessionId: taskBatch.SessionId,
        Messages:  results,
    }
}

func processTask(task *pb.Message) *pb.Message {
    var mcTask MonteCarloTask
    if err := json.Unmarshal(task.Payload, &mcTask); err != nil {
        return &pb.Message{
            Id:    task.Id,
            Error: proto.String(err.Error()),
        }
    }

    // Monte Carlo simulation
    insideCircle := int64(0)
    for i := int64(0); i < mcTask.Samples; i++ {
        x := rand.Float64()
        y := rand.Float64()

        if x*x+y*y <= 1.0 {
            insideCircle++
        }
    }

    result := MonteCarloResult{
        ID:           mcTask.ID,
        Samples:      mcTask.Samples,
        InsideCircle: insideCircle,
    }

    payload, _ := json.Marshal(result)

    return &pb.Message{
        Id:        task.Id,
        Payload:   payload,
        RelatedId: proto.String(task.Id),
    }
}

func getEnvOrDefault(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

Running the Example

Setup

# Start ALPCRUN.CH services
docker-compose up -d

# Set API key
export METALCORE_API_KEY="your-secret-key"

Run Worker

# Terminal 1: Start workers
go run worker.go

Run Client

# Terminal 2: Run simulation
go run client.go

Expected Output

2025/01/15 10:30:00 Monte Carlo Pi Estimation
2025/01/15 10:30:00 Total samples: 1000000000
2025/01/15 10:30:00 Samples per task: 1000000
2025/01/15 10:30:00 Number of tasks: 1000
2025/01/15 10:30:00 Session created: 01HZXY...
2025/01/15 10:30:05 Tasks submitted in 5s
2025/01/15 10:30:05 Collecting results...
Progress: 50.0% | Current π: 3.141823 | Results: 500/1000
Progress: 100.0% | Current π: 3.141592 | Results: 1000/1000
=====================================
Estimated π: 3.1415926536
Actual π:    3.1415926536
Error:       0.0000000000 (0.0000%)
Total time:  45s
Throughput:  22222222 samples/sec
=====================================

Performance Tuning

Increase Workers

# Scale workers
docker-compose up -d --scale worker=100

Adjust Batch Size

Larger batches = higher throughput:

const batchSize = 500  // Increase from 100

Tune Session Parameters

Attributes: &pb.SessionAttributes{
    MinWorkers: proto.Uint32(100),   // More workers
    MaxWorkers: proto.Uint32(1000),
    MaxInflightBatchesPerWorker: proto.Uint32(20),  // More inflight
}

Variations

GPU-Accelerated Version

Use CUDA for faster sampling on GPU workers.

Adaptive Sampling

Dynamically adjust samples per task based on variance.

Multi-Dimensional Integration

Extend to higher-dimensional Monte Carlo integration.

Next Steps