syntax = "proto3";

package worker_pb;

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";

// NOTE(review): the map fields in this file were missing their <key, value>
// type parameters. They are restored as map<string, string>, except
// MaintenancePolicy.task_policies, which is map<string, TaskPolicy> per its
// "Task type -> policy mapping" comment. Confirm against the generated code
// before regenerating stubs.

// WorkerService provides bidirectional communication between admin and worker
service WorkerService {
  // WorkerStream maintains a bidirectional stream for worker communication
  rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);

  // GetMasterAddresses returns master server addresses for worker tasks
  rpc GetMasterAddresses(GetMasterAddressesRequest) returns (GetMasterAddressesResponse);
}

// WorkerMessage represents messages from worker to admin
message WorkerMessage {
  string worker_id = 1;
  // NOTE(review): time unit is not stated in this file - confirm with sender.
  int64 timestamp = 2;

  // Payload of the message; at most one of these fields is set.
  oneof message {
    WorkerRegistration registration = 3;
    WorkerHeartbeat heartbeat = 4;
    TaskRequest task_request = 5;
    TaskUpdate task_update = 6;
    TaskComplete task_complete = 7;
    WorkerShutdown shutdown = 8;
    TaskLogResponse task_log_response = 9;
  }
}

// AdminMessage represents messages from admin to worker
message AdminMessage {
  string admin_id = 1;
  // NOTE(review): time unit is not stated in this file - confirm with sender.
  int64 timestamp = 2;

  // Payload of the message; at most one of these fields is set.
  oneof message {
    RegistrationResponse registration_response = 3;
    HeartbeatResponse heartbeat_response = 4;
    TaskAssignment task_assignment = 5;
    TaskCancellation task_cancellation = 6;
    AdminShutdown admin_shutdown = 7;
    TaskLogRequest task_log_request = 8;
  }
}

// WorkerRegistration message when worker connects
message WorkerRegistration {
  string worker_id = 1;
  string address = 2;
  repeated string capabilities = 3;
  int32 max_concurrent = 4;
  map<string, string> metadata = 5;
}

// RegistrationResponse confirms worker registration
message RegistrationResponse {
  bool success = 1;
  string message = 2;
  string assigned_worker_id = 3;
}

// WorkerHeartbeat sent periodically by worker
message WorkerHeartbeat {
  string worker_id = 1;
  string status = 2;
  int32 current_load = 3;
  int32 max_concurrent = 4;
  repeated string current_task_ids = 5;
  int32 tasks_completed = 6;
  int32 tasks_failed = 7;
  int64 uptime_seconds = 8;
}

// HeartbeatResponse acknowledges heartbeat
message HeartbeatResponse {
  bool success = 1;
  string message = 2;
}

// TaskRequest from worker asking for new tasks
message TaskRequest {
  string worker_id = 1;
  repeated string capabilities = 2;
  int32 available_slots = 3;
}

// TaskAssignment from admin to worker
message TaskAssignment {
  string task_id = 1;
  string task_type = 2;
  TaskParams params = 3;
  int32 priority = 4;
  int64 created_time = 5;
  map<string, string> metadata = 6;
}

// TaskParams contains task-specific parameters with typed variants
message TaskParams {
  string task_id = 1;       // ActiveTopology task ID for lifecycle management
  uint32 volume_id = 2;     // Primary volume ID for the task
  string collection = 3;    // Collection name
  string data_center = 4;   // Primary data center
  string rack = 5;          // Primary rack
  uint64 volume_size = 6;   // Original volume size in bytes for tracking size changes

  // Unified source and target arrays for all task types
  repeated TaskSource sources = 7;  // Source locations (volume replicas, EC shards, etc.)
  repeated TaskTarget targets = 8;  // Target locations (destinations, new replicas, etc.)

  // Typed task parameters
  oneof task_params {
    VacuumTaskParams vacuum_params = 9;
    ErasureCodingTaskParams erasure_coding_params = 10;
    BalanceTaskParams balance_params = 11;
    ReplicationTaskParams replication_params = 12;
  }
}

// VacuumTaskParams for vacuum operations
message VacuumTaskParams {
  double garbage_threshold = 1;  // Minimum garbage ratio to trigger vacuum
  bool force_vacuum = 2;         // Force vacuum even if below threshold
  int32 batch_size = 3;          // Number of files to process per batch
  string working_dir = 4;        // Working directory for temporary files
  bool verify_checksum = 5;      // Verify file checksums during vacuum
}

