Using Shared Data

Learn how to efficiently distribute common data to workers using ALPCRUN.CH's distributed cache.

Overview

Shared data is useful when:

  • Multiple workers need access to the same large dataset
  • Data doesn't change frequently (or changes in discrete versions)
  • Copying data with each task would be inefficient

Examples:

  • Machine learning model weights
  • Lookup tables
  • Configuration parameters
  • Reference datasets

Basic Usage

1. Upload Shared Data

// Connect to cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
    "central-cache:2337",
    "/certs/ca.crt",
    true)
if err != nil {
    log.Fatalf("Failed to connect to cache: %v", err)
}
defer cacheConn.Close()

// Prepare data
sharedData := MySharedData{
    ModelWeights: weights,
    LookupTable:  table,
}
payload, err := json.Marshal(sharedData)
if err != nil {
    log.Fatalf("Failed to marshal shared data: %v", err)
}

// Upload to cache
updateLevel := uint32(1)
_, err = cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   session.Id,
    UpdateLevel: &updateLevel,
    Payload:     payload,
    Ttl:         durationpb.New(10 * time.Minute),
})
if err != nil {
    log.Fatalf("Failed to set shared data: %v", err)
}

2. Send Tasks with Shared Data Reference

batch := &pb.Batch{
    SessionId:                     session.Id,
    UseSharedData:                 proto.Bool(true),
    RequiredSharedDataUpdateLevel: &updateLevel,
    Messages:                      tasks,
}
stream.Send(batch)

3. Worker Fetches Data

func processBatch(ctx context.Context, taskBatch *pb.Batch) *pb.Batch {
    var sharedData []byte

    if taskBatch.GetUseSharedData() {
        sharedData = fetchSharedData(ctx, taskBatch)
    }

    // Use shared data for processing
    results := processWithSharedData(taskBatch.Messages, sharedData)

    return &pb.Batch{
        SessionId: taskBatch.SessionId,
        Messages:  results,
    }
}
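
The snippets above call a fetchSharedData helper that is not part of the generated API; the sketch below is one way to write it, assuming a package-level cacheClient and a GetSharedData RPC that mirrors SetSharedData and returns the stored pb.SharedData. A short retry loop also rides out replication lag between caches:

// fetchSharedData is a hypothetical helper: it assumes a package-level
// cacheClient and a GetSharedData RPC returning the stored pb.SharedData.
func fetchSharedData(ctx context.Context, taskBatch *pb.Batch) []byte {
    req := &pb.SharedData{
        SessionId:   taskBatch.SessionId,
        UpdateLevel: taskBatch.RequiredSharedDataUpdateLevel,
    }
    for attempt := 1; attempt <= 5; attempt++ {
        resp, err := cacheClient.GetSharedData(ctx, req)
        if err == nil {
            return resp.Payload
        }
        log.Printf("Shared data fetch attempt %d failed: %v", attempt, err)
        time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
    }
    return nil
}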

Versioning

Shared data supports versioning via update levels:

// Version 1
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     dataV1,
    Ttl:         durationpb.New(10 * time.Minute),
})

// Send tasks using version 1
batch := &pb.Batch{
    SessionId:                     sessionID,
    UseSharedData:                 proto.Bool(true),
    RequiredSharedDataUpdateLevel: &updateLevel,
    Messages:                      tasksV1,
}
stream.Send(batch)

// Update to version 2
newUpdateLevel := uint32(2)
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &newUpdateLevel,
    Payload:     dataV2,
    Ttl:         durationpb.New(10 * time.Minute),
})

// Send tasks using version 2
batch = &pb.Batch{
    SessionId:                     sessionID,
    UseSharedData:                 proto.Bool(true),
    RequiredSharedDataUpdateLevel: &newUpdateLevel,
    Messages:                      tasksV2,
}
stream.Send(batch)

TTL Management

Shared data automatically expires after the TTL:

// Set TTL
cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     data,
    Ttl:         durationpb.New(10 * time.Minute),  // Expires in 10 minutes
})

// Refresh TTL by updating
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &newUpdateLevel,
    Payload:     updatedData,
    Ttl:         durationpb.New(10 * time.Minute),  // TTL reset
})

Best Practices:

  • Set the TTL slightly longer than the expected session duration (see the sketch below)
  • Maximum TTL is configured per cache service (default: 600s)
  • Update the TTL if the session is long-running
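
The first rule can be made concrete in code. A minimal sketch, where expectedSessionDuration is an assumed application-side estimate and the result must stay within the cache service's configured maximum:

// expectedSessionDuration is a hypothetical application-side estimate.
expectedSessionDuration := 8 * time.Minute
ttl := expectedSessionDuration + 2*time.Minute  // buffer keeps data alive past session end

cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     data,
    Ttl:         durationpb.New(ttl),
})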

Cleanup

Always clean up shared data when done:

defer func() {
    _, err := cacheClient.DeleteSharedData(ctx, &pb.SharedData{
        SessionId: sessionID,
    })
    if err != nil {
        log.Printf("Failed to delete shared data: %v", err)
    }
}()

Or use CleanupOnClientExit for automatic cleanup:

session, _ := queueClient.CreateSession(ctx, &pb.Session{
    App: &pb.App{Id: "my-app"},
    Attributes: &pb.SessionAttributes{
        CleanupOnClientExit: proto.Bool(true),  // Auto-cleanup
    },
})

Performance Optimization

Node Cache

Workers should connect to the local node cache rather than to the central cache:

// Worker: connect to local node cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
    "localhost:3337",  // Node cache (local)
    "",
    false)
if err != nil {
    log.Fatalf("Failed to connect to node cache: %v", err)
}
defer cacheConn.Close()

Benefits:

  • Sub-millisecond latency for cache hits
  • Reduced load on the central cache
  • Minimized network traffic

Preloading

For long-running sessions, preload shared data before sending tasks:

// Upload shared data first
cacheClient.SetSharedData(ctx, sharedData)

// Wait a moment for replication
time.Sleep(100 * time.Millisecond)

// Now send tasks
for _, task := range tasks {
    batch := &pb.Batch{
        SessionId:                     sessionID,
        UseSharedData:                 proto.Bool(true),
        RequiredSharedDataUpdateLevel: &updateLevel,
        Messages:                      []*pb.Message{task},
    }
    stream.Send(batch)
}
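
A fixed sleep is a guess at replication time. If a read RPC is available (such as the GetSharedData call assumed in the worker sketch above), polling until the data is readable is more robust:

// Poll instead of sleeping, assuming the GetSharedData RPC sketched earlier.
for attempt := 0; attempt < 10; attempt++ {
    _, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
        SessionId:   sessionID,
        UpdateLevel: &updateLevel,
    })
    if err == nil {
        break  // data is visible to readers
    }
    time.Sleep(50 * time.Millisecond)
}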

Common Patterns

Large Model Weights

// Load model from file
modelData, err := os.ReadFile("model.onnx")
if err != nil {
    log.Fatalf("Failed to read model file: %v", err)
}

// Upload to cache
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     modelData,
    Ttl:         durationpb.New(30 * time.Minute),
})

// Tasks reference the model
for i, input := range inputs {
    task := &pb.Message{
        Id:      fmt.Sprintf("task-%d", i),
        Payload: input,
    }

    batch := &pb.Batch{
        SessionId:                     sessionID,
        UseSharedData:                 proto.Bool(true),
        RequiredSharedDataUpdateLevel: &updateLevel,
        Messages:                      []*pb.Message{task},
    }
    stream.Send(batch)
}

Worker side:

var cachedModel *Model
var cachedUpdateLevel uint32

func processBatch(ctx context.Context, taskBatch *pb.Batch) *pb.Batch {
    // Load model only if version changed
    if taskBatch.GetUseSharedData() {
        requiredLevel := taskBatch.GetRequiredSharedDataUpdateLevel()
        if cachedModel == nil || cachedUpdateLevel != requiredLevel {
            sharedData := fetchSharedData(ctx, taskBatch)
            cachedModel = loadModel(sharedData)
            cachedUpdateLevel = requiredLevel
            log.Printf("Loaded model version %d", requiredLevel)
        }
    }

    // Use cached model
    results := processWithModel(taskBatch.Messages, cachedModel)
    return &pb.Batch{SessionId: taskBatch.SessionId, Messages: results}
}
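
Note that cachedModel and cachedUpdateLevel are package-level variables; if the worker processes batches concurrently, guard them with a mutex or equivalent synchronization.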

Lookup Tables

type LookupTable struct {
    Entries map[string]float64
}

// Client: upload lookup table
table := LookupTable{
    Entries: make(map[string]float64),
}
// ... populate table ...

tableData, _ := json.Marshal(table)
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     tableData,
    Ttl:         durationpb.New(15 * time.Minute),
})

// Worker: use lookup table
func processTask(task *pb.Message, sharedData []byte) *pb.Message {
    var table LookupTable
    json.Unmarshal(sharedData, &table)

    // Use table
    key := extractKey(task.Payload)
    value := table.Entries[key]

    result := computeWithValue(value)
    return &pb.Message{Id: task.Id, Payload: result}
}
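
For brevity, this worker unmarshals the table on every task. In practice, cache the deserialized table in worker memory per update level, as in the model-weights pattern above.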

Iterative Updates

For iterative algorithms, update shared data between iterations:

var results []*pb.Message
updateLevel := uint32(0)

for iteration := 0; iteration < maxIterations; iteration++ {
    updateLevel++

    // Update shared parameters
    params := computeParameters(results)
    paramsData, _ := json.Marshal(params)

    if iteration == 0 {
        cacheClient.SetSharedData(ctx, &pb.SharedData{
            SessionId:   sessionID,
            UpdateLevel: &updateLevel,
            Payload:     paramsData,
            Ttl:         durationpb.New(10 * time.Minute),
        })
    } else {
        cacheClient.UpdateSharedData(ctx, &pb.SharedData{
            SessionId:   sessionID,
            UpdateLevel: &updateLevel,
            Payload:     paramsData,
            Ttl:         durationpb.New(10 * time.Minute),
        })
    }

    // Send tasks for this iteration
    for _, task := range tasks {
        batch := &pb.Batch{
            SessionId:                     sessionID,
            UseSharedData:                 proto.Bool(true),
            RequiredSharedDataUpdateLevel: &updateLevel,
            Messages:                      []*pb.Message{task},
        }
        stream.Send(batch)
    }

    // Collect results
    results = collectResults(stream, len(tasks))
}

Best Practices

  1. Size Limits: Keep shared data under 100MB for good performance (see the sketch after this list)
  2. Versioning: Always increment update level sequentially (1, 2, 3...)
  3. TTL: Set TTL = session duration + buffer
  4. Cleanup: Always delete shared data when done
  5. Worker Caching: Cache deserialized data in worker memory
  6. Node Cache: Always use node cache from workers
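
The size guideline from item 1 can be enforced client-side before uploading; a minimal sketch:

const maxSharedDataBytes = 100 << 20  // 100MB guideline

if len(payload) > maxSharedDataBytes {
    log.Fatalf("Shared data payload is %d bytes, above the %dMB guideline",
        len(payload), maxSharedDataBytes>>20)
}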

Troubleshooting

Shared Data Not Found

Error: shared data not found for session <session-id>

Causes:

  • TTL expired
  • Data not uploaded before tasks were sent
  • Wrong session ID

Solutions:

  • Increase the TTL
  • Upload data before sending tasks
  • Verify that the session ID matches

Version Mismatch

Error: update level mismatch: expected 2, got 3

Causes:

  • Update level not sequential
  • Client and worker out of sync

Solutions:

  • Ensure update levels increment by 1
  • Wait for all tasks of version N to complete before updating to N+1
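
The Iterative Updates pattern above follows this rule: collectResults drains every result for the current update level before the next level is written to the cache.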

Cache Miss

High cache miss rate on node cache:

Causes:

  • Node cache size too small
  • High cache eviction rate
  • First access (cold cache)

Solutions:

  • Increase the node cache size
  • Preload data before sending tasks
  • Monitor cache hit rate metrics

Next Steps