Browse Source

admin start wtih grpc port. worker has its own working directory

worker-execute-ec-tasks
chrislu 4 months ago
parent
commit
cd28ce7786
  1. 5
      weed/admin/dash/admin_server.go
  2. 17
      weed/command/admin.go
  3. 48
      weed/command/worker.go
  4. 1
      weed/worker/types/config_types.go
  5. 1
      weed/worker/types/task_types.go
  6. 7
      weed/worker/worker.go

5
weed/admin/dash/admin_server.go

@ -1265,14 +1265,11 @@ func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, err
} }
// StartWorkerGrpcServer starts the worker gRPC server // StartWorkerGrpcServer starts the worker gRPC server
func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error {
func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
if s.workerGrpcServer != nil { if s.workerGrpcServer != nil {
return fmt.Errorf("worker gRPC server is already running") return fmt.Errorf("worker gRPC server is already running")
} }
// Calculate gRPC port (HTTP port + 10000)
grpcPort := httpPort + 10000
s.workerGrpcServer = NewWorkerGrpcServer(s) s.workerGrpcServer = NewWorkerGrpcServer(s)
return s.workerGrpcServer.StartWithTLS(grpcPort) return s.workerGrpcServer.StartWithTLS(grpcPort)
} }

17
weed/command/admin.go

