Message Types Reference¶
Complete reference for all Protocol Buffer message types in ALPCRUN.CH.
Core Types¶
Session¶
Represents a workload session.
message Session {
string id = 1;
App app = 2;
SessionAttributes attributes = 3;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `id` | `string` | Unique session identifier (ULID format). Auto-generated by server if empty. |
| `app` | `App` | Application information |
| `attributes` | `SessionAttributes` | Session configuration |
Example:
session := &pb.Session{
Id: "", // Empty = auto-generate
App: &pb.App{Id: "my-app"},
Attributes: &pb.SessionAttributes{
Priority: 5,
MaxWorkers: proto.Uint32(100),
},
}
App¶
Application identifier.
message App {
string id = 1;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `id` | `string` | Application identifier (user-defined) |
SessionAttributes¶
Configuration for a session.
message SessionAttributes {
uint32 priority = 1;
string client_id = 2;
uint32 min_workers = 3;
uint32 max_workers = 4;
google.protobuf.Duration idle_timeout = 5;
bool preemptible = 6;
bool cleanup_on_client_exit = 7;
uint32 max_inflight_batches_per_worker = 8;
repeated string tags = 9;
}
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `priority` | `uint32` | 5 | Session priority (0-10, higher = more important) |
| `client_id` | `string` | auto | Client identifier (auto-generated if empty) |
| `min_workers` | `uint32` | 0 | Minimum workers to bind (scheduling hint) |
| `max_workers` | `uint32` | unlimited | Maximum workers allowed (hard limit) |
| `idle_timeout` | `Duration` | 60s | Keep-alive duration without activity |
| `preemptible` | `bool` | false | Can be preempted by higher priority sessions |
| `cleanup_on_client_exit` | `bool` | false | Auto-cleanup queues on client disconnect |
| `max_inflight_batches_per_worker` | `uint32` | 10 | Flow control limit per worker |
| `tags` | `repeated string` | [] | Metadata tags for filtering and monitoring |
Example:
attributes := &pb.SessionAttributes{
Priority: 8,
MinWorkers: proto.Uint32(10),
MaxWorkers: proto.Uint32(100),
IdleTimeout: durationpb.New(2 * time.Minute),
Preemptible: false,
CleanupOnClientExit: proto.Bool(true),
MaxInflightBatchesPerWorker: proto.Uint32(5),
Tags: []string{"production", "gpu"},
}
Task and Result Types¶
Batch¶
Collection of messages for streaming efficiency.
message Batch {
string id = 1;
string session_id = 2;
int32 priority = 3;
repeated Message messages = 4;
bool use_shared_data = 5;
uint32 required_shared_data_update_level = 6;
bool dead = 7;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `id` | `string` | Batch identifier for tracking |
| `session_id` | `string` | Associated session |
| `priority` | `int32` | Batch priority (overrides session priority if set) |
| `messages` | `repeated Message` | Array of messages |
| `use_shared_data` | `bool` | Whether workers should fetch shared data |
| `required_shared_data_update_level` | `uint32` | Version of shared data required |
| `dead` | `bool` | Indicates dead letter batch (failed tasks) |
Example (Task Batch):
taskBatch := &pb.Batch{
Id: "batch-001",
SessionId: session.Id,
Priority: 0,
UseSharedData: proto.Bool(true),
RequiredSharedDataUpdateLevel: proto.Uint32(1),
Messages: []*pb.Message{
{Id: "task-1", Payload: payload1},
{Id: "task-2", Payload: payload2},
},
Dead: proto.Bool(false),
}
Example (Result Batch):
resultBatch := &pb.Batch{
Id: taskBatch.Id,
SessionId: taskBatch.SessionId,
Messages: []*pb.Message{
{Id: "task-1", Payload: result1, RelatedId: proto.String("task-1")},
{Id: "task-2", Payload: result2, RelatedId: proto.String("task-2")},
},
}
Message¶
Individual task or result.
message Message {
string id = 1;
bytes payload = 2;
google.protobuf.Timestamp timestamp = 3;
string error = 4;
string related_id = 5;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `id` | `string` | Unique message identifier |
| `payload` | `bytes` | Serialized task/result data (format defined by user) |
| `timestamp` | `Timestamp` | Creation timestamp (auto-set by server) |
| `error` | `string` | Error message (only for dead letter messages) |
| `related_id` | `string` | Reference to related message (e.g., task ID for result) |
Example (Task):
taskPayload, _ := json.Marshal(MyTask{Value: 42})
task := &pb.Message{
Id: "task-001",
Payload: taskPayload,
}
Example (Result):
resultPayload, _ := json.Marshal(MyResult{Result: 84})
result := &pb.Message{
Id: "task-001-result",
Payload: resultPayload,
RelatedId: proto.String("task-001"),
}
Example (Error):
errorMsg := &pb.Message{
Id: "task-001",
Error: proto.String("computation failed: division by zero"),
}
Cache Types¶
SharedData¶
Session-scoped cached data.
message SharedData {
string session_id = 1;
uint32 update_level = 2;
bytes payload = 3;
google.protobuf.Duration ttl = 4;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `session_id` | `string` | Associated session |
| `update_level` | `uint32` | Version number (must be sequential: 1, 2, 3...) |
| `payload` | `bytes` | Serialized shared data |
| `ttl` | `Duration` | Time to live (auto-expiration) |
Example (Set):
sharedData := &pb.SharedData{
SessionId: sessionID,
UpdateLevel: proto.Uint32(1),
Payload: modelWeights,
Ttl: durationpb.New(10 * time.Minute),
}
cacheClient.SetSharedData(ctx, sharedData)
Example (Update):
updatedData := &pb.SharedData{
SessionId: sessionID,
UpdateLevel: proto.Uint32(2), // Increment
Payload: updatedModelWeights,
Ttl: durationpb.New(10 * time.Minute),
}
cacheClient.UpdateSharedData(ctx, updatedData)
Worker Types¶
WorkerState¶
Worker status update.
message WorkerState {
string worker_id = 1;
enum Status {
IDLE = 0;
BUSY = 1;
}
Status status = 2;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `worker_id` | `string` | Worker identifier (assigned by queue manager) |
| `status` | `Status` | Current worker status |
Status Values:
- IDLE (0): Worker available for work
- BUSY (1): Worker processing tasks
Example:
state := &pb.WorkerState{
WorkerId: workerID,
Status: pb.WorkerState_IDLE,
}
updateStream.Send(state)
Instruction¶
Command from queue manager to worker.
message Instruction {
enum Type {
BIND = 0;
UNBIND = 1;
}
Type type = 1;
string session_id = 2;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `type` | `Type` | Instruction type |
| `session_id` | `string` | Session to bind/unbind |
Type Values:
- BIND (0): Worker should start processing session
- UNBIND (1): Worker should stop processing session
Example (Received by Worker):
instruction, _ := updateStream.Recv()
switch instruction.Type {
case pb.Instruction_BIND:
processSession(instruction.SessionId)
case pb.Instruction_UNBIND:
stopProcessing(instruction.SessionId)
}
Status Types¶
Status¶
Generic operation status.
message Status {
int32 code = 1;
string message = 2;
}
Fields:
| Field | Type | Description |
|---|---|---|
| `code` | `int32` | Status code (0 = success, non-zero = error) |
| `message` | `string` | Human-readable status message |
Example:
status := &pb.Status{
Code: 0,
Message: "Operation successful",
}
Protocol Buffer Types¶
Timestamp¶
Google protobuf timestamp.
import "google/protobuf/timestamp.proto";
Usage:
import "google.golang.org/protobuf/types/known/timestamppb"
timestamp := timestamppb.Now()
Duration¶
Google protobuf duration.
import "google/protobuf/duration.proto";
Usage:
import "google.golang.org/protobuf/types/known/durationpb"
duration := durationpb.New(10 * time.Second)
Field Conventions¶
Optional Fields¶
Optional fields are represented as pointers in the generated Go structs; a nil pointer means the field is not set:
// Required field
Priority: 5
// Optional field (omitted)
MinWorkers: nil
// Optional field (set)
MinWorkers: proto.Uint32(10)
Helper functions:
proto.Bool(true) // *bool
proto.Int32(42) // *int32
proto.Int64(42) // *int64
proto.Uint32(42) // *uint32
proto.Uint64(42) // *uint64
proto.Float32(3.14) // *float32
proto.Float64(3.14) // *float64
proto.String("text") // *string
Retrieving Optional Fields¶
// Check if field is set
if batch.UseSharedData != nil && *batch.UseSharedData {
// Use shared data
}
// Get value with default
useSharedData := batch.GetUseSharedData() // Returns false if nil
updateLevel := batch.GetRequiredSharedDataUpdateLevel() // Returns 0 if nil
Serialization¶
JSON¶
import "google.golang.org/protobuf/encoding/protojson"
// Marshal to JSON
jsonData, _ := protojson.Marshal(session)
// Unmarshal from JSON
var session pb.Session
protojson.Unmarshal(jsonData, &session)
Binary¶
import "google.golang.org/protobuf/proto"
// Marshal to binary
binaryData, _ := proto.Marshal(session)
// Unmarshal from binary
var session pb.Session
proto.Unmarshal(binaryData, &session)
Validation¶
Field Constraints¶
Session:
- app.id: Required, non-empty string
- attributes.priority: 0-10 range recommended
Batch:
- session_id: Required, must match existing session
- messages: At least one message required
- required_shared_data_update_level: Required if use_shared_data=true
SharedData:
- session_id: Required
- update_level: Must be sequential (1, 2, 3...)
- ttl: Must not exceed configured maximum
Next Steps¶
- API Reference: Complete API documentation
- Configuration Reference: Service configuration
- Examples: Code examples