API Reference
Complete reference for the ALPCRUN.CH gRPC API.
Protocol Buffer Definition
All APIs are defined in metalcore/api/v1/ghpc.proto.
Services
Queue Service
The primary service for task submission and processing.
service Queue {
  rpc CreateSession(Session) returns (Session);
  rpc UpdateSession(Session) returns (Status);
  rpc CloseSession(Session) returns (Status);
  rpc ClientStream(stream Batch) returns (stream Batch);
  rpc ClientStreamDeadLetters(Session) returns (stream Batch);
  rpc WorkerStream(stream Batch) returns (stream Batch);
  rpc WorkerUpdate(stream WorkerState) returns (stream Instruction);
}
CreateSession
Creates a new session for workload processing.
Request: Session
Response: Session (with generated ID and client ID)
Example:
session, err := queueClient.CreateSession(ctx, &pb.Session{
App: &pb.App{Id: "my-app"},
Attributes: &pb.SessionAttributes{
Priority: 5,
IdleTimeout: durationpb.New(60 * time.Second),
MinWorkers: proto.Uint32(10),
MaxWorkers: proto.Uint32(100),
},
})
Fields:
- app.id: Application identifier (required)
- attributes: Session configuration (see SessionAttributes)
UpdateSession
Updates an existing session's attributes.
Request: Session (with ID and updated attributes)
Response: Status
Example:
_, err := queueClient.UpdateSession(ctx, &pb.Session{
Id: sessionID,
Attributes: &pb.SessionAttributes{
Priority: 8, // Increase priority
MaxWorkers: proto.Uint32(200), // Allow more workers
},
})
CloseSession
Closes a session and optionally cleans up resources.
Request: Session (with ID)
Response: Status
Example:
_, err := queueClient.CloseSession(ctx, &pb.Session{Id: sessionID})
Behavior:
- Unbinds all workers
- If cleanup_on_client_exit=true, drains queues
- Removes session from registry
ClientStream
Bidirectional stream for sending tasks and receiving results.
Request Stream: Batch (task batches)
Response Stream: Batch (result batches)
Example:
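stream, err := queueClient.ClientStream(ctx)
if err != nil {
	log.Fatalf("open client stream: %v", err)
}
// Send tasks
if err := stream.Send(&pb.Batch{
	SessionId: session.Id,
	Messages:  tasks,
}); err != nil {
	log.Fatalf("send tasks: %v", err)
}
// Receive results until the server ends the stream
for {
	batch, err := stream.Recv()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Printf("receive results: %v", err)
		break
	}
	processResults(batch.Messages)
}
Notes:
- Long-lived connection (keep it open for the entire session)
- Automatic flow control via backpressure
- Can send and receive concurrently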
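The notes above say that sending and receiving can run concurrently. Below is a minimal sketch of that pattern in plain Go. `batch` and `clientStream` are hypothetical local stand-ins for `pb.Batch` and the generated stream type. `echoStream` is an in-memory fake so the example runs without a server. The sketch also assumes the server ends the result stream with `io.EOF` once the client half-closes with `CloseSend` and all results have been delivered. If the queue instead keeps the stream open for the whole session, stop receiving once the expected number of results has arrived.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// batch is a hypothetical stand-in for pb.Batch.
type batch struct {
	SessionID string
	Messages  []string
}

// clientStream is the subset of a generated bidirectional client stream
// used here (Send, Recv, CloseSend).
type clientStream interface {
	Send(*batch) error
	Recv() (*batch, error)
	CloseSend() error
}

// runClient sends every task batch from a separate goroutine while the
// calling goroutine drains results, so neither direction stalls the other.
func runClient(stream clientStream, tasks []*batch) ([]*batch, error) {
	sendErr := make(chan error, 1) // buffered: the sender never blocks on reporting
	go func() {
		for _, b := range tasks {
			if err := stream.Send(b); err != nil {
				sendErr <- err
				return
			}
		}
		sendErr <- stream.CloseSend() // half-close: no more tasks follow
	}()

	var results []*batch
	for {
		b, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return results, <-sendErr // stream finished; surface any send failure
		}
		if err != nil {
			return results, err
		}
		results = append(results, b)
	}
}

// echoStream is an in-memory fake that hands each sent batch back as a
// result and reports io.EOF after CloseSend.
type echoStream struct{ ch chan *batch }

func (s *echoStream) Send(b *batch) error { s.ch <- b; return nil }
func (s *echoStream) CloseSend() error    { close(s.ch); return nil }
func (s *echoStream) Recv() (*batch, error) {
	b, ok := <-s.ch
	if !ok {
		return nil, io.EOF
	}
	return b, nil
}

func main() {
	s := &echoStream{ch: make(chan *batch)}
	tasks := []*batch{
		{SessionID: "s1", Messages: []string{"t1", "t2"}},
		{SessionID: "s1", Messages: []string{"t3"}},
	}
	results, err := runClient(s, tasks)
	fmt.Println(len(results), err) // 2 <nil>
}
```

Sending from its own goroutine keeps `Recv` draining results, so a full send window does not stop results from being read.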
ClientStreamDeadLetters
Stream for retrieving failed tasks.
Request: Session (with ID)
Response Stream: Batch (dead letter batches with errors)
Example:
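stream, err := queueClient.ClientStreamDeadLetters(ctx, &pb.Session{Id: sessionID})
if err != nil {
	log.Fatalf("open dead-letter stream: %v", err)
}
for {
	batch, err := stream.Recv()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Printf("receive dead letters: %v", err)
		break
	}
	for _, msg := range batch.Messages {
		log.Printf("Failed task %s: %s", msg.Id, msg.GetError())
	}
}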
Notes:
- Messages include error descriptions
- The batch's dead field is set to true
- Original payload is preserved
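Because the original payload is preserved, a client can build a retry batch from the dead letters. Below is a minimal sketch of doing that, assuming you retry by sending the payloads back through ClientStream. `message` and `batch` are hypothetical local mirrors of `pb.Message` and `pb.Batch`. Reusing the original message IDs is also an assumption: the service may require fresh IDs.

```go
package main

import "fmt"

// message and batch are hypothetical local mirrors of pb.Message and pb.Batch.
type message struct {
	ID      string
	Payload []byte
	Error   string
}

type batch struct {
	SessionID string
	Messages  []message
	Dead      bool
}

// requeue turns dead-letter batches into one fresh task batch. It keeps the
// preserved payloads, clears the error text, and counts failures per error
// string for reporting.
func requeue(sessionID string, dead []batch) (batch, map[string]int) {
	out := batch{SessionID: sessionID}
	byError := make(map[string]int)
	for _, b := range dead {
		if !b.Dead {
			continue // only dead-letter batches carry failed tasks
		}
		for _, m := range b.Messages {
			byError[m.Error]++
			// Assumption: the original task ID may be reused on resubmission.
			out.Messages = append(out.Messages, message{ID: m.ID, Payload: m.Payload})
		}
	}
	return out, byError
}

func main() {
	dead := []batch{{Dead: true, Messages: []message{
		{ID: "t1", Payload: []byte("a"), Error: "timeout"},
		{ID: "t2", Payload: []byte("b"), Error: "timeout"},
	}}}
	retry, counts := requeue("s1", dead)
	fmt.Println(len(retry.Messages), counts["timeout"]) // 2 2
}
```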
WorkerStream
Bidirectional stream for workers to receive tasks and send results.
Request Stream: Batch (result batches)
Response Stream: Batch (task batches)
Metadata Required:
- sessionid: Session to process
- api_key: Authentication key
Example:
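ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID, "api_key", apiKey)
stream, err := queueClient.WorkerStream(ctx)
if err != nil {
	log.Fatalf("open worker stream: %v", err)
}
for {
	taskBatch, err := stream.Recv()
	if err != nil {
		break // io.EOF or stream error: stop processing this session
	}
	results := processTasks(taskBatch) // must return a *pb.Batch of results
	if err := stream.Send(results); err != nil {
		break
	}
}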
WorkerUpdate
Stream for worker registration and instruction handling.
Request Stream: WorkerState (worker status updates)
Response Stream: Instruction (BIND/UNBIND commands)
Response Header: workerid (assigned by queue manager)
Metadata Required:
- api_key: Authentication key
- node: Kubernetes node name
- pod: Pod identifier
Example:
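md := metadata.New(map[string]string{
	"api_key": apiKey,
	"node":    nodeName,
	"pod":     podName,
})
ctx = metadata.NewOutgoingContext(ctx, md)
stream, err := queueClient.WorkerUpdate(ctx)
if err != nil {
	log.Fatalf("open worker update stream: %v", err)
}
// Get worker ID from the response header
header, err := stream.Header()
if err != nil || len(header.Get("workerid")) == 0 {
	log.Fatalf("missing workerid header: %v", err)
}
workerID := header.Get("workerid")[0]
// Send initial state
if err := stream.Send(&pb.WorkerState{
	WorkerId: workerID,
	Status:   pb.WorkerState_IDLE,
}); err != nil {
	log.Fatalf("send worker state: %v", err)
}
// Wait for instructions until the stream ends
for {
	inst, err := stream.Recv()
	if err != nil {
		break
	}
	switch inst.Type {
	case pb.Instruction_BIND:
		go processSession(inst.SessionId) // run in background so UNBIND can still arrive
	case pb.Instruction_UNBIND:
		stopSession(inst.SessionId) // hypothetical helper that stops the bound session
	}
}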
Cache Service
Service for shared data management.
service Cache {
  rpc SetSharedData(SharedData) returns (Status);
  rpc UpdateSharedData(SharedData) returns (Status);
  rpc GetSharedData(SharedData) returns (SharedData);
  rpc DeleteSharedData(SharedData) returns (Status);
}
SetSharedData
Creates new shared data for a session.
Request: SharedData
Response: Status
Example:
updateLevel := uint32(1)
_, err := cacheClient.SetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
Payload: data,
Ttl: durationpb.New(10 * time.Minute),
})
Error: Returns error if data already exists for this session
UpdateSharedData
Replaces existing shared data with a new version. The request must carry the next update level (the current level + 1).
Request: SharedData (with new update level)
Response: Status
Example:
newUpdateLevel := uint32(2)
_, err := cacheClient.UpdateSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &newUpdateLevel,
Payload: updatedData,
Ttl: durationpb.New(10 * time.Minute),
})
Notes:
- Update level must be sequential (no gaps)
- Resets TTL
GetSharedData
Retrieves shared data for a specific version.
Request: SharedData (with session ID and update level)
Response: SharedData (with payload)
Example:
updateLevel := uint32(1)
data, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
UpdateLevel: &updateLevel,
})
if err != nil {
	log.Fatalf("get shared data: %v", err)
}
payload := data.Payload
Error: Returns error if not found or version mismatch
DeleteSharedData
Removes shared data for a session.
Request: SharedData (with session ID)
Response: Status
Example:
_, err := cacheClient.DeleteSharedData(ctx, &pb.SharedData{
SessionId: sessionID,
})
Message Types
Session
Represents a workload session.
message Session {
string id = 1;
App app = 2;
SessionAttributes attributes = 3;
}
Fields:
- id: Unique session identifier (ULID, generated by server)
- app: Application information
- attributes: Session configuration
SessionAttributes
Configuration for a session.
message SessionAttributes {
uint32 priority = 1;
string client_id = 2;
optional uint32 min_workers = 3;
optional uint32 max_workers = 4;
google.protobuf.Duration idle_timeout = 5;
bool preemptible = 6;
bool cleanup_on_client_exit = 7;
uint32 max_inflight_batches_per_worker = 8;
repeated string tags = 9;
}
Fields:
- priority: 0-10, higher = more important (default: 5)
- client_id: Client identifier (auto-generated if empty)
- min_workers: Minimum workers to bind (scheduling hint)
- max_workers: Maximum workers allowed (hard limit)
- idle_timeout: How long the session stays alive without activity
- preemptible: Can be preempted by higher priority sessions
- cleanup_on_client_exit: Auto-cleanup queues on disconnect
- max_inflight_batches_per_worker: Flow control limit per worker
- tags: Metadata for filtering and monitoring
Batch
Collection of messages for efficient streaming.
message Batch {
string id = 1;
string session_id = 2;
int32 priority = 3;
repeated Message messages = 4;
bool use_shared_data = 5;
uint32 required_shared_data_update_level = 6;
bool dead = 7;
}
Fields:
- id: Batch identifier for tracking
- session_id: Associated session
- priority: Batch priority (overrides session priority)
- messages: Array of messages
- use_shared_data: Whether workers should fetch shared data
- required_shared_data_update_level: Version of shared data required
- dead: Indicates dead letter batch
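This reference does not specify a batch size. One client-side approach is to cap both the message count and the payload bytes per batch. A byte cap keeps each Batch under gRPC's per-message size limit; gRPC-Go receivers default to 4 MB. Below is a minimal sketch where `maxMsgs` and `maxBytes` are hypothetical client knobs, not server limits, and `message`/`batch` are local stand-ins for the generated types.

```go
package main

import "fmt"

// message and batch are hypothetical local stand-ins for pb.Message and pb.Batch.
type message struct {
	ID      string
	Payload []byte
}

type batch struct {
	SessionID string
	Messages  []message
}

// chunk splits tasks into batches with at most maxMsgs messages and at most
// maxBytes of payload; a single oversized message still gets its own batch.
func chunk(sessionID string, tasks []message, maxMsgs, maxBytes int) []batch {
	var out []batch
	cur := batch{SessionID: sessionID}
	size := 0
	for _, m := range tasks {
		full := len(cur.Messages) == maxMsgs || size+len(m.Payload) > maxBytes
		if len(cur.Messages) > 0 && full {
			out = append(out, cur) // flush the current batch
			cur = batch{SessionID: sessionID}
			size = 0
		}
		cur.Messages = append(cur.Messages, m)
		size += len(m.Payload)
	}
	if len(cur.Messages) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	tasks := make([]message, 5)
	for i := range tasks {
		tasks[i] = message{ID: fmt.Sprintf("t%d", i), Payload: make([]byte, 100)}
	}
	for _, b := range chunk("s1", tasks, 2, 1<<20) {
		fmt.Println(len(b.Messages)) // prints 2, then 2, then 1
	}
}
```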
Message
Individual task or result.
message Message {
string id = 1;
bytes payload = 2;
google.protobuf.Timestamp timestamp = 3;
string error = 4;
string related_id = 5;
}
Fields:
- id: Unique message identifier
- payload: Serialized task/result data
- timestamp: Creation timestamp (auto-set by server)
- error: Error message for failed tasks (dead letters only)
- related_id: Reference to related message (e.g., task ID for result)
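Since a result's related_id refers back to its task, a client can track which submitted tasks are still outstanding. Below is a minimal sketch of that bookkeeping. `message` and `tracker` are hypothetical local types. It assumes each task yields one result.

```go
package main

import "fmt"

// message is a hypothetical local mirror of pb.Message.
type message struct {
	ID        string
	RelatedID string
	Payload   []byte
}

// tracker records submitted task IDs and matches results back to them
// through related_id.
type tracker struct{ pending map[string]struct{} }

func newTracker() *tracker { return &tracker{pending: map[string]struct{}{}} }

func (t *tracker) submitted(tasks []message) {
	for _, m := range tasks {
		t.pending[m.ID] = struct{}{}
	}
}

// completed removes matched tasks and returns results whose related_id
// does not match a pending task (duplicates or unknown IDs).
func (t *tracker) completed(results []message) (unmatched []message) {
	for _, r := range results {
		if _, ok := t.pending[r.RelatedID]; ok {
			delete(t.pending, r.RelatedID)
		} else {
			unmatched = append(unmatched, r)
		}
	}
	return unmatched
}

func main() {
	t := newTracker()
	t.submitted([]message{{ID: "t1"}, {ID: "t2"}})
	extra := t.completed([]message{{ID: "r1", RelatedID: "t1"}, {ID: "r9", RelatedID: "t9"}})
	fmt.Println(len(t.pending), len(extra)) // 1 1
}
```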
SharedData
Session-scoped cached data.
message SharedData {
string session_id = 1;
optional uint32 update_level = 2;
bytes payload = 3;
google.protobuf.Duration ttl = 4;
}
Fields:
- session_id: Associated session
- update_level: Version number (sequential)
- payload: Serialized shared data
- ttl: Time to live (refreshed on update)
WorkerState
Worker status update.
message WorkerState {
string worker_id = 1;
enum Status {
IDLE = 0;
BUSY = 1;
}
Status status = 2;
}
Instruction
Command from queue manager to worker.
message Instruction {
enum Type {
BIND = 0;
UNBIND = 1;
}
Type type = 1;
string session_id = 2;
}
Types:
- BIND: Worker should start processing session
- UNBIND: Worker should stop processing session
Status
Generic operation status.
message Status {
int32 code = 1;
string message = 2;
}
Error Codes
gRPC Status Codes
- OK (0): Success
- INVALID_ARGUMENT (3): Invalid request parameters
- NOT_FOUND (5): Session or data not found
- ALREADY_EXISTS (6): Resource already exists
- PERMISSION_DENIED (7): Caller is authenticated but not allowed to perform the operation
- UNAVAILABLE (14): Service unavailable
- UNAUTHENTICATED (16): Missing or invalid credentials
Common Errors
Session not found:
code: NOT_FOUND
message: "session not found: <session-id>"
Authentication failed:
code: UNAUTHENTICATED
message: "invalid API key"
Shared data version mismatch:
code: INVALID_ARGUMENT
message: "update level mismatch: expected 2, got 3"
Connection Helpers
Go Client
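import (
	"log"
	"time"

	"github.com/limelabs/metalcore-neo/pkg/grpccon"
)

// Connect to queue manager
queueConn, queueClient, err := grpccon.ConnectToQueue(
	"queue-manager:1337", // address
	"/certs/ca.crt",      // CA cert path (empty for no TLS)
	true)                 // TLS enabled
if err != nil {
	log.Fatalf("connect to queue manager: %v", err)
}
// Connect to cache (distinct names: redeclaring conn, client with := would not compile)
cacheConn, cacheClient, err := grpccon.ConnectToCache(
	"cache:2337",
	"/certs/ca.crt",
	true)
if err != nil {
	log.Fatalf("connect to cache: %v", err)
}
// Wait for the connection to be ready
err = grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second)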
Authentication
All requests require an API key in the gRPC metadata:
import (
	"context"
	"os"
	"google.golang.org/grpc/metadata"
)
md := metadata.New(map[string]string{
	"api_key": os.Getenv("METALCORE_API_KEY"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
See Authentication Reference for details.
Rate Limits
There are no explicit rate limits, but keep these practical limits in mind:
- Task submission: Limited by stream backpressure
- Cache operations: ~5,000 ops/sec per cache instance
- Session creation: ~100 sessions/sec
Timeouts
Recommended timeouts:
- CreateSession: 10 seconds
- ClientStream: No timeout (long-lived)
- Cache operations: 5 seconds
- WorkerStream: No timeout (long-lived)
Pagination
Not applicable: no operation returns paged results. Unary calls return a single message, and bulk data is delivered over streams.
Versioning
API version: v1 (stable)
Breaking changes will be released under a new API version (v2, etc.).
Next Steps
- Message Types: Detailed message field descriptions
- Configuration: Service configuration reference
- Authentication: Authentication details