@ -33,6 +33,7 @@ var (
type AdminOptions struct { type AdminOptions struct {
port *int port *int
grpcPort *int
masters *string masters *string
adminUser *string adminUser *string
adminPassword *string adminPassword *string
@ -42,6 +43,7 @@ type AdminOptions struct {
func init() { func init() {
cmdAdmin.Run = runAdmin // break init cycle cmdAdmin.Run = runAdmin // break init cycle
a.port = cmdAdmin.Flag.Int("port", 23646, "admin server port") a.port = cmdAdmin.Flag.Int("port", 23646, "admin server port")
a.grpcPort = cmdAdmin.Flag.Int("port.grpc", 0, "gRPC server port for worker connections (default: http port + 10000)")
a.masters = cmdAdmin.Flag.String("masters", "localhost:9333", "comma-separated master servers") a.masters = cmdAdmin.Flag.String("masters", "localhost:9333", "comma-separated master servers")
a.dataDir = cmdAdmin.Flag.String("dataDir", "", "directory to store admin configuration and data files") a.dataDir = cmdAdmin.Flag.String("dataDir", "", "directory to store admin configuration and data files")
@ -50,7 +52,7 @@ func init() {
} }
var cmdAdmin = &Command{ var cmdAdmin = &Command{
UsageLine: "admin -port=23646 -masters=localhost:9333 [-dataDir=/path/to/data]",
UsageLine: "admin -port=23646 -masters=localhost:9333 [-port.grpc=33646] [-dataDir=/path/to/data]",
Short: "start SeaweedFS web admin interface", Short: "start SeaweedFS web admin interface",
Long: `Start a web admin interface for SeaweedFS cluster management. Long: `Start a web admin interface for SeaweedFS cluster management.
@ -63,12 +65,13 @@ var cmdAdmin = &Command{
- Maintenance operations - Maintenance operations
The admin interface automatically discovers filers from the master servers. The admin interface automatically discovers filers from the master servers.
A gRPC server for worker connections runs on HTTP port + 10000.
A gRPC server for worker connections runs on the configured gRPC port (default: HTTP port + 10000).
Example Usage: Example Usage:
weed admin -port=23646 -masters="master1:9333,master2:9333" weed admin -port=23646 -masters="master1:9333,master2:9333"
weed admin -port=23646 -masters="localhost:9333" -dataDir="/var/lib/seaweedfs-admin" weed admin -port=23646 -masters="localhost:9333" -dataDir="/var/lib/seaweedfs-admin"
weed admin -port=23646 -masters="localhost:9333" -dataDir="~/seaweedfs-admin"
weed admin -port=23646 -port.grpc=33646 -masters="localhost:9333" -dataDir="~/seaweedfs-admin"
weed admin -port=9900 -port.grpc=19900 -masters="localhost:9333"
Data Directory: Data Directory:
- If dataDir is specified, admin configuration and maintenance data is persisted - If dataDir is specified, admin configuration and maintenance data is persisted
@ -128,6 +131,11 @@ func runAdmin(cmd *Command, args []string) bool {
return false return false
} }
// Set default gRPC port if not specified
if *a.grpcPort == 0 {
*a.grpcPort = *a.port + 10000
}
// Security warnings // Security warnings
if *a.adminPassword == "" { if *a.adminPassword == "" {
fmt.Println("WARNING: Admin interface is running without authentication!") fmt.Println("WARNING: Admin interface is running without authentication!")
@ -135,6 +143,7 @@ func runAdmin(cmd *Command, args []string) bool {
} }
fmt.Printf("Starting SeaweedFS Admin Interface on port %d\n", *a.port) fmt.Printf("Starting SeaweedFS Admin Interface on port %d\n", *a.port)
fmt.Printf("Worker gRPC server will run on port %d\n", *a.grpcPort)
fmt.Printf("Masters: %s\n", *a.masters) fmt.Printf("Masters: %s\n", *a.masters)
fmt.Printf("Filers will be discovered automatically from masters\n") fmt.Printf("Filers will be discovered automatically from masters\n")
if *a.dataDir != "" { if *a.dataDir != "" {
@ -232,7 +241,7 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
} }
// Start worker gRPC server for worker connections // Start worker gRPC server for worker connections
err = adminServer.StartWorkerGrpcServer(*options.port)
err = adminServer.StartWorkerGrpcServer(*options.grpcPort)
if err != nil { if err != nil {
return fmt.Errorf("failed to start worker gRPC server: %w", err) return fmt.Errorf("failed to start worker gRPC server: %w", err)
} }

48
weed/command/worker.go

@ -3,6 +3,7 @@ package command
import ( import (
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@ -21,7 +22,7 @@ import (
) )
var cmdWorker = &Command{ var cmdWorker = &Command{
UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>]",
UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>] [-workingDir=<path>]",
Short: "start a maintenance worker to process cluster maintenance tasks", Short: "start a maintenance worker to process cluster maintenance tasks",
Long: `Start a maintenance worker that connects to an admin server to process Long: `Start a maintenance worker that connects to an admin server to process
maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes. maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.
@ -34,6 +35,7 @@ Examples:
weed worker -admin=admin.example.com:23646 weed worker -admin=admin.example.com:23646
weed worker -admin=localhost:23646 -capabilities=vacuum,replication weed worker -admin=localhost:23646 -capabilities=vacuum,replication
weed worker -admin=localhost:23646 -maxConcurrent=4 weed worker -admin=localhost:23646 -maxConcurrent=4
weed worker -admin=localhost:23646 -workingDir=/tmp/worker
`, `,
} }
@ -43,6 +45,7 @@ var (
workerMaxConcurrent = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks") workerMaxConcurrent = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
workerHeartbeatInterval = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval") workerHeartbeatInterval = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval") workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
workerWorkingDir = cmdWorker.Flag.String("workingDir", "", "working directory for the worker")
) )
func init() { func init() {
@ -67,6 +70,33 @@ func runWorker(cmd *Command, args []string) bool {
return false return false
} }
// Set working directory and create task-specific subdirectories
var baseWorkingDir string
if *workerWorkingDir != "" {
glog.Infof("Setting working directory to: %s", *workerWorkingDir)
if err := os.Chdir(*workerWorkingDir); err != nil {
glog.Fatalf("Failed to change working directory: %v", err)
return false
}
wd, err := os.Getwd()
if err != nil {
glog.Fatalf("Failed to get working directory: %v", err)
return false
}
baseWorkingDir = wd
glog.Infof("Current working directory: %s", baseWorkingDir)
// Create task-specific subdirectories
for _, capability := range capabilities {
taskDir := filepath.Join(baseWorkingDir, string(capability))
if err := os.MkdirAll(taskDir, 0755); err != nil {
glog.Fatalf("Failed to create task directory %s: %v", taskDir, err)
return false
}
glog.Infof("Created task directory: %s", taskDir)
}
}
// Create worker configuration // Create worker configuration
config := &types.WorkerConfig{ config := &types.WorkerConfig{
AdminServer: *workerAdminServer, AdminServer: *workerAdminServer,
@ -74,6 +104,7 @@ func runWorker(cmd *Command, args []string) bool {
MaxConcurrent: *workerMaxConcurrent, MaxConcurrent: *workerMaxConcurrent,
HeartbeatInterval: *workerHeartbeatInterval, HeartbeatInterval: *workerHeartbeatInterval,
TaskRequestInterval: *workerTaskRequestInterval, TaskRequestInterval: *workerTaskRequestInterval,
BaseWorkingDir: baseWorkingDir,
} }
// Create worker instance // Create worker instance
@ -94,6 +125,21 @@ func runWorker(cmd *Command, args []string) bool {
// Set admin client // Set admin client
workerInstance.SetAdminClient(adminClient) workerInstance.SetAdminClient(adminClient)
// Set working directory
if *workerWorkingDir != "" {
glog.Infof("Setting working directory to: %s", *workerWorkingDir)
if err := os.Chdir(*workerWorkingDir); err != nil {
glog.Fatalf("Failed to change working directory: %v", err)
return false
}
wd, err := os.Getwd()
if err != nil {
glog.Fatalf("Failed to get working directory: %v", err)
return false
}
glog.Infof("Current working directory: %s", wd)
}
// Start the worker // Start the worker
err = workerInstance.Start() err = workerInstance.Start()
if err != nil { if err != nil {

1
weed/worker/types/config_types.go

@ -12,6 +12,7 @@ type WorkerConfig struct {
MaxConcurrent int `json:"max_concurrent"` MaxConcurrent int `json:"max_concurrent"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"` HeartbeatInterval time.Duration `json:"heartbeat_interval"`
TaskRequestInterval time.Duration `json:"task_request_interval"` TaskRequestInterval time.Duration `json:"task_request_interval"`
BaseWorkingDir string `json:"base_working_dir,omitempty"`
CustomParameters map[string]interface{} `json:"custom_parameters,omitempty"` CustomParameters map[string]interface{} `json:"custom_parameters,omitempty"`
} }

1
weed/worker/types/task_types.go

@ -60,6 +60,7 @@ type TaskParams struct {
VolumeID uint32 `json:"volume_id,omitempty"` VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"` Server string `json:"server,omitempty"`
Collection string `json:"collection,omitempty"` Collection string `json:"collection,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"` Parameters map[string]interface{} `json:"parameters,omitempty"`
} }

7
weed/worker/worker.go

@ -257,11 +257,18 @@ func (w *Worker) executeTask(task *types.Task) {
glog.Infof("Worker %s executing task %s: %s", w.id, task.ID, task.Type) glog.Infof("Worker %s executing task %s: %s", w.id, task.ID, task.Type)
// Determine task-specific working directory
var taskWorkingDir string
if w.config.BaseWorkingDir != "" {
taskWorkingDir = fmt.Sprintf("%s/%s", w.config.BaseWorkingDir, string(task.Type))
}
// Create task instance // Create task instance
taskParams := types.TaskParams{ taskParams := types.TaskParams{
VolumeID: task.VolumeID, VolumeID: task.VolumeID,
Server: task.Server, Server: task.Server,
Collection: task.Collection, Collection: task.Collection,
WorkingDir: taskWorkingDir,
Parameters: task.Parameters, Parameters: task.Parameters,
} }

Loading…
Cancel
Save