API Reference

Complete reference for the ALPCRUN.CH gRPC API.

Protocol Buffer Definition

All APIs are defined in metalcore/api/v1/ghpc.proto.

Services

Queue Service

The primary service for task submission and processing.

service Queue {
    rpc CreateSession(Session) returns (Session);
    rpc UpdateSession(Session) returns (Status);
    rpc CloseSession(Session) returns (Status);
    rpc ClientStream(stream Batch) returns (stream Batch);
    rpc ClientStreamDeadLetters(Session) returns (stream Batch);
    rpc WorkerStream(stream Batch) returns (stream Batch);
    rpc WorkerUpdate(stream WorkerState) returns (stream Instruction);
}

CreateSession

Creates a new session for workload processing.

Request: Session

Response: Session (with generated ID and client ID)

Example:

session, err := queueClient.CreateSession(ctx, &pb.Session{
    App: &pb.App{Id: "my-app"},
    Attributes: &pb.SessionAttributes{
        Priority:    5,
        IdleTimeout: durationpb.New(60 * time.Second),
        MinWorkers:  proto.Uint32(10),
        MaxWorkers:  proto.Uint32(100),
    },
})

Fields:

  • app.id: Application identifier (required)
  • attributes: Session configuration (see SessionAttributes)

UpdateSession

Updates an existing session's attributes.

Request: Session (with ID and updated attributes)

Response: Status

Example:

_, err := queueClient.UpdateSession(ctx, &pb.Session{
    Id: sessionID,
    Attributes: &pb.SessionAttributes{
        Priority:   8,  // Increase priority
        MaxWorkers: proto.Uint32(200),  // Allow more workers
    },
})

CloseSession

Closes a session and optionally cleans up resources.

Request: Session (with ID)

Response: Status

Example:

_, err := queueClient.CloseSession(ctx, &pb.Session{Id: sessionID})

Behavior:

  • Unbinds all workers
  • If cleanup_on_client_exit=true, drains queues
  • Removes session from registry

ClientStream

Bidirectional stream for sending tasks and receiving results.

Request Stream: Batch (task batches)

Response Stream: Batch (result batches)

Example:

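stream, err := queueClient.ClientStream(ctx)
if err != nil {
    log.Fatalf("opening client stream: %v", err)
}

// Send tasks
if err := stream.Send(&pb.Batch{
    SessionId: session.Id,
    Messages:  tasks,
}); err != nil {
    log.Fatalf("sending batch: %v", err)
}

// Receive results
for {
    batch, err := stream.Recv()
    if err == io.EOF {
        break // server closed the stream
    }
    if err != nil {
        log.Printf("receiving results: %v", err)
        break
    }
    processResults(batch.Messages)
}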

Notes:

  • Long-lived connection (keep open for entire session)
  • Automatic flow control via backpressure
  • Can send and receive concurrently
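
The concurrent send/receive pattern noted above can be sketched without a live server. In this minimal sketch, `batch`, `batchStream`, `echoStream`, and `exchange` are hypothetical local stand-ins, not the generated client types. The fake stream echoes each batch back and ends when the sender closes it, which a real queue manager may not do.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// batch is a local stand-in for the generated Batch message.
type batch struct {
	SessionID string
	Messages  []string
}

// batchStream captures only the Send/Recv/CloseSend shape of a
// bidirectional client stream.
type batchStream interface {
	Send(*batch) error
	Recv() (*batch, error)
	CloseSend() error
}

// exchange sends all task batches from a separate goroutine while the
// calling goroutine receives results until the stream ends with io.EOF.
func exchange(s batchStream, tasks []*batch) ([]*batch, error) {
	var wg sync.WaitGroup
	var sendErr error
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, b := range tasks {
			if err := s.Send(b); err != nil {
				sendErr = err
				return
			}
		}
		sendErr = s.CloseSend()
	}()

	var results []*batch
	for {
		b, err := s.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			// On a real stream a broken connection also fails pending Sends,
			// so the sender goroutine exits on its own.
			return results, err
		}
		results = append(results, b)
	}
	wg.Wait() // makes sendErr safe to read
	return results, sendErr
}

// echoStream is an in-memory fake: every batch sent is received back.
// The unbuffered channel blocks Send until Recv runs, imitating backpressure.
type echoStream struct{ ch chan *batch }

func (e *echoStream) Send(b *batch) error { e.ch <- b; return nil }
func (e *echoStream) CloseSend() error    { close(e.ch); return nil }
func (e *echoStream) Recv() (*batch, error) {
	b, ok := <-e.ch
	if !ok {
		return nil, io.EOF
	}
	return b, nil
}

func main() {
	s := &echoStream{ch: make(chan *batch)}
	tasks := []*batch{
		{SessionID: "s1", Messages: []string{"a"}},
		{SessionID: "s1", Messages: []string{"b"}},
	}
	results, err := exchange(s, tasks)
	fmt.Println(len(results), err)
}
```

Sending from its own goroutine keeps a slow receiver from deadlocking the sender, and the other way round.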

ClientStreamDeadLetters

Stream for retrieving failed tasks.

Request: Session (with ID)

Response Stream: Batch (dead letter batches with errors)

Example:

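stream, err := queueClient.ClientStreamDeadLetters(ctx, &pb.Session{Id: sessionID})
if err != nil {
    log.Fatalf("opening dead letter stream: %v", err)
}

for {
    batch, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Printf("receiving dead letters: %v", err)
        break
    }

    for _, msg := range batch.Messages {
        log.Printf("Failed task %s: %s", msg.Id, msg.GetError())
    }
}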

Notes:

  • Messages include error descriptions
  • dead field is set to true
  • Original payload is preserved
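
Because the original payload is preserved, a client could resubmit failed tasks. The following is a minimal sketch of a bounded retry policy. `message` and `retryPlanner` are hypothetical local types, and whether the queue accepts a resubmitted task under the same ID is an assumption, not documented behavior.

```go
package main

import "fmt"

// message is a local stand-in for the generated Message type.
type message struct {
	ID      string
	Payload []byte
	Error   string
}

// retryPlanner counts how often each message ID has come back as a dead letter.
type retryPlanner struct {
	maxRetries int
	seen       map[string]int
}

func newRetryPlanner(maxRetries int) *retryPlanner {
	return &retryPlanner{maxRetries: maxRetries, seen: make(map[string]int)}
}

// plan splits dead letters into messages worth resubmitting (payload kept,
// error cleared) and messages that have used up their retries.
func (p *retryPlanner) plan(dead []message) (retry, exhausted []message) {
	for _, m := range dead {
		p.seen[m.ID]++
		if p.seen[m.ID] > p.maxRetries {
			exhausted = append(exhausted, m)
			continue
		}
		retry = append(retry, message{ID: m.ID, Payload: m.Payload})
	}
	return retry, exhausted
}

func main() {
	p := newRetryPlanner(1)
	dead := []message{{ID: "t1", Payload: []byte("x"), Error: "boom"}}

	retry, exhausted := p.plan(dead)
	fmt.Println(len(retry), len(exhausted)) // first failure: retried

	retry, exhausted = p.plan(dead)
	fmt.Println(len(retry), len(exhausted)) // second failure: given up
}
```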

WorkerStream

Bidirectional stream for workers to receive tasks and send results.

Request Stream: Batch (result batches)

Response Stream: Batch (task batches)

Metadata Required:

  • sessionid: Session to process
  • api_key: Authentication key

Example:

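// Attach both required metadata keys
ctx = metadata.AppendToOutgoingContext(ctx, "sessionid", sessionID, "api_key", apiKey)
stream, err := queueClient.WorkerStream(ctx)
if err != nil {
    log.Fatalf("opening worker stream: %v", err)
}

for {
    taskBatch, err := stream.Recv()
    if err != nil {
        break
    }

    results := processTasks(taskBatch)
    if err := stream.Send(results); err != nil {
        log.Printf("sending results: %v", err)
        break
    }
}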

WorkerUpdate

Stream for worker registration and instruction handling.

Request Stream: WorkerState (worker status updates)

Response Stream: Instruction (BIND/UNBIND commands)

Response Header: workerid (assigned by queue manager)

Metadata Required:

  • api_key: Authentication key
  • node: Kubernetes node name
  • pod: Pod identifier

Example:

md := metadata.New(map[string]string{
    "api_key": apiKey,
    "node":    nodeName,
    "pod":     podName,
})
ctx = metadata.NewOutgoingContext(ctx, md)

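stream, err := queueClient.WorkerUpdate(ctx)
if err != nil {
    log.Fatalf("opening worker update stream: %v", err)
}

// Get worker ID from the response header
header, err := stream.Header()
if err != nil {
    log.Fatalf("reading response header: %v", err)
}
ids := header.Get("workerid")
if len(ids) == 0 {
    log.Fatal("workerid header missing")
}
workerID := ids[0]

// Send initial state
if err := stream.Send(&pb.WorkerState{
    WorkerId: workerID,
    Status:   pb.WorkerState_IDLE,
}); err != nil {
    log.Fatalf("sending worker state: %v", err)
}

// Wait for instructions
for {
    inst, err := stream.Recv()
    if err != nil {
        log.Printf("instruction stream closed: %v", err)
        break
    }
    if inst.Type == pb.Instruction_BIND {
        processSession(inst.SessionId)
    }
}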
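
The example above only reacts to BIND. A worker also has to handle UNBIND and decide which status to report next. The sketch below models that as a small state machine. `instruction` and `worker` are hypothetical local types, and reporting BUSY while bound is an assumption about how the status is meant to be used.

```go
package main

import "fmt"

// instructionType and instruction are local stand-ins for the generated
// Instruction message and its Type enum.
type instructionType int

const (
	bind instructionType = iota
	unbind
)

type instruction struct {
	Type      instructionType
	SessionID string
}

// worker tracks which session, if any, it is currently bound to.
type worker struct {
	boundSession string
}

// apply updates the binding for one instruction and returns the status the
// worker would report next: "BUSY" while bound, "IDLE" otherwise.
func (w *worker) apply(in instruction) string {
	switch in.Type {
	case bind:
		w.boundSession = in.SessionID
	case unbind:
		// Ignore an UNBIND that names a session this worker is not bound to.
		if w.boundSession == in.SessionID {
			w.boundSession = ""
		}
	}
	if w.boundSession != "" {
		return "BUSY"
	}
	return "IDLE"
}

func main() {
	w := &worker{}
	fmt.Println(w.apply(instruction{Type: bind, SessionID: "s1"}))
	fmt.Println(w.apply(instruction{Type: unbind, SessionID: "s2"}))
	fmt.Println(w.apply(instruction{Type: unbind, SessionID: "s1"}))
}
```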

Cache Service

Service for shared data management.

service Cache {
    rpc SetSharedData(SharedData) returns (Status);
    rpc UpdateSharedData(SharedData) returns (Status);
    rpc GetSharedData(SharedData) returns (SharedData);
    rpc DeleteSharedData(SharedData) returns (Status);
}

SetSharedData

Creates new shared data for a session.

Request: SharedData

Response: Status

Example:

updateLevel := uint32(1)
_, err := cacheClient.SetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
    Payload:     data,
    Ttl:         durationpb.New(10 * time.Minute),
})

Error: Returns error if data already exists for this session

UpdateSharedData

Updates existing shared data and advances it to the next update level, which the caller supplies.

Request: SharedData (with new update level)

Response: Status

Example:

newUpdateLevel := uint32(2)
_, err := cacheClient.UpdateSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &newUpdateLevel,
    Payload:     updatedData,
    Ttl:         durationpb.New(10 * time.Minute),
})

Notes:

  • Update level must be sequential (no gaps)
  • Resets TTL

GetSharedData

Retrieves shared data for a specific version.

Request: SharedData (with session ID and update level)

Response: SharedData (with payload)

Example:

updateLevel := uint32(1)
data, err := cacheClient.GetSharedData(ctx, &pb.SharedData{
    SessionId:   sessionID,
    UpdateLevel: &updateLevel,
})

payload := data.Payload

Error: Returns error if not found or version mismatch

DeleteSharedData

Removes shared data for a session.

Request: SharedData (with session ID)

Response: Status

Example:

_, err := cacheClient.DeleteSharedData(ctx, &pb.SharedData{
    SessionId: sessionID,
})

Message Types

Session

Represents a workload session.

message Session {
    string id = 1;
    App app = 2;
    SessionAttributes attributes = 3;
}

Fields:

  • id: Unique session identifier (ULID, generated by server)
  • app: Application information
  • attributes: Session configuration

SessionAttributes

Configuration for a session.

message SessionAttributes {
    uint32 priority = 1;
    string client_id = 2;
    uint32 min_workers = 3;
    uint32 max_workers = 4;
    google.protobuf.Duration idle_timeout = 5;
    bool preemptible = 6;
    bool cleanup_on_client_exit = 7;
    uint32 max_inflight_batches_per_worker = 8;
    repeated string tags = 9;
}

Fields:

  • priority: 0-10, higher = more important (default: 5)
  • client_id: Client identifier (auto-generated if empty)
  • min_workers: Minimum workers to bind (scheduling hint)
  • max_workers: Maximum workers allowed (hard limit)
  • idle_timeout: Keep-alive duration without activity
  • preemptible: Can be preempted by higher priority sessions
  • cleanup_on_client_exit: Auto-cleanup queues on disconnect
  • max_inflight_batches_per_worker: Flow control limit per worker
  • tags: Metadata for filtering and monitoring
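
A client can check these constraints before calling CreateSession. The sketch below validates the documented priority range. `sessionLimits` and `validateLimits` are hypothetical, and the min/max check is an assumed sanity rule rather than documented server behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// sessionLimits mirrors a subset of the SessionAttributes fields. It is a
// local stand-in, not the generated protobuf type.
type sessionLimits struct {
	Priority   uint32
	MinWorkers uint32
	MaxWorkers uint32
}

// validateLimits applies the documented priority range (0-10) and, as an
// assumed sanity rule, rejects min_workers above a non-zero max_workers.
func validateLimits(l sessionLimits) error {
	if l.Priority > 10 {
		return fmt.Errorf("priority %d out of range 0-10", l.Priority)
	}
	if l.MaxWorkers != 0 && l.MinWorkers > l.MaxWorkers {
		return errors.New("min_workers exceeds max_workers")
	}
	return nil
}

func main() {
	fmt.Println(validateLimits(sessionLimits{Priority: 5, MinWorkers: 10, MaxWorkers: 100}))
	fmt.Println(validateLimits(sessionLimits{Priority: 11}))
}
```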

Batch

Collection of messages for efficient streaming.

message Batch {
    string id = 1;
    string session_id = 2;
    int32 priority = 3;
    repeated Message messages = 4;
    bool use_shared_data = 5;
    uint32 required_shared_data_update_level = 6;
    bool dead = 7;
}

Fields:

  • id: Batch identifier for tracking
  • session_id: Associated session
  • priority: Batch priority (overrides session priority)
  • messages: Array of messages
  • use_shared_data: Whether workers should fetch shared data
  • required_shared_data_update_level: Version of shared data required
  • dead: Indicates dead letter batch
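
Batches exist to amortize streaming overhead, so clients typically group many tasks per batch. The sketch below splits a task list into fixed-size batches. `batch` and `chunk` are hypothetical local names, and the batch size is an arbitrary illustrative value, not a recommended setting.

```go
package main

import "fmt"

// batch is a local stand-in for the generated Batch message.
type batch struct {
	SessionID string
	Messages  []string
}

// chunk groups msgs into batches of at most size messages each.
// A non-positive size falls back to one message per batch.
func chunk(sessionID string, msgs []string, size int) []batch {
	if size <= 0 {
		size = 1
	}
	var out []batch
	for start := 0; start < len(msgs); start += size {
		end := start + size
		if end > len(msgs) {
			end = len(msgs)
		}
		out = append(out, batch{SessionID: sessionID, Messages: msgs[start:end]})
	}
	return out
}

func main() {
	for _, b := range chunk("s1", []string{"a", "b", "c", "d", "e"}, 2) {
		fmt.Println(b.SessionID, b.Messages)
	}
}
```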

Message

Individual task or result.

message Message {
    string id = 1;
    bytes payload = 2;
    google.protobuf.Timestamp timestamp = 3;
    string error = 4;
    string related_id = 5;
}

Fields:

  • id: Unique message identifier
  • payload: Serialized task/result data
  • timestamp: Creation timestamp (auto-set by server)
  • error: Error message for failed tasks (dead letters only)
  • related_id: Reference to related message (e.g., task ID for result)
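
The related_id field lets a client match results back to the tasks it submitted. The sketch below finds tasks that have no result yet. `message` and `unanswered` are hypothetical local names, and it assumes each result's related_id holds the originating task's ID, as the field description suggests.

```go
package main

import "fmt"

// message is a local stand-in carrying only the fields used for correlation.
type message struct {
	ID        string
	RelatedID string
}

// unanswered returns the IDs of tasks for which no result carries a matching
// related_id, in the order the tasks were submitted.
func unanswered(tasks, results []message) []string {
	done := make(map[string]bool, len(results))
	for _, r := range results {
		done[r.RelatedID] = true
	}
	var missing []string
	for _, t := range tasks {
		if !done[t.ID] {
			missing = append(missing, t.ID)
		}
	}
	return missing
}

func main() {
	tasks := []message{{ID: "t1"}, {ID: "t2"}, {ID: "t3"}}
	results := []message{{ID: "r1", RelatedID: "t1"}, {ID: "r2", RelatedID: "t3"}}
	fmt.Println(unanswered(tasks, results))
}
```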

SharedData

Session-scoped cached data.

message SharedData {
    string session_id = 1;
    uint32 update_level = 2;
    bytes payload = 3;
    google.protobuf.Duration ttl = 4;
}

Fields:

  • session_id: Associated session
  • update_level: Version number (sequential)
  • payload: Serialized shared data
  • ttl: Time to live (refreshed on update)

WorkerState

Worker status update.

message WorkerState {
    string worker_id = 1;
    enum Status {
        IDLE = 0;
        BUSY = 1;
    }
    Status status = 2;
}

Instruction

Command from queue manager to worker.

message Instruction {
    enum Type {
        BIND = 0;
        UNBIND = 1;
    }
    Type type = 1;
    string session_id = 2;
}

Types:

  • BIND: Worker should start processing session
  • UNBIND: Worker should stop processing session

Status

Generic operation status.

message Status {
    int32 code = 1;
    string message = 2;
}

Error Codes

gRPC Status Codes

  • OK (0): Success
  • INVALID_ARGUMENT (3): Invalid request parameters
  • NOT_FOUND (5): Session or data not found
  • ALREADY_EXISTS (6): Resource already exists
  • PERMISSION_DENIED (7): Caller is not authorized for the operation
  • UNAVAILABLE (14): Service unavailable
  • UNAUTHENTICATED (16): Missing or invalid credentials

Common Errors

Session not found:

code: NOT_FOUND
message: "session not found: <session-id>"

Authentication failed:

code: UNAUTHENTICATED
message: "invalid API key"

Shared data version mismatch:

code: INVALID_ARGUMENT
message: "update level mismatch: expected 2, got 3"

Connection Helpers

Go Client

import "github.com/limelabs/metalcore-neo/pkg/grpccon"

// Connect to queue manager
// Connect to queue manager
queueConn, queueClient, err := grpccon.ConnectToQueue(
    "queue-manager:1337",  // address
    "/certs/ca.crt",       // CA cert path (empty for no TLS)
    true)                  // TLS enabled

// Connect to cache (distinct names so both connections can be used together)
cacheConn, cacheClient, err := grpccon.ConnectToCache(
    "cache:2337",
    "/certs/ca.crt",
    true)

// Wait for connection ready
err = grpccon.BlockUntilReady(ctx, queueConn, 10*time.Second)

Authentication

All requests require an API key in the gRPC metadata:

import "google.golang.org/grpc/metadata"

md := metadata.New(map[string]string{
    "api_key": os.Getenv("METALCORE_API_KEY"),
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

See Authentication Reference for details.

Rate Limits

No explicit rate limits are enforced, but keep these approximate throughput limits in mind:

  • Task submission: Limited by stream backpressure
  • Cache operations: ~5,000 ops/sec per cache instance
  • Session creation: ~100 sessions/sec

Timeouts

Recommended timeouts:

  • CreateSession: 10 seconds
  • ClientStream: No timeout (long-lived)
  • Cache operations: 5 seconds
  • WorkerStream: No timeout (long-lived)

Pagination

Not applicable: no operation returns paged results. Bulk data (tasks, results, dead letters) is delivered over streams.

Versioning

API version: v1 (stable)

Breaking changes will be released under a new version (v2, etc.).

Next Steps