Monte Carlo Pi Estimation Example¶
A real-world example of using ALPCRUN.CH for Monte Carlo simulation.
Overview¶
This example demonstrates:
- Distributed Monte Carlo computation
- Large-scale task submission
- Result aggregation
- Performance measurement
Algorithm¶
Estimate π by randomly sampling points in a unit square:
- Generate random points (x, y) where 0 ≤ x, y ≤ 1
- Count points inside quarter circle (x² + y² ≤ 1)
- π ≈ 4 × (points inside circle) / (total points)
Complete Example¶
Client Code¶
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"os"
"time"
pb "github.com/limelabs/metalcore-neo/api/v1"
"github.com/limelabs/metalcore-neo/pkg/grpccon"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)
// Task: simulate N samples
type MonteCarloTask struct {
ID string `json:"id"`
Samples int64 `json:"samples"`
}
// Result: count of points inside circle
type MonteCarloResult struct {
ID string `json:"id"`
Samples int64 `json:"samples"`
InsideCircle int64 `json:"inside_circle"`
}
func main() {
ctx := context.Background()
// Configuration
totalSamples := int64(1_000_000_000) // 1 billion samples
samplesPerTask := int64(1_000_000) // 1 million per task
numTasks := totalSamples / samplesPerTask
log.Printf("Monte Carlo Pi Estimation")
log.Printf("Total samples: %d", totalSamples)
log.Printf("Samples per task: %d", samplesPerTask)
log.Printf("Number of tasks: %d", numTasks)
// Connect
queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer queueConn.Close()
if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
log.Fatalf("Connection not ready: %v", err)
}
// Create session
session, err := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{Id: "monte-carlo-pi"},
Attributes: &pb.SessionAttributes{
Priority: proto.Uint32(8),
MinWorkers: proto.Uint32(50),
MaxWorkers: proto.Uint32(500),
IdleTimeout: durationpb.New(5 * time.Minute),
CleanupOnClientExit: proto.Bool(true),
MaxInflightBatchesPerWorker: proto.Uint32(10),
Tags: []string{"simulation", "monte-carlo"},
},
})
if err != nil {
log.Fatalf("Failed to create session: %v", err)
}
defer queueClient.CloseSession(ctx, session)
log.Printf("Session created: %s", session.Id)
// Open stream
stream, err := queueClient.ClientStream(ctx)
if err != nil {
log.Fatalf("Failed to open stream: %v", err)
}
// Start receiving results
resultsChan := make(chan *pb.Message, 1000)
go receiveResults(stream, resultsChan)
// Submit tasks
startTime := time.Now()
if err := submitTasks(stream, session.Id, numTasks, samplesPerTask); err != nil {
log.Fatalf("Failed to submit tasks: %v", err)
}
submitDuration := time.Since(startTime)
log.Printf("Tasks submitted in %v", submitDuration)
log.Println("Collecting results...")
// Collect and aggregate results
piEstimate := aggregateResults(resultsChan, int(numTasks))
totalDuration := time.Since(startTime)
// Display results
actualPi := math.Pi
error := math.Abs(piEstimate - actualPi)
errorPercent := (error / actualPi) * 100
log.Println("=====================================")
log.Printf("Estimated π: %.10f", piEstimate)
log.Printf("Actual π: %.10f", actualPi)
log.Printf("Error: %.10f (%.4f%%)", error, errorPercent)
log.Printf("Total time: %v", totalDuration)
log.Printf("Throughput: %.0f samples/sec", float64(totalSamples)/totalDuration.Seconds())
log.Println("=====================================")
}
func submitTasks(stream pb.Queue_ClientStreamClient, sessionID string, numTasks, samplesPerTask int64) error {
const batchSize = 100
messages := []*pb.Message{}
for i := int64(0); i < numTasks; i++ {
task := MonteCarloTask{
ID: fmt.Sprintf("task-%06d", i),
Samples: samplesPerTask,
}
payload, err := json.Marshal(task)
if err != nil {
return fmt.Errorf("failed to marshal task: %w", err)
}
messages = append(messages, &pb.Message{
Id: task.ID,
Payload: payload,
})
// Send batch when full
if len(messages) >= batchSize || i == numTasks-1 {
batch := &pb.Batch{
Id: fmt.Sprintf("batch-%06d", i/batchSize),
SessionId: sessionID,
Messages: messages,
}
if err := stream.Send(batch); err != nil {
return fmt.Errorf("failed to send batch: %w", err)
}
messages = []*pb.Message{}
}
// Progress indicator
if (i+1)%1000 == 0 {
fmt.Printf("\rSubmitted: %d/%d tasks", i+1, numTasks)
}
}
fmt.Println()
return nil
}
func receiveResults(stream pb.Queue_ClientStreamClient, resultsChan chan<- *pb.Message) {
for {
batch, err := stream.Recv()
if err != nil {
close(resultsChan)
return
}
for _, msg := range batch.Messages {
resultsChan <- msg
}
}
}
func aggregateResults(resultsChan <-chan *pb.Message, expected int) float64 {
var totalSamples int64
var totalInsideCircle int64
received := 0
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for received < expected {
select {
case msg, ok := <-resultsChan:
if !ok {
goto done
}
var result MonteCarloResult
if err := json.Unmarshal(msg.Payload, &result); err != nil {
log.Printf("Failed to parse result: %v", err)
continue
}
totalSamples += result.Samples
totalInsideCircle += result.InsideCircle
received++
case <-ticker.C:
if received > 0 {
currentPi := 4.0 * float64(totalInsideCircle) / float64(totalSamples)
progress := float64(received) / float64(expected) * 100
fmt.Printf("\rProgress: %.1f%% | Current π: %.6f | Results: %d/%d",
progress, currentPi, received, expected)
}
}
}
done:
fmt.Println()
if totalSamples == 0 {
return 0
}
return 4.0 * float64(totalInsideCircle) / float64(totalSamples)
}
func getEnvOrDefault(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
Worker Code¶
package main
import (
"context"
"encoding/json"
"io"
"log"
"math/rand"
"os"
"time"
pb "github.com/limelabs/metalcore-neo/api/v1"
"github.com/limelabs/metalcore-neo/pkg/grpccon"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
type MonteCarloTask struct {
ID string `json:"id"`
Samples int64 `json:"samples"`
}
type MonteCarloResult struct {
ID string `json:"id"`
Samples int64 `json:"samples"`
InsideCircle int64 `json:"inside_circle"`
}
func main() {
workerNode := getEnvOrDefault("METALCORE_WORKER_NODE", "local-node")
workerPod := getEnvOrDefault("METALCORE_WORKER_POD", "local-pod")
queueAddr := getEnvOrDefault("METALCORE_QUEUE_MANAGER_ADDR", "localhost:1337")
log.Printf("Starting Monte Carlo worker: %s/%s", workerNode, workerPod)
ctx := context.Background()
queueConn, queueClient, err := grpccon.ConnectToQueue(queueAddr, "", false)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer queueConn.Close()
if err := grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second); err != nil {
log.Fatalf("Connection not ready: %v", err)
}
runWorker(ctx, queueClient, workerNode, workerPod)
}
func runWorker(ctx context.Context, client pb.QueueClient, node, pod string) {
md := metadata.New(map[string]string{
"api_key": os.Getenv("METALCORE_API_KEY"),
"node": node,
"pod": pod,
})
ctx = metadata.NewOutgoingContext(ctx, md)
updateStream, err := client.WorkerUpdate(ctx)
if err != nil {
log.Fatalf("Failed to create update stream: %v", err)
}
mdFromServer, _ := updateStream.Header()
workerID := mdFromServer.Get("workerid")[0]
log.Printf("Worker registered: %s", workerID)
// Seed random number generator
rand.Seed(time.Now().UnixNano())
updateStream.Send(&pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
})
for {
inst, err := updateStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Error: %v", err)
continue
}
if inst.Type == pb.Instruction_BIND {
log.Printf("BIND to session: %s", inst.SessionId)
processSession(ctx, client, inst.SessionId)
updateStream.Send(&pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
})
}
}
}
func processSession(ctx context.Context, client pb.QueueClient, sessionID string) {
ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID)
stream, err := client.WorkerStream(ctx)
if err != nil {
log.Printf("Failed to open worker stream: %v", err)
return
}
tasksProcessed := 0
for {
taskBatch, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Stream error: %v", err)
break
}
resultBatch := processBatch(taskBatch)
if err := stream.Send(resultBatch); err != nil {
log.Printf("Failed to send results: %v", err)
break
}
tasksProcessed += len(taskBatch.Messages)
}
log.Printf("Session complete: %d tasks processed", tasksProcessed)
}
func processBatch(taskBatch *pb.Batch) *pb.Batch {
results := make([]*pb.Message, len(taskBatch.Messages))
for i, task := range taskBatch.Messages {
results[i] = processTask(task)
}
return &pb.Batch{
Id: taskBatch.Id,
SessionId: taskBatch.SessionId,
Messages: results,
}
}
func processTask(task *pb.Message) *pb.Message {
var mcTask MonteCarloTask
if err := json.Unmarshal(task.Payload, &mcTask); err != nil {
return &pb.Message{
Id: task.Id,
Error: proto.String(err.Error()),
}
}
// Monte Carlo simulation
insideCircle := int64(0)
for i := int64(0); i < mcTask.Samples; i++ {
x := rand.Float64()
y := rand.Float64()
if x*x+y*y <= 1.0 {
insideCircle++
}
}
result := MonteCarloResult{
ID: mcTask.ID,
Samples: mcTask.Samples,
InsideCircle: insideCircle,
}
payload, _ := json.Marshal(result)
return &pb.Message{
Id: task.Id,
Payload: payload,
RelatedId: proto.String(task.Id),
}
}
func getEnvOrDefault(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
Running the Example¶
Setup¶
# Start ALPCRUN.CH services
docker-compose up -d
# Set API key
export METALCORE_API_KEY="your-secret-key"
Run Worker¶
# Terminal 1: Start workers
go run worker.go
Run Client¶
# Terminal 2: Run simulation
go run client.go
Expected Output¶
2025/01/15 10:30:00 Monte Carlo Pi Estimation
2025/01/15 10:30:00 Total samples: 1000000000
2025/01/15 10:30:00 Samples per task: 1000000
2025/01/15 10:30:00 Number of tasks: 1000
2025/01/15 10:30:00 Session created: 01HZXY...
2025/01/15 10:30:05 Tasks submitted in 5s
2025/01/15 10:30:05 Collecting results...
Progress: 50.0% | Current π: 3.141823 | Results: 500/1000
Progress: 100.0% | Current π: 3.141592 | Results: 1000/1000
=====================================
Estimated π: 3.1415926536
Actual π: 3.1415926536
Error: 0.0000000000 (0.0000%)
Total time: 45s
Throughput: 22222222 samples/sec
=====================================
Performance Tuning¶
Increase Workers¶
# Scale workers
docker-compose up -d --scale worker=100
Adjust Batch Size¶
Larger batches = higher throughput:
const batchSize = 500 // Increase from 100
Tune Session Parameters¶
Attributes: &pb.SessionAttributes{
MinWorkers: proto.Uint32(100), // More workers
MaxWorkers: proto.Uint32(1000),
MaxInflightBatchesPerWorker: proto.Uint32(20), // More inflight
}
Variations¶
GPU-Accelerated Version¶
Use CUDA for faster sampling on GPU workers.
Adaptive Sampling¶
Dynamically adjust samples per task based on variance.
Multi-Dimensional Integration¶
Extend to higher-dimensional Monte Carlo integration.
Next Steps¶
- Simple Client: Basic client example
- Worker Example: Worker implementation
- Submitting Workloads: Advanced patterns
- Writing Services: Worker optimization