From 2941cdf95e641080701fd76d0218ab5a59a8bc84 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 24 Jul 2025 13:07:11 -0700 Subject: [PATCH] tasks are assigned --- docker/admin_integration/Dockerfile.admin | 13 +- docker/admin_integration/Dockerfile.worker | 8 +- docker/admin_integration/admin-entrypoint.sh | 302 ++++++++ .../admin-grpc-entrypoint.sh | 73 ++ docker/admin_integration/admin_grpc_server.go | 663 ++++++++++++++++++ .../docker-compose-ec-test.yml | 8 +- .../worker-grpc-entrypoint.sh | 67 ++ 7 files changed, 1123 insertions(+), 11 deletions(-) create mode 100644 docker/admin_integration/admin-grpc-entrypoint.sh create mode 100644 docker/admin_integration/admin_grpc_server.go create mode 100644 docker/admin_integration/worker-grpc-entrypoint.sh diff --git a/docker/admin_integration/Dockerfile.admin b/docker/admin_integration/Dockerfile.admin index 9aaf69960..ce1e8aac2 100644 --- a/docker/admin_integration/Dockerfile.admin +++ b/docker/admin_integration/Dockerfile.admin @@ -6,19 +6,22 @@ RUN apk --no-cache add curl ca-certificates go WORKDIR /root/ -# Copy admin server binary (if it exists) or create a simple one -COPY ./docker/admin_integration/admin-entrypoint.sh /entrypoint.sh +# Copy gRPC admin files +COPY ./docker/admin_integration/admin-grpc-entrypoint.sh /entrypoint.sh +COPY ./docker/admin_integration/admin_grpc_server.go /admin_grpc_server.go +COPY ./weed/pb/worker.proto /worker.proto RUN chmod +x /entrypoint.sh # Create directories RUN mkdir -p /data /config /work -# Expose admin port -EXPOSE 9900 +# Expose admin ports (HTTP and gRPC) +EXPOSE 9900 9901 # Set environment variables ENV MASTER_ADDRESS="master:9333" -ENV ADMIN_PORT="9900" +ENV ADMIN_PORT="9900" +ENV GRPC_PORT="9901" ENV SCAN_INTERVAL="30s" ENV WORKER_TIMEOUT="5m" ENV TASK_TIMEOUT="30m" diff --git a/docker/admin_integration/Dockerfile.worker b/docker/admin_integration/Dockerfile.worker index 1e8de5b2c..8155fb7ee 100644 --- a/docker/admin_integration/Dockerfile.worker +++ 
b/docker/admin_integration/Dockerfile.worker @@ -6,8 +6,10 @@ RUN apk --no-cache add curl ca-certificates go WORKDIR /root/ -# Copy worker entrypoint script -COPY ./docker/admin_integration/worker-entrypoint.sh /entrypoint.sh +# Copy gRPC worker files +COPY ./docker/admin_integration/worker-grpc-entrypoint.sh /entrypoint.sh +COPY ./docker/admin_integration/worker_grpc_client.go /worker_grpc_client.go +COPY ./weed/pb/worker.proto /worker.proto RUN chmod +x /entrypoint.sh # Create working directories @@ -17,7 +19,7 @@ RUN mkdir -p /work /tmp/ec_work EXPOSE 9001 # Set environment variables -ENV ADMIN_ADDRESS="admin:9900" +ENV ADMIN_GRPC_ADDRESS="admin:9901" ENV WORKER_ID="worker-1" ENV WORKER_ADDRESS="worker:9001" ENV CAPABILITIES="erasure_coding" diff --git a/docker/admin_integration/admin-entrypoint.sh b/docker/admin_integration/admin-entrypoint.sh index 9deb0076d..14f1d3cab 100755 --- a/docker/admin_integration/admin-entrypoint.sh +++ b/docker/admin_integration/admin-entrypoint.sh @@ -140,6 +140,304 @@ func (s *AdminServer) heartbeatHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) } +func (s *AdminServer) assignTaskHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var request struct { + WorkerID string `json:"worker_id"` + Capabilities []string `json:"capabilities"` + } + + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Find a pending task that matches worker capabilities + for i, task := range s.tasks { + if task.Status == "pending" { + // Assign task to worker + s.tasks[i].Status = "assigned" + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "task_id": task.ID, + "type": task.Type, + "volume_id": task.VolumeID, + "parameters": 
map[string]interface{}{ + "server": "volume1:8080", // Simplified assignment + }, + }) + + log.Printf("Assigned task %s to worker %s", task.ID, request.WorkerID) + return + } + } + + // No tasks available + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "no_tasks"}) +} + +func (s *AdminServer) taskProgressHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var progress struct { + TaskID string `json:"task_id"` + Progress float64 `json:"progress"` + Status string `json:"status"` + Message string `json:"message"` + } + + if err := json.NewDecoder(r.Body).Decode(&progress); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Update task progress + for i, task := range s.tasks { + if task.ID == progress.TaskID { + s.tasks[i].Progress = progress.Progress + s.tasks[i].Status = progress.Status + + log.Printf("Task %s: %.1f%% - %s", progress.TaskID, progress.Progress, progress.Message) + break + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "updated"}) +} + +func (s *AdminServer) webUIHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + + html := ` + + + SeaweedFS Admin - EC Task Monitor + + + + + +
+
+

🧪 SeaweedFS EC Task Monitor

+

Real-time Erasure Coding Task Management Dashboard

+
+ + + +
+
+
0
+
Total Tasks
+
+
+
0
+
Active Workers
+
+
+
0
+
Completed
+
+
+
--
+
Uptime
+
+
+ +
+
📋 EC Tasks
+
+ + + + + + + + + + + + + + +
Task IDTypeVolume IDStatusProgressCreated
Loading tasks...
+
+
+ +
+
โš™๏ธ Workers
+
+ + + + + + + + + + + + + +
Worker IDAddressStatusCapabilitiesLast Seen
Loading workers...
+
+
+ +
+ Last updated: -- | + Auto-refresh every 5 seconds +
+
+ + + +` + + fmt.Fprint(w, html) +} + func (s *AdminServer) detectVolumesForEC() { // Simulate volume detection logic // In real implementation, this would query the master for volume status @@ -199,7 +497,11 @@ func main() { http.HandleFunc("/health", server.healthHandler) http.HandleFunc("/status", server.statusHandler) http.HandleFunc("/register", server.registerWorkerHandler) + http.HandleFunc("/register-worker", server.registerWorkerHandler) // Worker compatibility http.HandleFunc("/heartbeat", server.heartbeatHandler) + http.HandleFunc("/assign-task", server.assignTaskHandler) + http.HandleFunc("/task-progress", server.taskProgressHandler) + http.HandleFunc("/", server.webUIHandler) // Web UI // Start volume detection server.detectVolumesForEC() diff --git a/docker/admin_integration/admin-grpc-entrypoint.sh b/docker/admin_integration/admin-grpc-entrypoint.sh new file mode 100644 index 000000000..928b7907a --- /dev/null +++ b/docker/admin_integration/admin-grpc-entrypoint.sh @@ -0,0 +1,73 @@ +#!/bin/sh + +set -e + +echo "Starting SeaweedFS Admin Server (gRPC)..." +echo "Master Address: $MASTER_ADDRESS" +echo "Admin HTTP Port: $ADMIN_PORT" +echo "Admin gRPC Port: $GRPC_PORT" + +# Wait for master to be ready +echo "Waiting for master to be ready..." +until wget --quiet --tries=1 --spider http://$MASTER_ADDRESS/cluster/status > /dev/null 2>&1; do + echo "Master not ready, waiting..." + sleep 5 +done +echo "Master is ready!" 
+ +# Install protobuf compiler and Go protobuf plugins +apk add --no-cache protobuf protobuf-dev + +# Set up Go environment +export GOPATH=/tmp/go +export PATH=$PATH:$GOPATH/bin +mkdir -p $GOPATH/src $GOPATH/bin $GOPATH/pkg + +# Install Go protobuf plugins globally first +export GOPATH=/tmp/go +mkdir -p $GOPATH +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest + +# Set up working directory for compilation +cd /tmp +mkdir -p admin-project +cd admin-project + +# Create a basic go.mod with required dependencies +cat > go.mod << 'EOF' +module admin-server + +go 1.24 + +require ( + google.golang.org/grpc v1.65.0 + google.golang.org/protobuf v1.34.2 +) +EOF + +go mod tidy + +# Add Go bin to PATH +export PATH=$PATH:$(go env GOPATH)/bin + +# Create directory structure for protobuf +mkdir -p worker_pb + +# Copy the admin server source and existing worker protobuf file +cp /admin_grpc_server.go . +cp /worker.proto . + +# Generate Go code from the existing worker protobuf +echo "Generating gRPC code from worker.proto..." +protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + worker.proto + +# Build and run the admin server +echo "Building admin server..." +go mod tidy +go build -o admin-server admin_grpc_server.go + +echo "Starting admin server..." 
+exec ./admin-server \ No newline at end of file diff --git a/docker/admin_integration/admin_grpc_server.go b/docker/admin_integration/admin_grpc_server.go new file mode 100644 index 000000000..d54005b0d --- /dev/null +++ b/docker/admin_integration/admin_grpc_server.go @@ -0,0 +1,663 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "sync" + "time" + + pb "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +type AdminServer struct { + pb.UnimplementedWorkerServiceServer + + // Server configuration + grpcPort string + httpPort string + masterAddr string + startTime time.Time + + // Data storage + workers map[string]*WorkerInfo + tasks map[string]*TaskInfo + taskQueue []string + + // Synchronization + mu sync.RWMutex + + // Active streams for workers + workerStreams map[string]pb.WorkerService_WorkerStreamServer + streamMu sync.RWMutex +} + +type WorkerInfo struct { + ID string + Address string + Capabilities []string + MaxConcurrent int32 + Status string + CurrentLoad int32 + LastSeen time.Time + TasksCompleted int32 + TasksFailed int32 + UptimeSeconds int64 + Stream pb.WorkerService_WorkerStreamServer +} + +type TaskInfo struct { + ID string + Type string + VolumeID uint32 + Status string + Progress float32 + AssignedTo string + Created time.Time + Updated time.Time + Server string + Collection string + DataCenter string + Rack string + Parameters map[string]string +} + +// gRPC service implementation +func (s *AdminServer) WorkerStream(stream pb.WorkerService_WorkerStreamServer) error { + var workerID string + + for { + req, err := stream.Recv() + if err == io.EOF { + log.Printf("Worker %s disconnected", workerID) + s.removeWorkerStream(workerID) + return nil + } + if err != nil { + log.Printf("Stream error from worker %s: %v", workerID, err) + s.removeWorkerStream(workerID) + return err + } + + // Handle different message types + switch msg := 
req.Message.(type) { + case *pb.WorkerMessage_Registration: + workerID = msg.Registration.WorkerId + s.handleWorkerRegistration(workerID, msg.Registration, stream) + + case *pb.WorkerMessage_Heartbeat: + s.handleWorkerHeartbeat(msg.Heartbeat, stream) + + case *pb.WorkerMessage_TaskRequest: + s.handleTaskRequest(msg.TaskRequest, stream) + + case *pb.WorkerMessage_TaskUpdate: + s.handleTaskUpdate(msg.TaskUpdate) + + case *pb.WorkerMessage_TaskComplete: + s.handleTaskComplete(msg.TaskComplete) + + case *pb.WorkerMessage_Shutdown: + log.Printf("Worker %s shutting down: %s", msg.Shutdown.WorkerId, msg.Shutdown.Reason) + s.removeWorkerStream(msg.Shutdown.WorkerId) + return nil + } + } +} + +func (s *AdminServer) handleWorkerRegistration(workerID string, reg *pb.WorkerRegistration, stream pb.WorkerService_WorkerStreamServer) { + s.mu.Lock() + defer s.mu.Unlock() + + worker := &WorkerInfo{ + ID: reg.WorkerId, + Address: reg.Address, + Capabilities: reg.Capabilities, + MaxConcurrent: reg.MaxConcurrent, + Status: "active", + CurrentLoad: 0, + LastSeen: time.Now(), + TasksCompleted: 0, + TasksFailed: 0, + UptimeSeconds: 0, + Stream: stream, + } + + s.workers[reg.WorkerId] = worker + s.addWorkerStream(reg.WorkerId, stream) + + log.Printf("Registered worker %s with capabilities: %v", reg.WorkerId, reg.Capabilities) + + // Send registration response + response := &pb.AdminMessage{ + AdminId: "admin-server", + Timestamp: time.Now().Unix(), + Message: &pb.AdminMessage_RegistrationResponse{ + RegistrationResponse: &pb.RegistrationResponse{ + Success: true, + Message: "Worker registered successfully", + AssignedWorkerId: reg.WorkerId, + }, + }, + } + + if err := stream.Send(response); err != nil { + log.Printf("Failed to send registration response to worker %s: %v", reg.WorkerId, err) + } +} + +func (s *AdminServer) handleWorkerHeartbeat(heartbeat *pb.WorkerHeartbeat, stream pb.WorkerService_WorkerStreamServer) { + s.mu.Lock() + defer s.mu.Unlock() + + if worker, exists := 
s.workers[heartbeat.WorkerId]; exists { + worker.Status = heartbeat.Status + worker.CurrentLoad = heartbeat.CurrentLoad + worker.LastSeen = time.Now() + worker.TasksCompleted = heartbeat.TasksCompleted + worker.TasksFailed = heartbeat.TasksFailed + worker.UptimeSeconds = heartbeat.UptimeSeconds + + log.Printf("Heartbeat from worker %s: status=%s, load=%d/%d", + heartbeat.WorkerId, heartbeat.Status, heartbeat.CurrentLoad, heartbeat.MaxConcurrent) + } + + // Send heartbeat response + response := &pb.AdminMessage{ + AdminId: "admin-server", + Timestamp: time.Now().Unix(), + Message: &pb.AdminMessage_HeartbeatResponse{ + HeartbeatResponse: &pb.HeartbeatResponse{ + Success: true, + Message: "Heartbeat received", + }, + }, + } + + if err := stream.Send(response); err != nil { + log.Printf("Failed to send heartbeat response to worker %s: %v", heartbeat.WorkerId, err) + } +} + +func (s *AdminServer) handleTaskRequest(taskReq *pb.TaskRequest, stream pb.WorkerService_WorkerStreamServer) { + s.mu.Lock() + defer s.mu.Unlock() + + // Find a pending task that matches worker capabilities + for taskID, task := range s.tasks { + if task.Status == "pending" { + // Check if worker has required capability + hasCapability := false + for _, capability := range taskReq.Capabilities { + if capability == task.Type { + hasCapability = true + break + } + } + + if hasCapability && taskReq.AvailableSlots > 0 { + // Assign task to worker + task.Status = "assigned" + task.AssignedTo = taskReq.WorkerId + task.Updated = time.Now() + + log.Printf("Assigned task %s (volume %d) to worker %s", taskID, task.VolumeID, taskReq.WorkerId) + + // Send task assignment + response := &pb.AdminMessage{ + AdminId: "admin-server", + Timestamp: time.Now().Unix(), + Message: &pb.AdminMessage_TaskAssignment{ + TaskAssignment: &pb.TaskAssignment{ + TaskId: taskID, + TaskType: task.Type, + Priority: 1, + CreatedTime: task.Created.Unix(), + Params: &pb.TaskParams{ + VolumeId: task.VolumeID, + Server: task.Server, + 
Collection: task.Collection, + DataCenter: task.DataCenter, + Rack: task.Rack, + Parameters: task.Parameters, + }, + }, + }, + } + + if err := stream.Send(response); err != nil { + log.Printf("Failed to send task assignment to worker %s: %v", taskReq.WorkerId, err) + } + return + } + } + } + + log.Printf("No suitable tasks available for worker %s", taskReq.WorkerId) +} + +func (s *AdminServer) handleTaskUpdate(update *pb.TaskUpdate) { + s.mu.Lock() + defer s.mu.Unlock() + + if task, exists := s.tasks[update.TaskId]; exists { + task.Progress = update.Progress + task.Status = update.Status + task.Updated = time.Now() + + log.Printf("Task %s progress: %.1f%% - %s", update.TaskId, update.Progress, update.Message) + } +} + +func (s *AdminServer) handleTaskComplete(complete *pb.TaskComplete) { + s.mu.Lock() + defer s.mu.Unlock() + + if task, exists := s.tasks[complete.TaskId]; exists { + if complete.Success { + task.Status = "completed" + task.Progress = 100.0 + } else { + task.Status = "failed" + } + task.Updated = time.Now() + + // Update worker stats + if worker, workerExists := s.workers[complete.WorkerId]; workerExists { + if complete.Success { + worker.TasksCompleted++ + } else { + worker.TasksFailed++ + } + if worker.CurrentLoad > 0 { + worker.CurrentLoad-- + } + } + + log.Printf("Task %s completed by worker %s: success=%v", complete.TaskId, complete.WorkerId, complete.Success) + } +} + +func (s *AdminServer) addWorkerStream(workerID string, stream pb.WorkerService_WorkerStreamServer) { + s.streamMu.Lock() + defer s.streamMu.Unlock() + s.workerStreams[workerID] = stream +} + +func (s *AdminServer) removeWorkerStream(workerID string) { + s.streamMu.Lock() + defer s.streamMu.Unlock() + delete(s.workerStreams, workerID) + + s.mu.Lock() + defer s.mu.Unlock() + delete(s.workers, workerID) +} + +// HTTP handlers for web UI +func (s *AdminServer) statusHandler(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + defer s.mu.RUnlock() + + // Convert internal data to 
JSON-friendly format + taskList := make([]map[string]interface{}, 0, len(s.tasks)) + for _, task := range s.tasks { + taskList = append(taskList, map[string]interface{}{ + "id": task.ID, + "type": task.Type, + "volume_id": task.VolumeID, + "status": task.Status, + "progress": task.Progress, + "created": task.Created, + }) + } + + workerList := make([]map[string]interface{}, 0, len(s.workers)) + for _, worker := range s.workers { + workerList = append(workerList, map[string]interface{}{ + "id": worker.ID, + "address": worker.Address, + "capabilities": worker.Capabilities, + "status": worker.Status, + "last_seen": worker.LastSeen, + }) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "admin_server": "running", + "master_addr": s.masterAddr, + "tasks": taskList, + "workers": workerList, + "uptime": time.Since(s.startTime).String(), + }) +} + +func (s *AdminServer) healthHandler(w http.ResponseWriter, r *http.Request) { + s.mu.RLock() + defer s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "healthy", + "uptime": time.Since(s.startTime).String(), + "tasks": len(s.tasks), + "workers": len(s.workers), + }) +} + +func (s *AdminServer) webUIHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + + html := ` + + + SeaweedFS Admin - EC Task Monitor + + + + + +
+
+

🧪 SeaweedFS EC Task Monitor + gRPC Streaming + worker.proto

+

Real-time Erasure Coding Task Management Dashboard

+
+ + + +
+
+
0
+
Total Tasks
+
+
+
0
+
Active Workers
+
+
+
0
+
Completed
+
+
+
--
+
Uptime
+
+
+ +
+
📋 EC Tasks
+
+ + + + + + + + + + + + + + +
Task IDTypeVolume IDStatusProgressCreated
Loading tasks...
+
+
+ +
+
โš™๏ธ Workers (via gRPC Streaming)
+
+ + + + + + + + + + + + + +
Worker IDAddressStatusCapabilitiesLast Seen
Loading workers...
+
+
+
+ + + +` + + fmt.Fprint(w, html) +} + +// Task detection and creation +func (s *AdminServer) detectVolumesForEC() { + ticker := time.NewTicker(30 * time.Second) + go func() { + for range ticker.C { + s.mu.Lock() + + log.Println("Scanning for volumes requiring EC...") + + // Simulate volume detection - in real implementation, query master + if len(s.tasks) < 5 { // Don't create too many tasks + taskID := fmt.Sprintf("ec-task-%d", time.Now().Unix()) + volumeID := uint32(1000 + len(s.tasks)) + + task := &TaskInfo{ + ID: taskID, + Type: "erasure_coding", + VolumeID: volumeID, + Status: "pending", + Progress: 0.0, + Created: time.Now(), + Updated: time.Now(), + Server: "volume1:8080", // Simplified + Collection: "", + DataCenter: "dc1", + Rack: "rack1", + Parameters: map[string]string{ + "data_shards": "10", + "parity_shards": "4", + }, + } + + s.tasks[taskID] = task + log.Printf("Created EC task %s for volume %d", taskID, volumeID) + } + + s.mu.Unlock() + } + }() +} + +func main() { + grpcPort := os.Getenv("GRPC_PORT") + if grpcPort == "" { + grpcPort = "9901" + } + + httpPort := os.Getenv("ADMIN_PORT") + if httpPort == "" { + httpPort = "9900" + } + + masterAddr := os.Getenv("MASTER_ADDRESS") + if masterAddr == "" { + masterAddr = "master:9333" + } + + server := &AdminServer{ + grpcPort: grpcPort, + httpPort: httpPort, + masterAddr: masterAddr, + startTime: time.Now(), + workers: make(map[string]*WorkerInfo), + tasks: make(map[string]*TaskInfo), + taskQueue: make([]string, 0), + workerStreams: make(map[string]pb.WorkerService_WorkerStreamServer), + } + + // Start gRPC server + go func() { + lis, err := net.Listen("tcp", ":"+grpcPort) + if err != nil { + log.Fatalf("Failed to listen on gRPC port %s: %v", grpcPort, err) + } + + grpcServer := grpc.NewServer() + pb.RegisterWorkerServiceServer(grpcServer, server) + reflection.Register(grpcServer) + + log.Printf("gRPC server starting on port %s", grpcPort) + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("Failed 
to serve gRPC: %v", err) + } + }() + + // Start HTTP server for web UI + go func() { + http.HandleFunc("/", server.webUIHandler) + http.HandleFunc("/status", server.statusHandler) + http.HandleFunc("/health", server.healthHandler) + + log.Printf("HTTP server starting on port %s", httpPort) + if err := http.ListenAndServe(":"+httpPort, nil); err != nil { + log.Fatalf("Failed to serve HTTP: %v", err) + } + }() + + // Start task detection + server.detectVolumesForEC() + + log.Printf("Admin server started - gRPC port: %s, HTTP port: %s, Master: %s", grpcPort, httpPort, masterAddr) + + // Keep the main goroutine running + select {} +} diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml index da1ab5603..de9073219 100644 --- a/docker/admin_integration/docker-compose-ec-test.yml +++ b/docker/admin_integration/docker-compose-ec-test.yml @@ -227,9 +227,11 @@ services: container_name: seaweed-admin ports: - "9900:9900" + - "9901:9901" environment: - MASTER_ADDRESS=master:9333 - ADMIN_PORT=9900 + - GRPC_PORT=9901 - SCAN_INTERVAL=30s - WORKER_TIMEOUT=5m - TASK_TIMEOUT=30m @@ -258,7 +260,7 @@ services: dockerfile: docker/admin_integration/Dockerfile.worker container_name: seaweed-worker1 environment: - - ADMIN_ADDRESS=admin:9900 + - ADMIN_GRPC_ADDRESS=admin:9901 - WORKER_ID=worker-1 - WORKER_ADDRESS=worker1:9001 - CAPABILITIES=erasure_coding @@ -284,7 +286,7 @@ services: dockerfile: docker/admin_integration/Dockerfile.worker container_name: seaweed-worker2 environment: - - ADMIN_ADDRESS=admin:9900 + - ADMIN_GRPC_ADDRESS=admin:9901 - WORKER_ID=worker-2 - WORKER_ADDRESS=worker2:9001 - CAPABILITIES=erasure_coding,vacuum @@ -310,7 +312,7 @@ services: dockerfile: docker/admin_integration/Dockerfile.worker container_name: seaweed-worker3 environment: - - ADMIN_ADDRESS=admin:9900 + - ADMIN_GRPC_ADDRESS=admin:9901 - WORKER_ID=worker-3 - WORKER_ADDRESS=worker3:9001 - CAPABILITIES=erasure_coding,vacuum diff --git 
a/docker/admin_integration/worker-grpc-entrypoint.sh b/docker/admin_integration/worker-grpc-entrypoint.sh new file mode 100644 index 000000000..a92cac189 --- /dev/null +++ b/docker/admin_integration/worker-grpc-entrypoint.sh @@ -0,0 +1,67 @@ +#!/bin/sh + +set -e + +echo "Starting SeaweedFS EC Worker (gRPC)..." +echo "Worker ID: $WORKER_ID" +echo "Admin gRPC Address: $ADMIN_GRPC_ADDRESS" + +# Wait for admin to be ready +echo "Waiting for admin to be ready..." +until curl -f http://admin:9900/health > /dev/null 2>&1; do + echo "Admin not ready, waiting..." + sleep 5 +done +echo "Admin is ready!" + +# Install protobuf compiler and Go protobuf plugins +apk add --no-cache protobuf protobuf-dev + +# Set up Go environment +export GOPATH=/tmp/go +export PATH=$PATH:$GOPATH/bin +mkdir -p $GOPATH/src $GOPATH/bin $GOPATH/pkg + +# Install Go protobuf plugins +cd /tmp +go mod init worker-client + +# Create a basic go.mod with required dependencies +cat > go.mod << 'EOF' +module worker-client + +go 1.24 + +require ( + google.golang.org/grpc v1.65.0 + google.golang.org/protobuf v1.34.2 +) +EOF + +go mod tidy +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest + +# Add Go bin to PATH +export PATH=$PATH:$(go env GOPATH)/bin + +# Create directory structure for protobuf +mkdir -p worker_pb + +# Copy the worker client source and existing worker protobuf file +cp /worker_grpc_client.go . +cp /worker.proto . + +# Generate Go code from the existing worker protobuf +echo "Generating gRPC code from worker.proto..." +protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + worker.proto + +# Build and run the worker +echo "Building worker..." +go mod tidy +go build -o worker-client worker_grpc_client.go + +echo "Starting worker..." +exec ./worker-client \ No newline at end of file