You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

404 lines
14 KiB

// This schema is written against the proto3 language revision.
syntax = "proto3";

// Protobuf package that namespaces every definition in this file.
package worker_pb;

// Import path used for the generated Go code.
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";
// WorkerService is the RPC surface used between the admin and its workers.
service WorkerService {
  // WorkerStream is a bidirectional stream: the caller sends WorkerMessage
  // values and receives AdminMessage values on the same call.
  rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);

  // GetMasterAddresses is a unary call returning the master server addresses
  // that a worker uses for its tasks.
  rpc GetMasterAddresses(GetMasterAddressesRequest)
      returns (GetMasterAddressesResponse);
}
// WorkerMessage is the envelope for every message sent from a worker to the
// admin.
message WorkerMessage {
  // ID of the worker this message belongs to.
  string worker_id = 1;

  // Time attached to the message.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 timestamp = 2;

  // Payload carried by the envelope; at most one variant is set.
  oneof message {
    WorkerRegistration registration = 3;
    WorkerHeartbeat heartbeat = 4;
    TaskRequest task_request = 5;
    TaskUpdate task_update = 6;
    TaskComplete task_complete = 7;
    WorkerShutdown shutdown = 8;
    TaskLogResponse task_log_response = 9;
  }
}
// AdminMessage is the envelope for every message sent from the admin to a
// worker.
message AdminMessage {
  // ID of the admin this message belongs to.
  string admin_id = 1;

  // Time attached to the message.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 timestamp = 2;

  // Payload carried by the envelope; at most one variant is set.
  oneof message {
    RegistrationResponse registration_response = 3;
    HeartbeatResponse heartbeat_response = 4;
    TaskAssignment task_assignment = 5;
    TaskCancellation task_cancellation = 6;
    AdminShutdown admin_shutdown = 7;
    TaskLogRequest task_log_request = 8;
  }
}
// WorkerRegistration is what a worker sends when it connects.
message WorkerRegistration {
  // ID the worker registers under.
  string worker_id = 1;

  // Address of the worker.
  // NOTE(review): format (e.g. host:port) is not stated here — confirm.
  string address = 2;

  // Capability names advertised by the worker; the set of valid values is
  // not defined in this file.
  repeated string capabilities = 3;

  // Maximum concurrency the worker reports for itself.
  int32 max_concurrent = 4;

  // Free-form string key/value pairs attached to the registration.
  map<string, string> metadata = 5;
}
// RegistrationResponse is the admin's confirmation of a worker registration.
message RegistrationResponse {
  // Success flag for the registration.
  bool success = 1;

  // Text accompanying the result.
  string message = 2;

  // Worker ID assigned in the response.
  // NOTE(review): whether this can differ from the requested ID is not
  // visible here — confirm.
  string assigned_worker_id = 3;
}
// WorkerHeartbeat is the periodic status report a worker sends.
message WorkerHeartbeat {
  // ID of the reporting worker.
  string worker_id = 1;

  // Worker status as a string; allowed values are not defined in this file.
  string status = 2;

  // Load the worker currently reports.
  int32 current_load = 3;

  // Maximum concurrency the worker reports.
  int32 max_concurrent = 4;

  // IDs of the worker's current tasks.
  repeated string current_task_ids = 5;

  // Count of completed tasks.
  int32 tasks_completed = 6;

  // Count of failed tasks.
  int32 tasks_failed = 7;

  // Worker uptime, in seconds.
  int64 uptime_seconds = 8;
}
// HeartbeatResponse is the acknowledgement for a WorkerHeartbeat.
message HeartbeatResponse {
  // Success flag for the heartbeat.
  bool success = 1;

  // Text accompanying the acknowledgement.
  string message = 2;
}
// TaskRequest is sent by a worker to ask for new tasks.
message TaskRequest {
  // ID of the requesting worker.
  string worker_id = 1;

  // Capability names the worker offers; valid values are not defined here.
  repeated string capabilities = 2;

  // Number of slots the worker reports as available.
  int32 available_slots = 3;
}
// TaskAssignment is sent by the admin to hand a task to a worker.
message TaskAssignment {
  // ID of the assigned task.
  string task_id = 1;

  // Task type as a string; valid values are not defined in this file.
  string task_type = 2;

  // Parameters for the task.
  TaskParams params = 3;

  // Task priority.
  // NOTE(review): ordering (higher vs. lower wins) is not stated — confirm.
  int32 priority = 4;

  // Creation time of the task.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 created_time = 5;

  // Free-form string key/value pairs attached to the assignment.
  map<string, string> metadata = 6;
}
// TaskParams holds the parameters of a task: common fields, unified
// source/target lists, and one typed variant per task kind.
message TaskParams {
  // ActiveTopology task ID for lifecycle management.
  string task_id = 1;

  // Primary volume ID for the task.
  uint32 volume_id = 2;

  // Collection name.
  string collection = 3;

  // Primary data center.
  string data_center = 4;

  // Primary rack.
  string rack = 5;

  // Original volume size in bytes, for tracking size changes.
  uint64 volume_size = 6;

  // Unified source and target arrays, shared by all task types.

  // Source locations (volume replicas, EC shards, etc.).
  repeated TaskSource sources = 7;

  // Target locations (destinations, new replicas, etc.).
  repeated TaskTarget targets = 8;

  // Typed task parameters; at most one variant is set.
  oneof task_params {
    VacuumTaskParams vacuum_params = 9;
    ErasureCodingTaskParams erasure_coding_params = 10;
    BalanceTaskParams balance_params = 11;
    ReplicationTaskParams replication_params = 12;
  }
}
// VacuumTaskParams carries the settings for a vacuum operation.
message VacuumTaskParams {
  // Minimum garbage ratio to trigger vacuum.
  double garbage_threshold = 1;

  // Force vacuum even if below threshold.
  bool force_vacuum = 2;

  // Number of files to process per batch.
  int32 batch_size = 3;

  // Working directory for temporary files.
  string working_dir = 4;

  // Verify file checksums during vacuum.
  bool verify_checksum = 5;
}
// ErasureCodingTaskParams carries the settings for an EC encoding operation.
message ErasureCodingTaskParams {
  // Estimated size per shard.
  // NOTE(review): unit is presumably bytes; not stated here — confirm.
  uint64 estimated_shard_size = 1;

  // Number of data shards (default: 10).
  int32 data_shards = 2;

  // Number of parity shards (default: 4).
  int32 parity_shards = 3;

  // Working directory for EC processing.
  string working_dir = 4;

  // Master server address.
  string master_client = 5;

  // Whether to clean up the source volume after EC.
  bool cleanup_source = 6;

  // Generation number for EC shards (0=default, >0=generational).
  uint32 generation = 7;
}
// TaskSource describes one source location, in a form shared by every task
// type.
message TaskSource {
  // Source server address.
  string node = 1;

  // Source disk ID.
  uint32 disk_id = 2;

  // Source rack, for tracking.
  string rack = 3;

  // Source data center, for tracking.
  string data_center = 4;

  // Volume ID (for volume operations).
  uint32 volume_id = 5;

  // Shard IDs (for EC shard operations).
  repeated uint32 shard_ids = 6;

  // Estimated size to be processed.
  // NOTE(review): unit is presumably bytes; not stated here — confirm.
  uint64 estimated_size = 7;

  // Generation number (for EC operations).
  uint32 generation = 8;
}
// TaskTarget describes one target location, in a form shared by every task
// type.
message TaskTarget {
  // Target server address.
  string node = 1;

  // Target disk ID.
  uint32 disk_id = 2;

  // Target rack, for tracking.
  string rack = 3;

  // Target data center, for tracking.
  string data_center = 4;

  // Volume ID (for volume operations).
  uint32 volume_id = 5;

  // Shard IDs (for EC shard operations).
  repeated uint32 shard_ids = 6;

  // Estimated size to be created.
  // NOTE(review): unit is presumably bytes; not stated here — confirm.
  uint64 estimated_size = 7;
}
// BalanceTaskParams carries the settings for a volume balancing operation.
message BalanceTaskParams {
  // Force the move even with conflicts.
  bool force_move = 1;

  // Operation timeout, in seconds.
  int32 timeout_seconds = 2;
}
// ReplicationTaskParams carries the settings for adding replicas.
message ReplicationTaskParams {
  // Target replica count.
  int32 replica_count = 1;

  // Verify replica consistency after creation.
  bool verify_consistency = 2;
}
// TaskUpdate is a progress report for a task.
message TaskUpdate {
  // ID of the task being reported on.
  string task_id = 1;

  // ID of the reporting worker.
  string worker_id = 2;

  // Task status as a string; allowed values are not defined in this file.
  string status = 3;

  // Task progress.
  // NOTE(review): scale (0-1 vs. 0-100) is not stated here — confirm.
  float progress = 4;

  // Text accompanying the update.
  string message = 5;

  // Free-form string key/value pairs attached to the update.
  map<string, string> metadata = 6;
}
// TaskComplete is the completion report for a task.
message TaskComplete {
  // ID of the completed task.
  string task_id = 1;

  // ID of the reporting worker.
  string worker_id = 2;

  // Success flag for the task.
  bool success = 3;

  // Error text for the task.
  string error_message = 4;

  // Completion time.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 completion_time = 5;

  // Free-form string key/value pairs describing the result.
  map<string, string> result_metadata = 6;
}
// TaskCancellation is sent by the admin to cancel a task.
message TaskCancellation {
  // ID of the task to cancel.
  string task_id = 1;

  // Reason text for the cancellation.
  string reason = 2;

  // Force flag for the cancellation.
  // NOTE(review): exact effect on the worker is not visible here — confirm.
  bool force = 3;
}
// WorkerShutdown tells the admin that a worker is shutting down.
message WorkerShutdown {
  // ID of the worker that is shutting down.
  string worker_id = 1;

  // Reason text for the shutdown.
  string reason = 2;

  // IDs of the worker's pending tasks.
  repeated string pending_task_ids = 3;
}
// AdminShutdown tells a worker that the admin is shutting down.
message AdminShutdown {
  // Reason text for the shutdown.
  string reason = 1;

  // Graceful-shutdown period, in seconds.
  int32 graceful_shutdown_seconds = 2;
}
// ========== Task Log Messages ==========
// TaskLogRequest asks for the logs of one specific task.
message TaskLogRequest {
  // ID of the task whose logs are requested.
  string task_id = 1;

  // Worker ID for the request.
  string worker_id = 2;

  // Include task metadata.
  bool include_metadata = 3;

  // Maximum number of log entries (0 = all).
  int32 max_entries = 4;

  // Filter by log level (INFO, WARNING, ERROR, DEBUG).
  string log_level = 5;

  // Unix timestamp for the start time filter.
  int64 start_time = 6;

  // Unix timestamp for the end time filter.
  int64 end_time = 7;
}
// TaskLogResponse carries a task's logs and metadata back to the requester.
message TaskLogResponse {
  // ID of the task the logs belong to.
  string task_id = 1;

  // Worker ID for the response.
  string worker_id = 2;

  // Success flag for the log request.
  bool success = 3;

  // Error text for the log request.
  string error_message = 4;

  // Metadata about the task's execution.
  TaskLogMetadata metadata = 5;

  // Log entries for the task.
  repeated TaskLogEntry log_entries = 6;
}
// TaskLogMetadata describes the execution of a task.
message TaskLogMetadata {
  // Task ID.
  string task_id = 1;

  // Task type as a string; valid values are not defined in this file.
  string task_type = 2;

  // Worker ID.
  string worker_id = 3;

  // Start and end times.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 start_time = 4;
  int64 end_time = 5;

  // Duration, in milliseconds.
  int64 duration_ms = 6;

  // Task status as a string; allowed values are not defined in this file.
  string status = 7;

  // Task progress.
  // NOTE(review): scale (0-1 vs. 0-100) is not stated here — confirm.
  float progress = 8;

  // Volume ID.
  uint32 volume_id = 9;

  // Server name or address.
  // NOTE(review): exact format is not stated here — confirm.
  string server = 10;

  // Collection name.
  string collection = 11;

  // Path of the log file.
  string log_file_path = 12;

  // Creation time.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 created_at = 13;

  // Free-form string key/value pairs.
  map<string, string> custom_data = 14;
}
// TaskLogEntry is a single log entry.
message TaskLogEntry {
  // Time of the entry.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 timestamp = 1;

  // Log level as a string; allowed values are not defined in this message.
  string level = 2;

  // Log text.
  string message = 3;

  // Free-form string key/value pairs attached to the entry.
  map<string, string> fields = 4;

  // Progress recorded with the entry.
  // NOTE(review): scale (0-1 vs. 0-100) is not stated here — confirm.
  float progress = 5;

  // Status recorded with the entry, as a string.
  string status = 6;
}
// ========== Maintenance Configuration Messages ==========
// MaintenanceConfig is the configuration of the maintenance system.
message MaintenanceConfig {
  // Enabled flag for the configuration.
  bool enabled = 1;

  // How often to scan for maintenance needs, in seconds.
  int32 scan_interval_seconds = 2;

  // Worker heartbeat timeout, in seconds.
  int32 worker_timeout_seconds = 3;

  // Individual task timeout, in seconds.
  int32 task_timeout_seconds = 4;

  // Delay between retries, in seconds.
  int32 retry_delay_seconds = 5;

  // Default max retries for tasks.
  int32 max_retries = 6;

  // How often to clean up old tasks, in seconds.
  int32 cleanup_interval_seconds = 7;

  // How long to keep completed/failed tasks, in seconds.
  int32 task_retention_seconds = 8;

  // Policies for maintenance operations.
  MaintenancePolicy policy = 9;
}
// MaintenancePolicy groups the policies applied to maintenance operations.
message MaintenancePolicy {
  // Task type -> policy mapping.
  map<string, TaskPolicy> task_policies = 1;

  // Overall concurrency limit across all task types.
  int32 global_max_concurrent = 2;

  // Default repeat interval in seconds, used if a task doesn't specify one.
  int32 default_repeat_interval_seconds = 3;

  // Default interval in seconds for periodic checks.
  int32 default_check_interval_seconds = 4;
}
// TaskPolicy is the configuration for one specific task type.
message TaskPolicy {
  // Enabled flag for this policy.
  bool enabled = 1;

  // Maximum concurrency for this policy.
  int32 max_concurrent = 2;

  // Seconds to wait before repeating.
  int32 repeat_interval_seconds = 3;

  // Seconds between checks.
  int32 check_interval_seconds = 4;

  // Typed task-specific configuration (replaces generic map); at most one
  // variant is set.
  oneof task_config {
    ErasureCodingTaskConfig erasure_coding_config = 5;
    EcVacuumTaskConfig ec_vacuum_config = 6;
  }
}
// Task-specific configuration messages
// ErasureCodingTaskConfig holds the EC-specific part of a task policy.
message ErasureCodingTaskConfig {
  // Minimum fullness ratio to trigger EC (0.0-1.0).
  double fullness_ratio = 1;

  // Minimum quiet time before EC, in seconds.
  int32 quiet_for_seconds = 2;

  // Minimum volume size for EC, in MB.
  int32 min_volume_size_mb = 3;

  // Only process volumes from specific collections.
  string collection_filter = 4;

  // Generation number for EC shards (0=default, >0=generational).
  uint32 generation = 5;
}
// EcVacuumTaskConfig holds the EC-vacuum-specific part of a task policy.
message EcVacuumTaskConfig {
  // Minimum deletion ratio to trigger vacuum (0.0-1.0).
  double deletion_threshold = 1;

  // Minimum age before considering vacuum (in seconds).
  int32 min_volume_age_seconds = 2;

  // Only vacuum EC volumes in this collection (empty = all).
  string collection_filter = 3;

  // Minimum original EC volume size to consider (in MB).
  int32 min_size_mb = 4;
}
// ========== Task Persistence Messages ==========
// MaintenanceTaskData is the complete state of a task, as persisted.
message MaintenanceTaskData {
  // Task ID.
  string id = 1;

  // Task type, priority and status, all carried as strings; the allowed
  // values are not defined in this file.
  string type = 2;
  string priority = 3;
  string status = 4;

  // Volume ID.
  uint32 volume_id = 5;

  // Server name or address.
  // NOTE(review): exact format is not stated here — confirm.
  string server = 6;

  // Collection name.
  string collection = 7;

  // Typed parameters of the task.
  TaskParams typed_params = 8;

  // Reason text for the task.
  string reason = 9;

  // Lifecycle times.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 created_at = 10;
  int64 scheduled_at = 11;
  int64 started_at = 12;
  int64 completed_at = 13;

  // Worker ID.
  string worker_id = 14;

  // Error text.
  string error = 15;

  // Task progress.
  // NOTE(review): scale (0-1 vs. 0-100) is not stated here — confirm.
  double progress = 16;

  // Retry count and maximum number of retries.
  int32 retry_count = 17;
  int32 max_retries = 18;

  // Enhanced fields for detailed task tracking.

  // Creator of the task, as a string.
  string created_by = 19;

  // Creation context, as a string.
  string creation_context = 20;

  // Worker assignment records for this task.
  repeated TaskAssignmentRecord assignment_history = 21;

  // Detailed reason text.
  string detailed_reason = 22;

  // Free-form string key/value tags.
  map<string, string> tags = 23;

  // Metrics describing the task's creation.
  TaskCreationMetrics creation_metrics = 24;
}
// TaskAssignmentRecord is one entry in a task's worker-assignment history.
message TaskAssignmentRecord {
  // ID of the worker.
  string worker_id = 1;

  // Address of the worker.
  string worker_address = 2;

  // Assignment time.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 assigned_at = 3;

  // Optional: when the worker was unassigned.
  int64 unassigned_at = 4;

  // Reason for assignment/unassignment.
  string reason = 5;
}
// TaskCreationMetrics records why and how a task was created.
message TaskCreationMetrics {
  // Name of the metric that triggered creation.
  string trigger_metric = 1;

  // Value that triggered creation.
  double metric_value = 2;

  // Threshold that was exceeded.
  double threshold = 3;

  // Volume health at creation time.
  VolumeHealthMetrics volume_metrics = 4;

  // Additional context data.
  map<string, string> additional_data = 5;
}
// VolumeHealthMetrics is a snapshot of volume state taken at task creation.
message VolumeHealthMetrics {
  // Total, used and garbage sizes.
  // NOTE(review): unit is presumably bytes; not stated here — confirm.
  uint64 total_size = 1;
  uint64 used_size = 2;
  uint64 garbage_size = 3;

  // Garbage ratio.
  // NOTE(review): presumably garbage_size relative to a total, in 0.0-1.0;
  // not stated here — confirm.
  double garbage_ratio = 4;

  // File count and deleted-file count.
  int32 file_count = 5;
  int32 deleted_file_count = 6;

  // Last-modified time.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 last_modified = 7;

  // Replica count.
  int32 replica_count = 8;

  // Flag marking the volume as an EC volume.
  bool is_ec_volume = 9;

  // Collection name.
  string collection = 10;
}
// TaskStateFile wraps persisted task data together with file-level metadata.
message TaskStateFile {
  // The persisted task state.
  MaintenanceTaskData task = 1;

  // Last-updated time.
  // NOTE(review): epoch and unit are not stated in this file — confirm.
  int64 last_updated = 2;

  // Admin version string.
  string admin_version = 3;
}
// GetMasterAddressesRequest is sent by a worker to obtain the master server
// addresses.
message GetMasterAddressesRequest {
  // Worker identification.
  string worker_id = 1;
}
// GetMasterAddressesResponse returns the master addresses to the worker.
message GetMasterAddressesResponse {
  // List of available master addresses.
  repeated string master_addresses = 1;

  // Primary master address (if applicable).
  string primary_master = 2;
}