Browse Source

sticky worker id

worker-execute-ec-tasks
chrislu 4 months ago
parent
commit
afdca513ac
  1. 21
      weed/admin/maintenance/maintenance_worker.go
  2. 67
      weed/worker/worker.go

21
weed/admin/maintenance/maintenance_worker.go

@ -7,13 +7,9 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// MaintenanceWorkerService manages maintenance task execution
@ -396,10 +392,19 @@ func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *Mainten
// Run starts the maintenance worker as a standalone service
func (mwc *MaintenanceWorkerCommand) Run() error {
// Generate worker ID if not provided
// Generate or load persistent worker ID if not provided
if mwc.workerService.workerID == "" {
hostname, _ := os.Hostname()
mwc.workerService.workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
// Get current working directory for worker ID persistence
wd, err := os.Getwd()
if err != nil {
return fmt.Errorf("failed to get working directory: %w", err)
}
workerID, err := worker.GenerateOrLoadWorkerID(wd)
if err != nil {
return fmt.Errorf("failed to generate or load worker ID: %w", err)
}
mwc.workerService.workerID = workerID
}
// Start the worker service

67
weed/worker/worker.go

@ -1,8 +1,11 @@
package worker
import (
"crypto/rand"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -45,15 +48,73 @@ type AdminClient interface {
IsConnected() bool
}
// GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
const workerIDFile = "worker.id"
var idFilePath string
if workingDir != "" {
idFilePath = filepath.Join(workingDir, workerIDFile)
} else {
// Use current working directory if none specified
wd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("failed to get working directory: %w", err)
}
idFilePath = filepath.Join(wd, workerIDFile)
}
// Try to read existing worker ID
if data, err := os.ReadFile(idFilePath); err == nil {
workerID := strings.TrimSpace(string(data))
if workerID != "" {
glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
return workerID, nil
}
}
// Generate new unique worker ID
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
// Generate random component for uniqueness
randomBytes := make([]byte, 4)
var workerID string
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to timestamp if crypto/rand fails
workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
glog.Infof("Generated fallback worker ID: %s", workerID)
} else {
// Use random bytes + timestamp for uniqueness
randomHex := fmt.Sprintf("%x", randomBytes)
timestamp := time.Now().Unix()
workerID = fmt.Sprintf("worker-%s-%s-%d", hostname, randomHex, timestamp)
glog.Infof("Generated new worker ID: %s", workerID)
}
// Save worker ID to file
if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
} else {
glog.Infof("Saved worker ID to %s", idFilePath)
}
return workerID, nil
}
// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
if config == nil {
config = types.DefaultWorkerConfig()
}
// Always auto-generate worker ID
hostname, _ := os.Hostname()
workerID := fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
// Generate or load persistent worker ID
workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
if err != nil {
return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
}
// Use the global registry that already has all tasks registered
registry := tasks.GetGlobalRegistry()

Loading…
Cancel
Save