Using Shared Data¶
Learn how to efficiently distribute common data to workers using ALPCRUN.CH's distributed cache.
Overview¶
Shared data is useful when:
- Multiple workers need access to the same large dataset
- Data doesn't change frequently (or changes in discrete versions)
- Copying data with each task would be inefficient
Examples:
- Machine learning model weights
- Lookup tables
- Configuration parameters
- Reference datasets
Basic Usage¶
1. Upload Shared Data¶
// Connect to cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
"central-cache:2337",
"/certs/ca.crt",
true)
if err != nil {
log.Fatalf("Failed to connect to cache: %v", err)
}
defer cacheConn.Close()
// Prepare data
sharedData := MySharedData{
ModelWeights: weights,
LookupTable: table,
}
payload, err := json.Marshal(sharedData)
if err != nil {
log.Fatalf("Failed to marshal shared data: %v", err)
}
// Upload to cache
updateLevel := uint32(1)
_, err = cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: session.Id,
UpdateLevel: &updateLevel,
Payload: payload,
Ttl: durationpb.New(10 * time.Minute),
})
if err != nil {
log.Fatalf("Failed to set shared data: %v", err)
}
2. Send Tasks with Shared Data Reference¶
batch := &pb.Batch{
SessionId: session.Id,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: tasks,
}
stream.Send(batch)
3. Worker Fetches Data¶
func processBatch(ctx context.Context, taskBatch *pb.Batch) *pb.Batch {
var sharedData []byte
if taskBatch.GetUseSharedData() {
sharedData = fetchSharedData(ctx, taskBatch.SessionId,
taskBatch.RequiredSharedDataUpdateLevel)
}
// Use shared data for processing
results := processWithSharedData(taskBatch.Messages, sharedData)
return &pb.Batch{
SessionId: taskBatch.SessionId,
Messages: results,
}
}
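The fetchSharedData helper above is not defined by the generated API. Below is a minimal sketch under two assumptions: that the cache service exposes a GetSharedData retrieval RPC symmetric to SetSharedData, and that cacheClient is the worker's cache connection stored at package scope. Adjust the call to whatever retrieval RPC your generated client actually provides.
// fetchSharedData retrieves the shared payload for a session from the cache.
// GetSharedData is an assumption here; substitute your actual retrieval RPC.
func fetchSharedData(ctx context.Context, sessionID string, updateLevel *uint32) []byte {
    data, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
        SessionId:   sessionID,
        UpdateLevel: updateLevel,
    })
    if err != nil {
        log.Printf("Failed to fetch shared data: %v", err)
        return nil
    }
    return data.GetPayload()
}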
Versioning¶
Shared data supports versioning via update levels:
// Version 1
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: dataV1,
Ttl: durationpb.New(10 * time.Minute),
})
// Send tasks using version 1
batch := &pb.Batch{
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: tasksV1,
}
stream.Send(batch)
// Update to version 2
newUpdateLevel := uint32(2)
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &newUpdateLevel,
Payload: dataV2,
Ttl: durationpb.New(10 * time.Minute),
})
// Send tasks using version 2
batch = &pb.Batch{
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &newUpdateLevel,
Messages: tasksV2,
}
stream.Send(batch)
TTL Management¶
Shared data automatically expires after the TTL:
// Set TTL
cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: data,
Ttl: durationpb.New(10 * time.Minute), // Expires in 10 minutes
})
// Refresh TTL by updating
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &newUpdateLevel,
Payload: updatedData,
Ttl: durationpb.New(10 * time.Minute), // TTL reset
})
Best Practices:
- Set the TTL slightly longer than the expected session duration
- Maximum TTL is configured per cache service (default: 600s)
- Refresh the TTL if the session is long-running (see the sketch below)
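For long-running sessions, a background goroutine can keep the TTL from lapsing. A minimal sketch, assuming the client keeps the current update level and payload at hand and that cacheClient is accessible at package scope; refreshing at half the TTL is a rule of thumb, not a requirement:
// refreshTTL re-uploads the current shared data on a fixed interval so
// its TTL never expires mid-session. Cancel ctx to stop the refresher.
func refreshTTL(ctx context.Context, sessionID string, updateLevel *uint32, payload []byte) {
    ticker := time.NewTicker(5 * time.Minute) // half the 10-minute TTL
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            _, err := cacheClient.UpdateSharedData(ctx, &pb.SharedData{
                SessionId:   sessionID,
                UpdateLevel: updateLevel,
                Payload:     payload,
                Ttl:         durationpb.New(10 * time.Minute),
            })
            if err != nil {
                log.Printf("Failed to refresh shared data TTL: %v", err)
            }
        }
    }
}
Start it with go refreshTTL(ctx, sessionID, &updateLevel, payload) after the initial SetSharedData call.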
Cleanup¶
Always clean up shared data when done:
defer func() {
_, err := cacheClient.DeleteSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
})
if err != nil {
log.Printf("Failed to delete shared data: %v", err)
}
}()
Or use CleanupOnClientExit for automatic cleanup:
session, _ := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{Id: "my-app"},
Attributes: &pb.SessionAttributes{
CleanupOnClientExit: proto.Bool(true), // Auto-cleanup
},
})
Performance Optimization¶
Node Cache¶
Workers should connect to the local node cache instead of the central cache:
// Worker: connect to local node cache
cacheConn, cacheClient, err := grpccon.ConnectToCache(
"localhost:3337", // Node cache (local)
"",
false)
Benefits:
- Sub-millisecond latency for cache hits
- Reduced load on the central cache
- Minimized network traffic
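To run the same worker binary with or without a local node cache, one option is to resolve the endpoint from the environment. A sketch; the NODE_CACHE_ADDR variable is illustrative, not something ALPCRUN.CH defines:
// Resolve the cache endpoint at startup, defaulting to the local node cache.
addr := os.Getenv("NODE_CACHE_ADDR") // hypothetical deployment-set variable
if addr == "" {
    addr = "localhost:3337"
}
cacheConn, cacheClient, err := grpccon.ConnectToCache(addr, "", false)
if err != nil {
    log.Fatalf("Failed to connect to cache: %v", err)
}
defer cacheConn.Close()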
Preloading¶
For long-running sessions, preload shared data before sending tasks:
// Upload shared data first
cacheClient.SetSharedData(ctx, sharedData)
// Wait briefly for replication (fragile; see the polling sketch below)
time.Sleep(100 * time.Millisecond)
// Now send tasks
for _, task := range tasks {
batch := &pb.Batch{
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: []*pb.Message{task},
}
stream.Send(batch)
}
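A fixed sleep is fragile under load. Polling until the data is actually readable is more robust; a sketch, again assuming a GetSharedData retrieval RPC as in the worker helper above and a package-scoped cacheClient:
// waitForSharedData polls the cache until the payload is visible or the
// deadline passes; use it in place of the fixed sleep above.
func waitForSharedData(ctx context.Context, sessionID string, updateLevel *uint32) error {
    deadline := time.Now().Add(5 * time.Second)
    for time.Now().Before(deadline) {
        _, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
            SessionId:   sessionID,
            UpdateLevel: updateLevel,
        })
        if err == nil {
            return nil // data is readable; safe to send tasks
        }
        time.Sleep(100 * time.Millisecond)
    }
    return fmt.Errorf("shared data not visible for session %s", sessionID)
}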
Common Patterns¶
Large Model Weights¶
// Load model from file (os.ReadFile replaces the deprecated ioutil.ReadFile)
modelData, err := os.ReadFile("model.onnx")
if err != nil {
log.Fatalf("Failed to read model file: %v", err)
}
// Upload to cache
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: modelData,
Ttl: durationpb.New(30 * time.Minute),
})
// Tasks reference the model
for i, input := range inputs {
task := &pb.Message{
Id: fmt.Sprintf("task-%d", i),
Payload: input,
}
batch := &pb.Batch{
SessionId: sessionID,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: []*pb.Message{task},
}
stream.Send(batch)
}
Worker side:
var cachedModel *Model
var cachedUpdateLevel uint32
func processBatch(ctx context.Context, taskBatch *pb.Batch) *pb.Batch {
// Load model only if version changed
if taskBatch.GetUseSharedData() {
requiredLevel := taskBatch.GetRequiredSharedDataUpdateLevel()
if cachedModel == nil || cachedUpdateLevel != requiredLevel {
sharedData := fetchSharedData(ctx, taskBatch.SessionId,
taskBatch.RequiredSharedDataUpdateLevel)
cachedModel = loadModel(sharedData)
cachedUpdateLevel = requiredLevel
log.Printf("Loaded model version %d", requiredLevel)
}
}
// Use cached model
results := processWithModel(taskBatch.Messages, cachedModel)
return &pb.Batch{Messages: results}
}
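The package-level cachedModel above is safe only if batches are processed sequentially. If your worker handles batches concurrently, guard the cache with a mutex; a minimal sketch replacing the two variables above:
var (
    modelMu           sync.Mutex
    cachedModel       *Model
    cachedUpdateLevel uint32
)

// getModel returns the model for the batch's required update level,
// loading it at most once per version even under concurrent batches.
func getModel(ctx context.Context, taskBatch *pb.Batch) *Model {
    modelMu.Lock()
    defer modelMu.Unlock()
    requiredLevel := taskBatch.GetRequiredSharedDataUpdateLevel()
    if cachedModel == nil || cachedUpdateLevel != requiredLevel {
        sharedData := fetchSharedData(ctx, taskBatch.SessionId,
            taskBatch.RequiredSharedDataUpdateLevel)
        cachedModel = loadModel(sharedData)
        cachedUpdateLevel = requiredLevel
    }
    return cachedModel
}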
Lookup Tables¶
type LookupTable struct {
Entries map[string]float64
}
// Client: upload lookup table
table := LookupTable{
Entries: make(map[string]float64),
}
// ... populate table ...
tableData, _ := json.Marshal(table)
updateLevel := uint32(1)
cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: tableData,
Ttl: durationpb.New(15 * time.Minute),
})
// Worker: use lookup table
func processTask(task *pb.Message, sharedData []byte) *pb.Message {
var table LookupTable
// Decoding per task is shown for brevity; in production, cache the
// decoded table per update level as in the model example above.
if err := json.Unmarshal(sharedData, &table); err != nil {
log.Printf("Failed to decode lookup table: %v", err)
return nil
}
// Use table
key := extractKey(task.Payload)
value := table.Entries[key]
result := computeWithValue(value)
return &pb.Message{Id: task.Id, Payload: result}
}
Iterative Updates¶
For iterative algorithms, update shared data between iterations:
updateLevel := uint32(0)
var results []*pb.Message // empty on the first iteration
for iteration := 0; iteration < maxIterations; iteration++ {
updateLevel++
// Update shared parameters
params := computeParameters(results)
paramsData, _ := json.Marshal(params)
if iteration == 0 {
cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: paramsData,
Ttl: durationpb.New(10 * time.Minute),
})
} else {
cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: paramsData,
Ttl: durationpb.New(10 * time.Minute),
})
}
// Send tasks for this iteration
for _, task := range tasks {
batch := &pb.Batch{
SessionId: sessionID,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: &updateLevel,
Messages: []*pb.Message{task},
}
stream.Send(batch)
}
// Collect results
results = collectResults(stream, len(tasks))
}
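The collectResults helper is not shown elsewhere in this guide. A minimal sketch, assuming result batches arrive on the same bidirectional task stream; the pb.Queue_SendBatchClient type name is illustrative and should match your generated stream type:
// collectResults receives batches from the stream until `expected`
// result messages have arrived, then returns them.
func collectResults(stream pb.Queue_SendBatchClient, expected int) []*pb.Message {
    var results []*pb.Message
    for len(results) < expected {
        batch, err := stream.Recv()
        if err != nil {
            log.Printf("Stream receive failed: %v", err)
            break
        }
        results = append(results, batch.Messages...)
    }
    return results
}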
Best Practices¶
- Size Limits: Keep shared data under 100MB for good performance
- Versioning: Always increment update level sequentially (1, 2, 3...)
- TTL: Set TTL = session duration + buffer
- Cleanup: Always delete shared data when done
- Worker Caching: Cache deserialized data in worker memory
- Node Cache: Always use node cache from workers
Troubleshooting¶
Shared Data Not Found¶
Error: shared data not found for session <session-id>
Causes:
- TTL expired
- Data not uploaded before sending tasks
- Wrong session ID

Solutions:
- Increase the TTL
- Upload data before sending tasks
- Verify the session ID matches
Version Mismatch¶
Error: update level mismatch: expected 2, got 3
Causes:
- Update level not sequential
- Client and worker out of sync

Solutions:
- Ensure update levels increment by 1
- Wait for all tasks of version N to complete before updating to N+1
Cache Miss¶
High cache miss rate on node cache:
Causes:
- Node cache size too small
- High cache eviction rate
- First access (cold cache)

Solutions:
- Increase the node cache size
- Preload data before sending tasks
- Monitor cache hit rate metrics
Next Steps¶
- Writing Clients: Client patterns
- Writing Services: Worker implementation
- Submitting Workloads: Workload strategies