// ErasureCodingTaskParams for EC encoding operations
message ErasureCodingTaskParams {
  uint64 estimated_shard_size = 1;  // Estimated size per shard
  int32 data_shards = 2;            // Number of data shards (default: 10)
  int32 parity_shards = 3;          // Number of parity shards (default: 4)
  string working_dir = 4;           // Working directory for EC processing
  string master_client = 5;         // Master server address
  bool cleanup_source = 6;          // Whether to cleanup source volume after EC
  uint32 generation = 7;            // Generation number for EC shards (0=default, >0=generational)
}

// TaskSource represents a unified source location for any task type
message TaskSource {
  string node = 1;                 // Source server address
  uint32 disk_id = 2;              // Source disk ID
  string rack = 3;                 // Source rack for tracking
  string data_center = 4;          // Source data center for tracking
  uint32 volume_id = 5;            // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6;   // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7;       // Estimated size to be processed
  uint32 generation = 8;           // Generation number (for EC operations)
}

// TaskTarget represents a unified target location for any task type
message TaskTarget {
  string node = 1;                 // Target server address
  uint32 disk_id = 2;              // Target disk ID
  string rack = 3;                 // Target rack for tracking
  string data_center = 4;          // Target data center for tracking
  uint32 volume_id = 5;            // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6;   // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7;       // Estimated size to be created
}

// BalanceTaskParams for volume balancing operations
message BalanceTaskParams {
  bool force_move = 1;        // Force move even with conflicts
  int32 timeout_seconds = 2;  // Operation timeout
}

// ReplicationTaskParams for adding replicas
message ReplicationTaskParams {
  int32 replica_count = 1;      // Target replica count
  bool verify_consistency = 2;  // Verify replica consistency after creation
}

// TaskUpdate reports task progress
message TaskUpdate {
  string task_id = 1;
  string worker_id = 2;
  string status = 3;
  // NOTE(review): progress scale (0-1 vs. 0-100) is not stated here - confirm.
  float progress = 4;
  string message = 5;
  map<string, string> metadata = 6;
}

// TaskComplete reports task completion
message TaskComplete {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  int64 completion_time = 5;
  map<string, string> result_metadata = 6;
}

// TaskCancellation from admin to cancel a task
message TaskCancellation {
  string task_id = 1;
  string reason = 2;
  bool force = 3;
}

// WorkerShutdown notifies admin that worker is shutting down
message WorkerShutdown {
  string worker_id = 1;
  string reason = 2;
  repeated string pending_task_ids = 3;
}

// AdminShutdown notifies worker that admin is shutting down
message AdminShutdown {
  string reason = 1;
  int32 graceful_shutdown_seconds = 2;
}

// ========== Task Log Messages ==========

// TaskLogRequest requests logs for a specific task
message TaskLogRequest {
  string task_id = 1;
  string worker_id = 2;
  bool include_metadata = 3;  // Include task metadata
  int32 max_entries = 4;      // Maximum number of log entries (0 = all)
  string log_level = 5;       // Filter by log level (INFO, WARNING, ERROR, DEBUG)
  int64 start_time = 6;       // Unix timestamp for start time filter
  int64 end_time = 7;         // Unix timestamp for end time filter
}

// TaskLogResponse returns task logs and metadata
message TaskLogResponse {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  TaskLogMetadata metadata = 5;
  repeated TaskLogEntry log_entries = 6;
}

// TaskLogMetadata contains metadata about task execution
message TaskLogMetadata {
  string task_id = 1;
  string task_type = 2;
  string worker_id = 3;
  int64 start_time = 4;
  int64 end_time = 5;
  int64 duration_ms = 6;
  string status = 7;
  float progress = 8;
  uint32 volume_id = 9;
  string server = 10;
  string collection = 11;
  string log_file_path = 12;
  int64 created_at = 13;
  map<string, string> custom_data = 14;
}

// TaskLogEntry represents a single log entry
message TaskLogEntry {
  int64 timestamp = 1;
  string level = 2;
  string message = 3;
  map<string, string> fields = 4;
  float progress = 5;
  string status = 6;
}

// ========== Maintenance Configuration Messages ==========

// MaintenanceConfig holds configuration for the maintenance system
message MaintenanceConfig {
  bool enabled = 1;
  int32 scan_interval_seconds = 2;     // How often to scan for maintenance needs
  int32 worker_timeout_seconds = 3;    // Worker heartbeat timeout
  int32 task_timeout_seconds = 4;      // Individual task timeout
  int32 retry_delay_seconds = 5;       // Delay between retries
  int32 max_retries = 6;               // Default max retries for tasks
  int32 cleanup_interval_seconds = 7;  // How often to clean up old tasks
  int32 task_retention_seconds = 8;    // How long to keep completed/failed tasks
  MaintenancePolicy policy = 9;
}

