diff --git a/docker/admin_integration/Dockerfile.admin b/docker/admin_integration/Dockerfile.admin
index 9aaf69960..ce1e8aac2 100644
--- a/docker/admin_integration/Dockerfile.admin
+++ b/docker/admin_integration/Dockerfile.admin
@@ -6,19 +6,22 @@ RUN apk --no-cache add curl ca-certificates go
WORKDIR /root/
-# Copy admin server binary (if it exists) or create a simple one
-COPY ./docker/admin_integration/admin-entrypoint.sh /entrypoint.sh
+# Copy gRPC admin files
+COPY ./docker/admin_integration/admin-grpc-entrypoint.sh /entrypoint.sh
+COPY ./docker/admin_integration/admin_grpc_server.go /admin_grpc_server.go
+COPY ./weed/pb/worker.proto /worker.proto
RUN chmod +x /entrypoint.sh
# Create directories
RUN mkdir -p /data /config /work
-# Expose admin port
-EXPOSE 9900
+# Expose admin ports (HTTP and gRPC)
+EXPOSE 9900 9901
# Set environment variables
ENV MASTER_ADDRESS="master:9333"
ENV ADMIN_PORT="9900"
+ENV GRPC_PORT="9901"
ENV SCAN_INTERVAL="30s"
ENV WORKER_TIMEOUT="5m"
ENV TASK_TIMEOUT="30m"
diff --git a/docker/admin_integration/Dockerfile.worker b/docker/admin_integration/Dockerfile.worker
index 1e8de5b2c..8155fb7ee 100644
--- a/docker/admin_integration/Dockerfile.worker
+++ b/docker/admin_integration/Dockerfile.worker
@@ -6,8 +6,10 @@ RUN apk --no-cache add curl ca-certificates go
WORKDIR /root/
-# Copy worker entrypoint script
-COPY ./docker/admin_integration/worker-entrypoint.sh /entrypoint.sh
+# Copy gRPC worker files
+COPY ./docker/admin_integration/worker-grpc-entrypoint.sh /entrypoint.sh
+COPY ./docker/admin_integration/worker_grpc_client.go /worker_grpc_client.go
+COPY ./weed/pb/worker.proto /worker.proto
RUN chmod +x /entrypoint.sh
# Create working directories
@@ -17,7 +19,7 @@ RUN mkdir -p /work /tmp/ec_work
EXPOSE 9001
# Set environment variables
-ENV ADMIN_ADDRESS="admin:9900"
+ENV ADMIN_GRPC_ADDRESS="admin:9901"
ENV WORKER_ID="worker-1"
ENV WORKER_ADDRESS="worker:9001"
ENV CAPABILITIES="erasure_coding"
diff --git a/docker/admin_integration/admin-entrypoint.sh b/docker/admin_integration/admin-entrypoint.sh
index 9deb0076d..14f1d3cab 100755
--- a/docker/admin_integration/admin-entrypoint.sh
+++ b/docker/admin_integration/admin-entrypoint.sh
@@ -140,6 +140,304 @@ func (s *AdminServer) heartbeatHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
+func (s *AdminServer) assignTaskHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ var request struct {
+ WorkerID string `json:"worker_id"`
+ Capabilities []string `json:"capabilities"`
+ }
+
+ if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
+ http.Error(w, "Invalid JSON", http.StatusBadRequest)
+ return
+ }
+
+ // Find a pending task that matches worker capabilities
+ for i, task := range s.tasks {
+ if task.Status == "pending" {
+ // Assign task to worker
+ s.tasks[i].Status = "assigned"
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]interface{}{
+ "task_id": task.ID,
+ "type": task.Type,
+ "volume_id": task.VolumeID,
+ "parameters": map[string]interface{}{
+ "server": "volume1:8080", // Simplified assignment
+ },
+ })
+
+ log.Printf("Assigned task %s to worker %s", task.ID, request.WorkerID)
+ return
+ }
+ }
+
+ // No tasks available
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]string{"status": "no_tasks"})
+}
+
+func (s *AdminServer) taskProgressHandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ var progress struct {
+ TaskID string `json:"task_id"`
+ Progress float64 `json:"progress"`
+ Status string `json:"status"`
+ Message string `json:"message"`
+ }
+
+ if err := json.NewDecoder(r.Body).Decode(&progress); err != nil {
+ http.Error(w, "Invalid JSON", http.StatusBadRequest)
+ return
+ }
+
+ // Update task progress
+ for i, task := range s.tasks {
+ if task.ID == progress.TaskID {
+ s.tasks[i].Progress = progress.Progress
+ s.tasks[i].Status = progress.Status
+
+ log.Printf("Task %s: %.1f%% - %s", progress.TaskID, progress.Progress, progress.Message)
+ break
+ }
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]string{"status": "updated"})
+}
+
+func (s *AdminServer) webUIHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/html")
+
+ html := `
+
+
+ SeaweedFS Admin - EC Task Monitor
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ | Task ID |
+ Type |
+ Volume ID |
+ Status |
+ Progress |
+ Created |
+
+
+
+ | Loading tasks... |
+
+
+
+
+
+
+
+
+
+
+
+ | Worker ID |
+ Address |
+ Status |
+ Capabilities |
+ Last Seen |
+
+
+
+ | Loading workers... |
+
+
+
+
+
+
+ Last updated: -- |
+ Auto-refresh every 5 seconds
+
+
+
+
+
+`
+
+ fmt.Fprint(w, html)
+}
+
func (s *AdminServer) detectVolumesForEC() {
// Simulate volume detection logic
// In real implementation, this would query the master for volume status
@@ -199,7 +497,11 @@ func main() {
http.HandleFunc("/health", server.healthHandler)
http.HandleFunc("/status", server.statusHandler)
http.HandleFunc("/register", server.registerWorkerHandler)
+ http.HandleFunc("/register-worker", server.registerWorkerHandler) // Worker compatibility
http.HandleFunc("/heartbeat", server.heartbeatHandler)
+ http.HandleFunc("/assign-task", server.assignTaskHandler)
+ http.HandleFunc("/task-progress", server.taskProgressHandler)
+ http.HandleFunc("/", server.webUIHandler) // Web UI
// Start volume detection
server.detectVolumesForEC()
diff --git a/docker/admin_integration/admin-grpc-entrypoint.sh b/docker/admin_integration/admin-grpc-entrypoint.sh
new file mode 100644
index 000000000..928b7907a
--- /dev/null
+++ b/docker/admin_integration/admin-grpc-entrypoint.sh
@@ -0,0 +1,73 @@
+#!/bin/sh
+
+set -e
+
+echo "Starting SeaweedFS Admin Server (gRPC)..."
+echo "Master Address: $MASTER_ADDRESS"
+echo "Admin HTTP Port: $ADMIN_PORT"
+echo "Admin gRPC Port: $GRPC_PORT"
+
+# Wait for master to be ready
+echo "Waiting for master to be ready..."
+until wget --quiet --tries=1 --spider http://$MASTER_ADDRESS/cluster/status > /dev/null 2>&1; do
+ echo "Master not ready, waiting..."
+ sleep 5
+done
+echo "Master is ready!"
+
+# Install protobuf compiler and Go protobuf plugins
+apk add --no-cache protobuf protobuf-dev
+
+# Set up Go environment
+export GOPATH=/tmp/go
+export PATH=$PATH:$GOPATH/bin
+mkdir -p $GOPATH/src $GOPATH/bin $GOPATH/pkg
+
+# Install Go protobuf plugins globally first
+# NOTE: GOPATH was already exported and its directories created above; the
+# installs below place protoc-gen-go and protoc-gen-go-grpc into $GOPATH/bin.
+go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
+go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+# Set up working directory for compilation
+cd /tmp
+mkdir -p admin-project
+cd admin-project
+
+# Create a basic go.mod with required dependencies
+cat > go.mod << 'EOF'
+module admin-server
+
+go 1.24
+
+require (
+ google.golang.org/grpc v1.65.0
+ google.golang.org/protobuf v1.34.2
+)
+EOF
+
+go mod tidy
+
+# Add Go bin to PATH
+export PATH=$PATH:$(go env GOPATH)/bin
+
+# Create directory structure for protobuf
+mkdir -p worker_pb
+
+# Copy the admin server source and existing worker protobuf file
+cp /admin_grpc_server.go .
+cp /worker.proto .
+
+# Generate Go code from the existing worker protobuf
+echo "Generating gRPC code from worker.proto..."
+protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ worker.proto
+
+# Build and run the admin server
+echo "Building admin server..."
+go mod tidy
+go build -o admin-server admin_grpc_server.go
+
+echo "Starting admin server..."
+exec ./admin-server
diff --git a/docker/admin_integration/admin_grpc_server.go b/docker/admin_integration/admin_grpc_server.go
new file mode 100644
index 000000000..d54005b0d
--- /dev/null
+++ b/docker/admin_integration/admin_grpc_server.go
@@ -0,0 +1,663 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "sync"
+ "time"
+
+ pb "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+type AdminServer struct {
+ pb.UnimplementedWorkerServiceServer
+
+ // Server configuration
+ grpcPort string
+ httpPort string
+ masterAddr string
+ startTime time.Time
+
+ // Data storage
+ workers map[string]*WorkerInfo
+ tasks map[string]*TaskInfo
+ taskQueue []string
+
+ // Synchronization
+ mu sync.RWMutex
+
+ // Active streams for workers
+ workerStreams map[string]pb.WorkerService_WorkerStreamServer
+ streamMu sync.RWMutex
+}
+
+type WorkerInfo struct {
+ ID string
+ Address string
+ Capabilities []string
+ MaxConcurrent int32
+ Status string
+ CurrentLoad int32
+ LastSeen time.Time
+ TasksCompleted int32
+ TasksFailed int32
+ UptimeSeconds int64
+ Stream pb.WorkerService_WorkerStreamServer
+}
+
+type TaskInfo struct {
+ ID string
+ Type string
+ VolumeID uint32
+ Status string
+ Progress float32
+ AssignedTo string
+ Created time.Time
+ Updated time.Time
+ Server string
+ Collection string
+ DataCenter string
+ Rack string
+ Parameters map[string]string
+}
+
+// gRPC service implementation
+func (s *AdminServer) WorkerStream(stream pb.WorkerService_WorkerStreamServer) error {
+ var workerID string
+
+ for {
+ req, err := stream.Recv()
+ if err == io.EOF {
+ log.Printf("Worker %s disconnected", workerID)
+ s.removeWorkerStream(workerID)
+ return nil
+ }
+ if err != nil {
+ log.Printf("Stream error from worker %s: %v", workerID, err)
+ s.removeWorkerStream(workerID)
+ return err
+ }
+
+ // Handle different message types
+ switch msg := req.Message.(type) {
+ case *pb.WorkerMessage_Registration:
+ workerID = msg.Registration.WorkerId
+ s.handleWorkerRegistration(workerID, msg.Registration, stream)
+
+ case *pb.WorkerMessage_Heartbeat:
+ s.handleWorkerHeartbeat(msg.Heartbeat, stream)
+
+ case *pb.WorkerMessage_TaskRequest:
+ s.handleTaskRequest(msg.TaskRequest, stream)
+
+ case *pb.WorkerMessage_TaskUpdate:
+ s.handleTaskUpdate(msg.TaskUpdate)
+
+ case *pb.WorkerMessage_TaskComplete:
+ s.handleTaskComplete(msg.TaskComplete)
+
+ case *pb.WorkerMessage_Shutdown:
+ log.Printf("Worker %s shutting down: %s", msg.Shutdown.WorkerId, msg.Shutdown.Reason)
+ s.removeWorkerStream(msg.Shutdown.WorkerId)
+ return nil
+ }
+ }
+}
+
+func (s *AdminServer) handleWorkerRegistration(workerID string, reg *pb.WorkerRegistration, stream pb.WorkerService_WorkerStreamServer) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ worker := &WorkerInfo{
+ ID: reg.WorkerId,
+ Address: reg.Address,
+ Capabilities: reg.Capabilities,
+ MaxConcurrent: reg.MaxConcurrent,
+ Status: "active",
+ CurrentLoad: 0,
+ LastSeen: time.Now(),
+ TasksCompleted: 0,
+ TasksFailed: 0,
+ UptimeSeconds: 0,
+ Stream: stream,
+ }
+
+ s.workers[reg.WorkerId] = worker
+ s.addWorkerStream(reg.WorkerId, stream)
+
+ log.Printf("Registered worker %s with capabilities: %v", reg.WorkerId, reg.Capabilities)
+
+ // Send registration response
+ response := &pb.AdminMessage{
+ AdminId: "admin-server",
+ Timestamp: time.Now().Unix(),
+ Message: &pb.AdminMessage_RegistrationResponse{
+ RegistrationResponse: &pb.RegistrationResponse{
+ Success: true,
+ Message: "Worker registered successfully",
+ AssignedWorkerId: reg.WorkerId,
+ },
+ },
+ }
+
+ if err := stream.Send(response); err != nil {
+ log.Printf("Failed to send registration response to worker %s: %v", reg.WorkerId, err)
+ }
+}
+
+func (s *AdminServer) handleWorkerHeartbeat(heartbeat *pb.WorkerHeartbeat, stream pb.WorkerService_WorkerStreamServer) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if worker, exists := s.workers[heartbeat.WorkerId]; exists {
+ worker.Status = heartbeat.Status
+ worker.CurrentLoad = heartbeat.CurrentLoad
+ worker.LastSeen = time.Now()
+ worker.TasksCompleted = heartbeat.TasksCompleted
+ worker.TasksFailed = heartbeat.TasksFailed
+ worker.UptimeSeconds = heartbeat.UptimeSeconds
+
+ log.Printf("Heartbeat from worker %s: status=%s, load=%d/%d",
+ heartbeat.WorkerId, heartbeat.Status, heartbeat.CurrentLoad, heartbeat.MaxConcurrent)
+ }
+
+ // Send heartbeat response
+ response := &pb.AdminMessage{
+ AdminId: "admin-server",
+ Timestamp: time.Now().Unix(),
+ Message: &pb.AdminMessage_HeartbeatResponse{
+ HeartbeatResponse: &pb.HeartbeatResponse{
+ Success: true,
+ Message: "Heartbeat received",
+ },
+ },
+ }
+
+ if err := stream.Send(response); err != nil {
+ log.Printf("Failed to send heartbeat response to worker %s: %v", heartbeat.WorkerId, err)
+ }
+}
+
+func (s *AdminServer) handleTaskRequest(taskReq *pb.TaskRequest, stream pb.WorkerService_WorkerStreamServer) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Find a pending task that matches worker capabilities
+ for taskID, task := range s.tasks {
+ if task.Status == "pending" {
+ // Check if worker has required capability
+ hasCapability := false
+ for _, capability := range taskReq.Capabilities {
+ if capability == task.Type {
+ hasCapability = true
+ break
+ }
+ }
+
+ if hasCapability && taskReq.AvailableSlots > 0 {
+ // Assign task to worker
+ task.Status = "assigned"
+ task.AssignedTo = taskReq.WorkerId
+ task.Updated = time.Now()
+
+ log.Printf("Assigned task %s (volume %d) to worker %s", taskID, task.VolumeID, taskReq.WorkerId)
+
+ // Send task assignment
+ response := &pb.AdminMessage{
+ AdminId: "admin-server",
+ Timestamp: time.Now().Unix(),
+ Message: &pb.AdminMessage_TaskAssignment{
+ TaskAssignment: &pb.TaskAssignment{
+ TaskId: taskID,
+ TaskType: task.Type,
+ Priority: 1,
+ CreatedTime: task.Created.Unix(),
+ Params: &pb.TaskParams{
+ VolumeId: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ DataCenter: task.DataCenter,
+ Rack: task.Rack,
+ Parameters: task.Parameters,
+ },
+ },
+ },
+ }
+
+ if err := stream.Send(response); err != nil {
+ log.Printf("Failed to send task assignment to worker %s: %v", taskReq.WorkerId, err)
+ }
+ return
+ }
+ }
+ }
+
+ log.Printf("No suitable tasks available for worker %s", taskReq.WorkerId)
+}
+
+func (s *AdminServer) handleTaskUpdate(update *pb.TaskUpdate) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if task, exists := s.tasks[update.TaskId]; exists {
+ task.Progress = update.Progress
+ task.Status = update.Status
+ task.Updated = time.Now()
+
+ log.Printf("Task %s progress: %.1f%% - %s", update.TaskId, update.Progress, update.Message)
+ }
+}
+
+func (s *AdminServer) handleTaskComplete(complete *pb.TaskComplete) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if task, exists := s.tasks[complete.TaskId]; exists {
+ if complete.Success {
+ task.Status = "completed"
+ task.Progress = 100.0
+ } else {
+ task.Status = "failed"
+ }
+ task.Updated = time.Now()
+
+ // Update worker stats
+ if worker, workerExists := s.workers[complete.WorkerId]; workerExists {
+ if complete.Success {
+ worker.TasksCompleted++
+ } else {
+ worker.TasksFailed++
+ }
+ if worker.CurrentLoad > 0 {
+ worker.CurrentLoad--
+ }
+ }
+
+ log.Printf("Task %s completed by worker %s: success=%v", complete.TaskId, complete.WorkerId, complete.Success)
+ }
+}
+
+func (s *AdminServer) addWorkerStream(workerID string, stream pb.WorkerService_WorkerStreamServer) {
+ s.streamMu.Lock()
+ defer s.streamMu.Unlock()
+ s.workerStreams[workerID] = stream
+}
+
+func (s *AdminServer) removeWorkerStream(workerID string) {
+ s.streamMu.Lock()
+ defer s.streamMu.Unlock()
+ delete(s.workerStreams, workerID)
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.workers, workerID)
+}
+
+// HTTP handlers for web UI
+func (s *AdminServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // Convert internal data to JSON-friendly format
+ taskList := make([]map[string]interface{}, 0, len(s.tasks))
+ for _, task := range s.tasks {
+ taskList = append(taskList, map[string]interface{}{
+ "id": task.ID,
+ "type": task.Type,
+ "volume_id": task.VolumeID,
+ "status": task.Status,
+ "progress": task.Progress,
+ "created": task.Created,
+ })
+ }
+
+ workerList := make([]map[string]interface{}, 0, len(s.workers))
+ for _, worker := range s.workers {
+ workerList = append(workerList, map[string]interface{}{
+ "id": worker.ID,
+ "address": worker.Address,
+ "capabilities": worker.Capabilities,
+ "status": worker.Status,
+ "last_seen": worker.LastSeen,
+ })
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]interface{}{
+ "admin_server": "running",
+ "master_addr": s.masterAddr,
+ "tasks": taskList,
+ "workers": workerList,
+ "uptime": time.Since(s.startTime).String(),
+ })
+}
+
+func (s *AdminServer) healthHandler(w http.ResponseWriter, r *http.Request) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]interface{}{
+ "status": "healthy",
+ "uptime": time.Since(s.startTime).String(),
+ "tasks": len(s.tasks),
+ "workers": len(s.workers),
+ })
+}
+
+func (s *AdminServer) webUIHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/html")
+
+ html := `
+
+
+ SeaweedFS Admin - EC Task Monitor
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ | Task ID |
+ Type |
+ Volume ID |
+ Status |
+ Progress |
+ Created |
+
+
+
+ | Loading tasks... |
+
+
+
+
+
+
+
+
+
+
+
+ | Worker ID |
+ Address |
+ Status |
+ Capabilities |
+ Last Seen |
+
+
+
+ | Loading workers... |
+
+
+
+
+
+
+
+
+`
+
+ fmt.Fprint(w, html)
+}
+
+// Task detection and creation
+func (s *AdminServer) detectVolumesForEC() {
+ ticker := time.NewTicker(30 * time.Second)
+ go func() {
+ for range ticker.C {
+ s.mu.Lock()
+
+ log.Println("Scanning for volumes requiring EC...")
+
+ // Simulate volume detection - in real implementation, query master
+ if len(s.tasks) < 5 { // Don't create too many tasks
+ taskID := fmt.Sprintf("ec-task-%d", time.Now().Unix())
+ volumeID := uint32(1000 + len(s.tasks))
+
+ task := &TaskInfo{
+ ID: taskID,
+ Type: "erasure_coding",
+ VolumeID: volumeID,
+ Status: "pending",
+ Progress: 0.0,
+ Created: time.Now(),
+ Updated: time.Now(),
+ Server: "volume1:8080", // Simplified
+ Collection: "",
+ DataCenter: "dc1",
+ Rack: "rack1",
+ Parameters: map[string]string{
+ "data_shards": "10",
+ "parity_shards": "4",
+ },
+ }
+
+ s.tasks[taskID] = task
+ log.Printf("Created EC task %s for volume %d", taskID, volumeID)
+ }
+
+ s.mu.Unlock()
+ }
+ }()
+}
+
+func main() {
+ grpcPort := os.Getenv("GRPC_PORT")
+ if grpcPort == "" {
+ grpcPort = "9901"
+ }
+
+ httpPort := os.Getenv("ADMIN_PORT")
+ if httpPort == "" {
+ httpPort = "9900"
+ }
+
+ masterAddr := os.Getenv("MASTER_ADDRESS")
+ if masterAddr == "" {
+ masterAddr = "master:9333"
+ }
+
+ server := &AdminServer{
+ grpcPort: grpcPort,
+ httpPort: httpPort,
+ masterAddr: masterAddr,
+ startTime: time.Now(),
+ workers: make(map[string]*WorkerInfo),
+ tasks: make(map[string]*TaskInfo),
+ taskQueue: make([]string, 0),
+ workerStreams: make(map[string]pb.WorkerService_WorkerStreamServer),
+ }
+
+ // Start gRPC server
+ go func() {
+ lis, err := net.Listen("tcp", ":"+grpcPort)
+ if err != nil {
+ log.Fatalf("Failed to listen on gRPC port %s: %v", grpcPort, err)
+ }
+
+ grpcServer := grpc.NewServer()
+ pb.RegisterWorkerServiceServer(grpcServer, server)
+ reflection.Register(grpcServer)
+
+ log.Printf("gRPC server starting on port %s", grpcPort)
+ if err := grpcServer.Serve(lis); err != nil {
+ log.Fatalf("Failed to serve gRPC: %v", err)
+ }
+ }()
+
+ // Start HTTP server for web UI
+ go func() {
+ http.HandleFunc("/", server.webUIHandler)
+ http.HandleFunc("/status", server.statusHandler)
+ http.HandleFunc("/health", server.healthHandler)
+
+ log.Printf("HTTP server starting on port %s", httpPort)
+ if err := http.ListenAndServe(":"+httpPort, nil); err != nil {
+ log.Fatalf("Failed to serve HTTP: %v", err)
+ }
+ }()
+
+ // Start task detection
+ server.detectVolumesForEC()
+
+ log.Printf("Admin server started - gRPC port: %s, HTTP port: %s, Master: %s", grpcPort, httpPort, masterAddr)
+
+ // Keep the main goroutine running
+ select {}
+}
diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml
index da1ab5603..de9073219 100644
--- a/docker/admin_integration/docker-compose-ec-test.yml
+++ b/docker/admin_integration/docker-compose-ec-test.yml
@@ -227,9 +227,11 @@ services:
container_name: seaweed-admin
ports:
- "9900:9900"
+ - "9901:9901"
environment:
- MASTER_ADDRESS=master:9333
- ADMIN_PORT=9900
+ - GRPC_PORT=9901
- SCAN_INTERVAL=30s
- WORKER_TIMEOUT=5m
- TASK_TIMEOUT=30m
@@ -258,7 +260,7 @@ services:
dockerfile: docker/admin_integration/Dockerfile.worker
container_name: seaweed-worker1
environment:
- - ADMIN_ADDRESS=admin:9900
+ - ADMIN_GRPC_ADDRESS=admin:9901
- WORKER_ID=worker-1
- WORKER_ADDRESS=worker1:9001
- CAPABILITIES=erasure_coding
@@ -284,7 +286,7 @@ services:
dockerfile: docker/admin_integration/Dockerfile.worker
container_name: seaweed-worker2
environment:
- - ADMIN_ADDRESS=admin:9900
+ - ADMIN_GRPC_ADDRESS=admin:9901
- WORKER_ID=worker-2
- WORKER_ADDRESS=worker2:9001
- CAPABILITIES=erasure_coding,vacuum
@@ -310,7 +312,7 @@ services:
dockerfile: docker/admin_integration/Dockerfile.worker
container_name: seaweed-worker3
environment:
- - ADMIN_ADDRESS=admin:9900
+ - ADMIN_GRPC_ADDRESS=admin:9901
- WORKER_ID=worker-3
- WORKER_ADDRESS=worker3:9001
- CAPABILITIES=erasure_coding,vacuum
diff --git a/docker/admin_integration/worker-grpc-entrypoint.sh b/docker/admin_integration/worker-grpc-entrypoint.sh
new file mode 100644
index 000000000..a92cac189
--- /dev/null
+++ b/docker/admin_integration/worker-grpc-entrypoint.sh
@@ -0,0 +1,67 @@
+#!/bin/sh
+
+set -e
+
+echo "Starting SeaweedFS EC Worker (gRPC)..."
+echo "Worker ID: $WORKER_ID"
+echo "Admin gRPC Address: $ADMIN_GRPC_ADDRESS"
+
+# Wait for admin to be ready
+echo "Waiting for admin to be ready..."
+until curl -f http://admin:9900/health > /dev/null 2>&1; do
+ echo "Admin not ready, waiting..."
+ sleep 5
+done
+echo "Admin is ready!"
+
+# Install protobuf compiler and Go protobuf plugins
+apk add --no-cache protobuf protobuf-dev
+
+# Set up Go environment
+export GOPATH=/tmp/go
+export PATH=$PATH:$GOPATH/bin
+mkdir -p $GOPATH/src $GOPATH/bin $GOPATH/pkg
+
+# Install Go protobuf plugins
+cd /tmp
+# go.mod is written explicitly below (with pinned versions), so no 'go mod init' is needed
+
+# Create a basic go.mod with required dependencies
+cat > go.mod << 'EOF'
+module worker-client
+
+go 1.24
+
+require (
+ google.golang.org/grpc v1.65.0
+ google.golang.org/protobuf v1.34.2
+)
+EOF
+
+go mod tidy
+go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
+go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
+
+# Add Go bin to PATH
+export PATH=$PATH:$(go env GOPATH)/bin
+
+# Create directory structure for protobuf
+mkdir -p worker_pb
+
+# Copy the worker client source and existing worker protobuf file
+cp /worker_grpc_client.go .
+cp /worker.proto .
+
+# Generate Go code from the existing worker protobuf
+echo "Generating gRPC code from worker.proto..."
+protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ worker.proto
+
+# Build and run the worker
+echo "Building worker..."
+go mod tidy
+go build -o worker-client worker_grpc_client.go
+
+echo "Starting worker..."
+exec ./worker-client