// MaintenancePolicy defines policies for maintenance operations
message MaintenancePolicy {
  map<string, TaskPolicy> task_policies = 1;  // Task type -> policy mapping
  int32 global_max_concurrent = 2;            // Overall limit across all task types
  int32 default_repeat_interval_seconds = 3;  // Default seconds if task doesn't specify
  int32 default_check_interval_seconds = 4;   // Default seconds for periodic checks
}

// TaskPolicy represents configuration for a specific task type
message TaskPolicy {
  bool enabled = 1;
  int32 max_concurrent = 2;
  int32 repeat_interval_seconds = 3;  // Seconds to wait before repeating
  int32 check_interval_seconds = 4;   // Seconds between checks

  // Typed task-specific configuration (replaces generic map)
  oneof task_config {
    ErasureCodingTaskConfig erasure_coding_config = 5;
    EcVacuumTaskConfig ec_vacuum_config = 6;
  }
}

// Task-specific configuration messages

// ErasureCodingTaskConfig contains EC-specific configuration
message ErasureCodingTaskConfig {
  double fullness_ratio = 1;     // Minimum fullness ratio to trigger EC (0.0-1.0)
  int32 quiet_for_seconds = 2;   // Minimum quiet time before EC
  int32 min_volume_size_mb = 3;  // Minimum volume size for EC
  string collection_filter = 4;  // Only process volumes from specific collections
  uint32 generation = 5;         // Generation number for EC shards (0=default, >0=generational)
}

// EcVacuumTaskConfig contains EC vacuum-specific configuration
message EcVacuumTaskConfig {
  double deletion_threshold = 1;     // Minimum deletion ratio to trigger vacuum (0.0-1.0)
  int32 min_volume_age_seconds = 2;  // Minimum age before considering vacuum (in seconds)
  string collection_filter = 3;      // Only vacuum EC volumes in this collection (empty = all)
  int32 min_size_mb = 4;             // Minimum original EC volume size to consider (in MB)
}

// ========== Task Persistence Messages ==========

// MaintenanceTaskData represents complete task state for persistence
message MaintenanceTaskData {
  string id = 1;
  string type = 2;
  string priority = 3;
  string status = 4;
  uint32 volume_id = 5;
  string server = 6;
  string collection = 7;
  TaskParams typed_params = 8;
  string reason = 9;
  int64 created_at = 10;
  int64 scheduled_at = 11;
  int64 started_at = 12;
  int64 completed_at = 13;
  string worker_id = 14;
  string error = 15;
  double progress = 16;
  int32 retry_count = 17;
  int32 max_retries = 18;

  // Enhanced fields for detailed task tracking
  string created_by = 19;
  string creation_context = 20;
  repeated TaskAssignmentRecord assignment_history = 21;
  string detailed_reason = 22;
  map<string, string> tags = 23;
  TaskCreationMetrics creation_metrics = 24;
}

// TaskAssignmentRecord tracks worker assignments for a task
message TaskAssignmentRecord {
  string worker_id = 1;
  string worker_address = 2;
  int64 assigned_at = 3;
  int64 unassigned_at = 4;  // Optional: when worker was unassigned
  string reason = 5;        // Reason for assignment/unassignment
}

// TaskCreationMetrics tracks why and how a task was created
message TaskCreationMetrics {
  string trigger_metric = 1;                // Name of metric that triggered creation
  double metric_value = 2;                  // Value that triggered creation
  double threshold = 3;                     // Threshold that was exceeded
  VolumeHealthMetrics volume_metrics = 4;   // Volume health at creation time
  map<string, string> additional_data = 5;  // Additional context data
}

// VolumeHealthMetrics captures volume state at task creation
message VolumeHealthMetrics {
  uint64 total_size = 1;
  uint64 used_size = 2;
  uint64 garbage_size = 3;
  double garbage_ratio = 4;
  int32 file_count = 5;
  int32 deleted_file_count = 6;
  int64 last_modified = 7;
  int32 replica_count = 8;
  bool is_ec_volume = 9;
  string collection = 10;
}

// TaskStateFile wraps task data with metadata for persistence
message TaskStateFile {
  MaintenanceTaskData task = 1;
  int64 last_updated = 2;
  string admin_version = 3;
}

// GetMasterAddressesRequest sent by worker to get master server addresses
message GetMasterAddressesRequest {
  string worker_id = 1;  // Worker identification
}

// GetMasterAddressesResponse returns master addresses to worker
message GetMasterAddressesResponse {
  repeated string master_addresses = 1;  // List of available master addresses
  string primary_master = 2;             // Primary master address (if applicable)
}