From 4f647e1036ebfaec5f770cdadd8c31f52681df57 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 27 Feb 2026 12:22:21 -0800 Subject: [PATCH] Worker set its working directory (#8461) * set working directory * consolidate to worker directory * working directory * correct directory name * refactoring to use wildcard matcher * simplify * cleaning ec working directory * fix reference * clean * adjust test --- weed/command/mini.go | 30 +- weed/command/plugin_worker_test.go | 6 +- weed/command/worker.go | 2 +- weed/command/worker_runtime.go | 30 +- weed/plugin/worker/erasure_coding_handler.go | 13 +- weed/plugin/worker/vacuum_handler.go | 147 +----- weed/plugin/worker/volume_metrics.go | 168 +++++++ weed/s3api/auth_credentials.go | 6 +- weed/s3api/auth_credentials_test.go | 6 +- weed/s3api/policy_engine/conditions.go | 7 +- weed/s3api/policy_engine/engine_test.go | 3 +- weed/s3api/policy_engine/types.go | 23 +- .../policy_engine/wildcard_matcher_test.go | 469 ------------------ weed/s3api/s3tables/permissions.go | 11 +- weed/util/wildcard/filter.go | 39 ++ .../wildcard}/wildcard_matcher.go | 116 +---- weed/util/wildcard/wildcard_matcher_test.go | 211 ++++++++ weed/worker/tasks/erasure_coding/detection.go | 27 +- weed/worker/tasks/erasure_coding/ec_task.go | 18 +- weed/worker/tasks/vacuum/detection.go | 8 +- weed/worker/types/base/task.go | 11 + weed/worker/types/task.go | 15 + weed/worker/worker.go | 8 + 23 files changed, 559 insertions(+), 815 deletions(-) create mode 100644 weed/plugin/worker/volume_metrics.go delete mode 100644 weed/s3api/policy_engine/wildcard_matcher_test.go create mode 100644 weed/util/wildcard/filter.go rename weed/{s3api/policy_engine => util/wildcard}/wildcard_matcher.go (56%) create mode 100644 weed/util/wildcard/wildcard_matcher_test.go diff --git a/weed/command/mini.go b/weed/command/mini.go index 0b832614a..2ba00d26e 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -728,6 +728,7 @@ func saveMiniConfiguration(dataFolder string) error { } func runMini(cmd *Command, args []string) bool { + *miniDataFolders = util.ResolvePath(*miniDataFolders) // Capture which port flags were explicitly passed on CLI BEFORE config file is applied // This is necessary to distinguish user-specified ports from defaults or config file options @@ -1030,9 +1031,15 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { glog.Fatalf("Admin server readiness check failed: %v", err) } - // Start worker after admin server is ready - startMiniWorker() - startMiniPluginWorker(ctx) + // Start consolidated worker runtime (both standard and plugin runtimes) + workerDir := filepath.Join(*miniDataFolders, "worker") + if err := os.MkdirAll(workerDir, 0755); err != nil { + glog.Fatalf("Failed to create unified worker directory: %v", err) + } + + glog.Infof("Starting consolidated maintenance worker system (directory: %s)", workerDir) + startMiniWorker(workerDir) + startMiniPluginWorker(ctx, workerDir) // Wait for worker to be ready by polling its gRPC port workerGrpcAddr := fmt.Sprintf("%s:%d", bindIp, *miniAdminOptions.grpcPort) @@ -1091,17 +1098,13 @@ func waitForWorkerReady(workerGrpcAddr string) { } // startMiniWorker starts a single worker for the admin server -func startMiniWorker() { - glog.Infof("Starting maintenance worker for admin server") +func startMiniWorker(workerDir string) { + glog.V(1).Infof("Initializing standard worker runtime") adminAddr := fmt.Sprintf("%s:%d", *miniIp, *miniAdminOptions.port) capabilities := "vacuum,ec,balance" - // Use worker directory under 
main data folder - workerDir := filepath.Join(*miniDataFolders, "worker") - if err := os.MkdirAll(workerDir, 0755); err != nil { - glog.Fatalf("Failed to create worker directory: %v", err) - } + // Use common worker directory glog.Infof("Worker connecting to admin server: %s", adminAddr) glog.Infof("Worker capabilities: %s", capabilities) @@ -1170,7 +1173,7 @@ func startMiniWorker() { glog.Infof("Maintenance worker %s started successfully", workerInstance.ID()) } -func startMiniPluginWorker(ctx context.Context) { +func startMiniPluginWorker(ctx context.Context, workerDir string) { glog.Infof("Starting plugin worker for admin server") adminAddr := fmt.Sprintf("%s:%d", *miniIp, *miniAdminOptions.port) @@ -1179,10 +1182,7 @@ func startMiniPluginWorker(ctx context.Context) { glog.Infof("Resolved mini plugin worker admin endpoint: %s -> %s", adminAddr, resolvedAdminAddr) } - workerDir := filepath.Join(*miniDataFolders, "plugin_worker") - if err := os.MkdirAll(workerDir, 0755); err != nil { - glog.Fatalf("Failed to create plugin worker directory: %v", err) - } + // Use common worker directory util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index ed4fe91f3..7442b4eb8 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -153,11 +153,11 @@ func TestResolvePluginWorkerID(t *testing.T) { if generated == "" { t.Fatalf("expected generated id") } - if len(generated) < 7 || generated[:7] != "plugin-" { - t.Fatalf("expected generated id prefix plugin-, got %q", generated) + if len(generated) < 2 || generated[:2] != "w-" { + t.Fatalf("expected generated id prefix w-, got %q", generated) } - persistedPath := filepath.Join(dir, "plugin.worker.id") + persistedPath := filepath.Join(dir, "worker.id") if _, statErr := os.Stat(persistedPath); statErr != nil { t.Fatalf("expected persisted worker id file: %v", statErr) } diff --git a/weed/command/worker.go b/weed/command/worker.go index 685e79780..0e02578ef 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -17,7 +17,7 @@ heartbeat/load reporting, detection, and execution. 
Behavior: - Use -jobType to choose one or more plugin job handlers (comma-separated list) - - Use -workingDir to persist plugin.worker.id for stable worker identity across restarts + - Use -workingDir to persist worker.id for stable worker identity across restarts - Use -metricsPort/-metricsIp to expose /health, /ready, and /metrics Examples: diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index 180d35140..a7affc3ae 100644 --- a/weed/command/worker_runtime.go +++ b/weed/command/worker_runtime.go @@ -7,7 +7,6 @@ import ( "net/http" "os" "os/signal" - "path/filepath" "strconv" "strings" "syscall" @@ -20,6 +19,7 @@ import ( statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/worker" "google.golang.org/grpc" ) @@ -130,31 +130,11 @@ func runPluginWorkerWithOptions(options pluginWorkerRunOptions) bool { } func resolvePluginWorkerID(explicitID string, workingDir string) (string, error) { - id := strings.TrimSpace(explicitID) - if id != "" { - return id, nil + if explicitID != "" { + return explicitID, nil } - - workingDir = strings.TrimSpace(workingDir) - if workingDir == "" { - return "", nil - } - if err := os.MkdirAll(workingDir, 0755); err != nil { - return "", err - } - - workerIDPath := filepath.Join(workingDir, "plugin.worker.id") - if data, err := os.ReadFile(workerIDPath); err == nil { - if persisted := strings.TrimSpace(string(data)); persisted != "" { - return persisted, nil - } - } - - generated := fmt.Sprintf("plugin-%d", time.Now().UnixNano()) - if err := os.WriteFile(workerIDPath, []byte(generated+"\n"), 0644); err != nil { - return "", err - } - return generated, nil + // Use the same ID generation/loading logic as the standard worker + return worker.GenerateOrLoadWorkerID(workingDir) } // buildPluginWorkerHandler constructs the JobHandler for the given job type. 
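Note: the refactored resolvePluginWorkerID above delegates to worker.GenerateOrLoadWorkerID, whose body is not part of this diff. Judging from the updated expectations in plugin_worker_test.go (IDs carry a "w-" prefix and are persisted to a worker.id file in the working directory), its behavior can be approximated by the sketch below. This is an illustrative reconstruction based only on those test expectations, not the actual implementation in weed/worker, which may differ.

package worker

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// GenerateOrLoadWorkerID (sketch): reuse a persisted worker.id when present,
// otherwise generate a "w-" prefixed ID and persist it so the worker keeps a
// stable identity across restarts. Reconstructed from test expectations only.
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
	generate := func() string { return fmt.Sprintf("w-%d", time.Now().UnixNano()) }
	if strings.TrimSpace(workingDir) == "" {
		// No working directory configured: fall back to an ephemeral identity.
		return generate(), nil
	}
	if err := os.MkdirAll(workingDir, 0755); err != nil {
		return "", err
	}
	idPath := filepath.Join(workingDir, "worker.id")
	if data, err := os.ReadFile(idPath); err == nil {
		if persisted := strings.TrimSpace(string(data)); persisted != "" {
			return persisted, nil
		}
	}
	id := generate()
	if err := os.WriteFile(idPath, []byte(id+"\n"), 0644); err != nil {
		return "", err
	}
	return id, nil
}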
diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 21aee9339..81758289a 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" erasurecodingtask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" @@ -280,7 +281,7 @@ func emitErasureCodingDetectionDecisionTrace( quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024 - allowedCollections := erasurecodingtask.ParseCollectionFilter(taskConfig.CollectionFilter) + allowedCollections := wildcard.CompileWildcardMatchers(taskConfig.CollectionFilter) volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics) for _, metric := range metrics { @@ -318,7 +319,7 @@ func emitErasureCodingDetectionDecisionTrace( skippedTooSmall++ continue } - if len(allowedCollections) > 0 && !allowedCollections[metric.Collection] { + if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, metric.Collection) { skippedCollectionFilter++ continue } @@ -569,9 +570,7 @@ func (h *ErasureCodingHandler) collectVolumeMetrics( masterAddresses []string, collectionFilter string, ) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - // Reuse the same master topology fetch/build flow used by the vacuum handler. - helper := &VacuumHandler{grpcDialOption: h.grpcDialOption} - return helper.collectVolumeMetrics(ctx, masterAddresses, collectionFilter) + return collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) } func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig { @@ -906,7 +905,7 @@ func assignECShardIDs(totalShards int, targetCount int) [][]uint32 { func defaultErasureCodingWorkingDir(baseWorkingDir string) string { dir := strings.TrimSpace(baseWorkingDir) if dir == "" { - return filepath.Join(".", "seaweedfs-ec") + return filepath.Join(".", "erasure_coding") } - return filepath.Join(dir, "seaweedfs-ec") + return filepath.Join(dir, "erasure_coding") } diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index c2199e02f..e3e9a7052 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -3,15 +3,12 @@ package pluginworker import ( "context" "fmt" - "sort" "strconv" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" vacuumtask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" @@ -492,62 +489,7 @@ func (h *VacuumHandler) collectVolumeMetrics( masterAddresses []string, collectionFilter string, ) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - if h.grpcDialOption == nil { - return nil, nil, fmt.Errorf("grpc dial option is not configured") - } - if len(masterAddresses) == 0 { - return nil, nil, fmt.Errorf("no master addresses provided in cluster 
context") - } - - for _, masterAddress := range masterAddresses { - response, err := h.fetchVolumeList(ctx, masterAddress) - if err != nil { - glog.Warningf("Plugin worker failed master volume list at %s: %v", masterAddress, err) - continue - } - - metrics, activeTopology, buildErr := buildVolumeMetrics(response, collectionFilter) - if buildErr != nil { - glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr) - continue - } - return metrics, activeTopology, nil - } - - return nil, nil, fmt.Errorf("failed to load topology from all provided masters") -} - -func (h *VacuumHandler) fetchVolumeList(ctx context.Context, address string) (*master_pb.VolumeListResponse, error) { - var lastErr error - for _, candidate := range masterAddressCandidates(address) { - if ctx.Err() != nil { - return nil, ctx.Err() - } - - dialCtx, cancelDial := context.WithTimeout(ctx, 5*time.Second) - conn, err := pb.GrpcDial(dialCtx, candidate, false, h.grpcDialOption) - cancelDial() - if err != nil { - lastErr = err - continue - } - - client := master_pb.NewSeaweedClient(conn) - callCtx, cancelCall := context.WithTimeout(ctx, 10*time.Second) - response, callErr := client.VolumeList(callCtx, &master_pb.VolumeListRequest{}) - cancelCall() - _ = conn.Close() - - if callErr == nil { - return response, nil - } - lastErr = callErr - } - - if lastErr == nil { - lastErr = fmt.Errorf("no valid master address candidate") - } - return nil, lastErr + return collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) } func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *vacuumtask.Config { @@ -558,74 +500,6 @@ func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *vacuumtask.Co return config } -func buildVolumeMetrics( - response *master_pb.VolumeListResponse, - collectionFilter string, -) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - if response == nil || response.TopologyInfo == nil { - return nil, nil, fmt.Errorf("volume list response has no topology info") - } - - activeTopology := topology.NewActiveTopology(10) - if err := activeTopology.UpdateTopology(response.TopologyInfo); err != nil { - return nil, nil, err - } - - filter := strings.TrimSpace(collectionFilter) - volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024 - now := time.Now() - metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256) - - for _, dc := range response.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for diskType, diskInfo := range node.DiskInfos { - for _, volume := range diskInfo.VolumeInfos { - if filter != "" && volume.Collection != filter { - continue - } - - metric := &workertypes.VolumeHealthMetrics{ - VolumeID: volume.Id, - Server: node.Id, - ServerAddress: string(pb.NewServerAddressFromDataNode(node)), - DiskType: diskType, - DiskId: volume.DiskId, - DataCenter: dc.Id, - Rack: rack.Id, - Collection: volume.Collection, - Size: volume.Size, - DeletedBytes: volume.DeletedByteCount, - LastModified: time.Unix(volume.ModifiedAtSecond, 0), - ReplicaCount: 1, - ExpectedReplicas: int(volume.ReplicaPlacement), - IsReadOnly: volume.ReadOnly, - } - if metric.Size > 0 { - metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size) - } - if volumeSizeLimitBytes > 0 { - metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes) - } - metric.Age = now.Sub(metric.LastModified) - metrics = append(metrics, metric) - } 
- } - } - } - } - - replicaCounts := make(map[uint32]int) - for _, metric := range metrics { - replicaCounts[metric.VolumeID]++ - } - for _, metric := range metrics { - metric.ReplicaCount = replicaCounts[metric.VolumeID] - } - - return metrics, activeTopology, nil -} - func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.JobProposal, error) { if result == nil { return nil, fmt.Errorf("task detection result is nil") @@ -834,25 +708,6 @@ func mapTaskPriority(priority workertypes.TaskPriority) plugin_pb.JobPriority { } } -func masterAddressCandidates(address string) []string { - trimmed := strings.TrimSpace(address) - if trimmed == "" { - return nil - } - candidateSet := map[string]struct{}{ - trimmed: {}, - } - converted := pb.ServerToGrpcAddress(trimmed) - candidateSet[converted] = struct{}{} - - candidates := make([]string, 0, len(candidateSet)) - for candidate := range candidateSet { - candidates = append(candidates, candidate) - } - sort.Strings(candidates) - return candidates -} - func shouldSkipDetectionByInterval(lastSuccessfulRun *timestamppb.Timestamp, minIntervalSeconds int) bool { if lastSuccessfulRun == nil || minIntervalSeconds <= 0 { return false diff --git a/weed/plugin/worker/volume_metrics.go b/weed/plugin/worker/volume_metrics.go new file mode 100644 index 000000000..32a11f489 --- /dev/null +++ b/weed/plugin/worker/volume_metrics.go @@ -0,0 +1,168 @@ +package pluginworker + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" + workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/grpc" +) + +func collectVolumeMetricsFromMasters( + ctx context.Context, + masterAddresses []string, + collectionFilter string, + grpcDialOption grpc.DialOption, +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { + if grpcDialOption == nil { + return nil, nil, fmt.Errorf("grpc dial option is not configured") + } + if len(masterAddresses) == 0 { + return nil, nil, fmt.Errorf("no master addresses provided in cluster context") + } + + for _, masterAddress := range masterAddresses { + response, err := fetchVolumeList(ctx, masterAddress, grpcDialOption) + if err != nil { + glog.Warningf("Plugin worker failed master volume list at %s: %v", masterAddress, err) + continue + } + + metrics, activeTopology, buildErr := buildVolumeMetrics(response, collectionFilter) + if buildErr != nil { + glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr) + continue + } + return metrics, activeTopology, nil + } + + return nil, nil, fmt.Errorf("failed to load topology from all provided masters") +} + +func fetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.DialOption) (*master_pb.VolumeListResponse, error) { + var lastErr error + for _, candidate := range masterAddressCandidates(address) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + dialCtx, cancelDial := context.WithTimeout(ctx, 5*time.Second) + conn, err := pb.GrpcDial(dialCtx, candidate, false, grpcDialOption) + cancelDial() + if err != nil { + lastErr = err + continue + } + + client := master_pb.NewSeaweedClient(conn) + callCtx, cancelCall := context.WithTimeout(ctx, 10*time.Second) + response, callErr := client.VolumeList(callCtx, 
&master_pb.VolumeListRequest{}) + cancelCall() + _ = conn.Close() + + if callErr == nil { + return response, nil + } + lastErr = callErr + } + + if lastErr == nil { + lastErr = fmt.Errorf("no valid master address candidate") + } + return nil, lastErr +} + +func buildVolumeMetrics( + response *master_pb.VolumeListResponse, + collectionFilter string, +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { + if response == nil || response.TopologyInfo == nil { + return nil, nil, fmt.Errorf("volume list response has no topology info") + } + + activeTopology := topology.NewActiveTopology(10) + if err := activeTopology.UpdateTopology(response.TopologyInfo); err != nil { + return nil, nil, err + } + + patterns := wildcard.CompileWildcardMatchers(collectionFilter) + volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024 + now := time.Now() + metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256) + + for _, dc := range response.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for diskType, diskInfo := range node.DiskInfos { + for _, volume := range diskInfo.VolumeInfos { + if !wildcard.MatchesAnyWildcard(patterns, volume.Collection) { + continue + } + + metric := &workertypes.VolumeHealthMetrics{ + VolumeID: volume.Id, + Server: node.Id, + ServerAddress: string(pb.NewServerAddressFromDataNode(node)), + DiskType: diskType, + DiskId: volume.DiskId, + DataCenter: dc.Id, + Rack: rack.Id, + Collection: volume.Collection, + Size: volume.Size, + DeletedBytes: volume.DeletedByteCount, + LastModified: time.Unix(volume.ModifiedAtSecond, 0), + ReplicaCount: 1, + ExpectedReplicas: int(volume.ReplicaPlacement), + IsReadOnly: volume.ReadOnly, + } + if metric.Size > 0 { + metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size) + } + if volumeSizeLimitBytes > 0 { + metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes) + } + metric.Age = now.Sub(metric.LastModified) + metrics = append(metrics, metric) + } + } + } + } + } + + replicaCounts := make(map[uint32]int) + for _, metric := range metrics { + replicaCounts[metric.VolumeID]++ + } + for _, metric := range metrics { + metric.ReplicaCount = replicaCounts[metric.VolumeID] + } + + return metrics, activeTopology, nil +} + +func masterAddressCandidates(address string) []string { + trimmed := strings.TrimSpace(address) + if trimmed == "" { + return nil + } + candidateSet := map[string]struct{}{ + trimmed: {}, + } + converted := pb.ServerToGrpcAddress(trimmed) + candidateSet[converted] = struct{}{} + + candidates := make([]string, 0, len(candidateSet)) + for candidate := range candidateSet { + candidates = append(candidates, candidate) + } + sort.Strings(candidates) + return candidates +} diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0d8dda899..cadc48b66 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -21,9 +21,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/seaweedfs/seaweedfs/weed/wdclient" // Import KMS providers to register them @@ -1471,10 +1471,10 @@ func (identity *Identity) CanDo(action Action, bucket string, 
objectKey string) act := string(a) if strings.ContainsAny(act, "*?") { // Pattern has wildcards - use smart matching - if policy_engine.MatchesWildcard(act, target) { + if wildcard.MatchesWildcard(act, target) { return true } - if policy_engine.MatchesWildcard(act, adminTarget) { + if wildcard.MatchesWildcard(act, adminTarget) { return true } } else { diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go index 95611a7ff..5e4f80ee6 100644 --- a/weed/s3api/auth_credentials_test.go +++ b/weed/s3api/auth_credentials_test.go @@ -8,8 +8,8 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/credential" - "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/stretchr/testify/assert" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" @@ -252,9 +252,9 @@ func TestMatchWildcardPattern(t *testing.T) { for _, tt := range tests { t.Run(tt.pattern+"_"+tt.target, func(t *testing.T) { - result := policy_engine.MatchesWildcard(tt.pattern, tt.target) + result := wildcard.MatchesWildcard(tt.pattern, tt.target) if result != tt.match { - t.Errorf("policy_engine.MatchesWildcard(%q, %q) = %v, want %v", tt.pattern, tt.target, result, tt.match) + t.Errorf("wildcard.MatchesWildcard(%q, %q) = %v, want %v", tt.pattern, tt.target, result, tt.match) } }) } diff --git a/weed/s3api/policy_engine/conditions.go b/weed/s3api/policy_engine/conditions.go index d805bcd18..b32f11594 100644 --- a/weed/s3api/policy_engine/conditions.go +++ b/weed/s3api/policy_engine/conditions.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" ) // LRUNode represents a node in the doubly-linked list for efficient LRU operations @@ -210,7 +211,7 @@ func (e *StringLikeEvaluator) Evaluate(conditionValue interface{}, contextValues patterns := getCachedNormalizedValues(conditionValue) for _, pattern := range patterns { for _, contextValue := range contextValues { - if MatchesWildcard(pattern, contextValue) { + if wildcard.MatchesWildcard(pattern, contextValue) { return true } } @@ -225,7 +226,7 @@ func (e *StringNotLikeEvaluator) Evaluate(conditionValue interface{}, contextVal patterns := getCachedNormalizedValues(conditionValue) for _, pattern := range patterns { for _, contextValue := range contextValues { - if MatchesWildcard(pattern, contextValue) { + if wildcard.MatchesWildcard(pattern, contextValue) { return false } } @@ -628,7 +629,7 @@ func (e *ArnLikeEvaluator) Evaluate(conditionValue interface{}, contextValues [] patterns := getCachedNormalizedValues(conditionValue) for _, pattern := range patterns { for _, contextValue := range contextValues { - if MatchesWildcard(pattern, contextValue) { + if wildcard.MatchesWildcard(pattern, contextValue) { return true } } diff --git a/weed/s3api/policy_engine/engine_test.go b/weed/s3api/policy_engine/engine_test.go index 6b0180be5..7f2da1887 100644 --- a/weed/s3api/policy_engine/engine_test.go +++ b/weed/s3api/policy_engine/engine_test.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" ) // tagsToEntry converts a map of tag key-value pairs to the entry.Extended format @@ -749,7 +750,7 @@ func TestWildcardMatching(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t 
*testing.T) { - result := MatchesWildcard(tt.pattern, tt.str) + result := wildcard.MatchesWildcard(tt.pattern, tt.str) if result != tt.expected { t.Errorf("Pattern %s against %s: expected %v, got %v", tt.pattern, tt.str, tt.expected, result) } diff --git a/weed/s3api/policy_engine/types.go b/weed/s3api/policy_engine/types.go index 60cc87216..abb129596 100644 --- a/weed/s3api/policy_engine/types.go +++ b/weed/s3api/policy_engine/types.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" s3const "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" ) // Policy Engine Types @@ -180,9 +181,9 @@ type CompiledPolicy struct { // CompiledStatement represents a compiled policy statement type CompiledStatement struct { Statement *PolicyStatement - ActionMatchers []*WildcardMatcher - ResourceMatchers []*WildcardMatcher - PrincipalMatchers []*WildcardMatcher + ActionMatchers []*wildcard.WildcardMatcher + ResourceMatchers []*wildcard.WildcardMatcher + PrincipalMatchers []*wildcard.WildcardMatcher // Keep regex patterns for backward compatibility ActionPatterns []*regexp.Regexp ResourcePatterns []*regexp.Regexp @@ -195,7 +196,7 @@ type CompiledStatement struct { // NotResource patterns (resource should NOT match these) NotResourcePatterns []*regexp.Regexp - NotResourceMatchers []*WildcardMatcher + NotResourceMatchers []*wildcard.WildcardMatcher DynamicNotResourcePatterns []string } @@ -328,7 +329,7 @@ func compileStatement(stmt *PolicyStatement) (*CompiledStatement, error) { } compiled.ActionPatterns = append(compiled.ActionPatterns, pattern) - matcher, err := NewWildcardMatcher(action) + matcher, err := wildcard.NewWildcardMatcher(action) if err != nil { return nil, fmt.Errorf("failed to create action matcher %s: %v", action, err) } @@ -352,7 +353,7 @@ func compileStatement(stmt *PolicyStatement) (*CompiledStatement, error) { } compiled.ResourcePatterns = append(compiled.ResourcePatterns, pattern) - matcher, err := NewWildcardMatcher(resource) + matcher, err := wildcard.NewWildcardMatcher(resource) if err != nil { return nil, fmt.Errorf("failed to create resource matcher %s: %v", resource, err) } @@ -377,7 +378,7 @@ func compileStatement(stmt *PolicyStatement) (*CompiledStatement, error) { } compiled.PrincipalPatterns = append(compiled.PrincipalPatterns, pattern) - matcher, err := NewWildcardMatcher(principal) + matcher, err := wildcard.NewWildcardMatcher(principal) if err != nil { return nil, fmt.Errorf("failed to create principal matcher %s: %v", principal, err) } @@ -403,7 +404,7 @@ func compileStatement(stmt *PolicyStatement) (*CompiledStatement, error) { } compiled.NotResourcePatterns = append(compiled.NotResourcePatterns, pattern) - matcher, err := NewWildcardMatcher(notResource) + matcher, err := wildcard.NewWildcardMatcher(notResource) if err != nil { return nil, fmt.Errorf("failed to create NotResource matcher %s: %v", notResource, err) } @@ -419,7 +420,7 @@ func compileStatement(stmt *PolicyStatement) (*CompiledStatement, error) { // compilePattern compiles a wildcard pattern to regex func compilePattern(pattern string) (*regexp.Regexp, error) { - return CompileWildcardPattern(pattern) + return wildcard.CompileWildcardPattern(pattern) } // normalizeToStringSlice converts various types to string slice - kept for backward compatibility @@ -571,11 +572,11 @@ func (cp *CompiledPolicy) EvaluatePolicy(args *PolicyEvaluationArgs) (bool, Poli // FastMatchesWildcard uses cached WildcardMatcher for performance func 
FastMatchesWildcard(pattern, str string) bool { - matcher, err := GetCachedWildcardMatcher(pattern) + matcher, err := wildcard.GetCachedWildcardMatcher(pattern) if err != nil { glog.Errorf("Error getting cached WildcardMatcher for pattern %s: %v", pattern, err) // Fall back to the original implementation - return MatchesWildcard(pattern, str) + return wildcard.MatchesWildcard(pattern, str) } return matcher.Match(str) } diff --git a/weed/s3api/policy_engine/wildcard_matcher_test.go b/weed/s3api/policy_engine/wildcard_matcher_test.go deleted file mode 100644 index 43e16284e..000000000 --- a/weed/s3api/policy_engine/wildcard_matcher_test.go +++ /dev/null @@ -1,469 +0,0 @@ -package policy_engine - -import ( - "testing" -) - -func TestMatchesWildcard(t *testing.T) { - tests := []struct { - name string - pattern string - str string - expected bool - }{ - // Basic functionality tests - { - name: "Exact match", - pattern: "test", - str: "test", - expected: true, - }, - { - name: "Single wildcard", - pattern: "*", - str: "anything", - expected: true, - }, - { - name: "Empty string with wildcard", - pattern: "*", - str: "", - expected: true, - }, - - // Star (*) wildcard tests - { - name: "Prefix wildcard", - pattern: "test*", - str: "test123", - expected: true, - }, - { - name: "Suffix wildcard", - pattern: "*test", - str: "123test", - expected: true, - }, - { - name: "Middle wildcard", - pattern: "test*123", - str: "testABC123", - expected: true, - }, - { - name: "Multiple wildcards", - pattern: "test*abc*123", - str: "testXYZabcDEF123", - expected: true, - }, - { - name: "No match", - pattern: "test*", - str: "other", - expected: false, - }, - - // Question mark (?) wildcard tests - { - name: "Single question mark", - pattern: "test?", - str: "test1", - expected: true, - }, - { - name: "Multiple question marks", - pattern: "test??", - str: "test12", - expected: true, - }, - { - name: "Question mark no match", - pattern: "test?", - str: "test12", - expected: false, - }, - { - name: "Mixed wildcards", - pattern: "test*abc?def", - str: "testXYZabc1def", - expected: true, - }, - - // Edge cases - { - name: "Empty pattern", - pattern: "", - str: "", - expected: true, - }, - { - name: "Empty pattern with string", - pattern: "", - str: "test", - expected: false, - }, - { - name: "Pattern with string empty", - pattern: "test", - str: "", - expected: false, - }, - - // Special characters - { - name: "Pattern with regex special chars", - pattern: "test[abc]", - str: "test[abc]", - expected: true, - }, - { - name: "Pattern with dots", - pattern: "test.txt", - str: "test.txt", - expected: true, - }, - { - name: "Pattern with dots and wildcard", - pattern: "*.txt", - str: "test.txt", - expected: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := MatchesWildcard(tt.pattern, tt.str) - if result != tt.expected { - t.Errorf("Pattern %s against %s: expected %v, got %v", tt.pattern, tt.str, tt.expected, result) - } - }) - } -} - -func TestWildcardMatcher(t *testing.T) { - tests := []struct { - name string - pattern string - strings []string - expected []bool - }{ - { - name: "Simple star pattern", - pattern: "test*", - strings: []string{"test", "test123", "testing", "other"}, - expected: []bool{true, true, true, false}, - }, - { - name: "Question mark pattern", - pattern: "test?", - strings: []string{"test1", "test2", "test", "test12"}, - expected: []bool{true, true, false, false}, - }, - { - name: "Mixed pattern", - pattern: "*.txt", - strings: []string{"file.txt", 
"test.txt", "file.doc", "txt"}, - expected: []bool{true, true, false, false}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - matcher, err := NewWildcardMatcher(tt.pattern) - if err != nil { - t.Fatalf("Failed to create matcher: %v", err) - } - - for i, str := range tt.strings { - result := matcher.Match(str) - if result != tt.expected[i] { - t.Errorf("Pattern %s against %s: expected %v, got %v", tt.pattern, str, tt.expected[i], result) - } - } - }) - } -} - -func TestCompileWildcardPattern(t *testing.T) { - tests := []struct { - name string - pattern string - input string - want bool - }{ - {"Star wildcard", "s3:Get*", "s3:GetObject", true}, - {"Question mark wildcard", "s3:Get?bject", "s3:GetObject", true}, - {"Mixed wildcards", "s3:*Object*", "s3:GetObjectAcl", true}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - regex, err := CompileWildcardPattern(tt.pattern) - if err != nil { - t.Errorf("CompileWildcardPattern() error = %v", err) - return - } - got := regex.MatchString(tt.input) - if got != tt.want { - t.Errorf("CompileWildcardPattern() = %v, want %v", got, tt.want) - } - }) - } -} - -// BenchmarkWildcardMatchingPerformance demonstrates the performance benefits of caching -func BenchmarkWildcardMatchingPerformance(b *testing.B) { - patterns := []string{ - "s3:Get*", - "s3:Put*", - "s3:Delete*", - "s3:List*", - "arn:aws:s3:::bucket/*", - "arn:aws:s3:::bucket/prefix*", - "user:*", - "user:admin-*", - } - - inputs := []string{ - "s3:GetObject", - "s3:PutObject", - "s3:DeleteObject", - "s3:ListBucket", - "arn:aws:s3:::bucket/file.txt", - "arn:aws:s3:::bucket/prefix/file.txt", - "user:admin", - "user:admin-john", - } - - b.Run("WithoutCache", func(b *testing.B) { - for i := 0; i < b.N; i++ { - for _, pattern := range patterns { - for _, input := range inputs { - MatchesWildcard(pattern, input) - } - } - } - }) - - b.Run("WithCache", func(b *testing.B) { - for i := 0; i < b.N; i++ { - for _, pattern := range patterns { - for _, input := range inputs { - FastMatchesWildcard(pattern, input) - } - } - } - }) -} - -// BenchmarkWildcardMatcherReuse demonstrates the performance benefits of reusing WildcardMatcher instances -func BenchmarkWildcardMatcherReuse(b *testing.B) { - pattern := "s3:Get*" - input := "s3:GetObject" - - b.Run("NewMatcherEveryTime", func(b *testing.B) { - for i := 0; i < b.N; i++ { - matcher, _ := NewWildcardMatcher(pattern) - matcher.Match(input) - } - }) - - b.Run("CachedMatcher", func(b *testing.B) { - for i := 0; i < b.N; i++ { - matcher, _ := GetCachedWildcardMatcher(pattern) - matcher.Match(input) - } - }) -} - -// TestWildcardMatcherCaching verifies that caching works correctly -func TestWildcardMatcherCaching(t *testing.T) { - pattern := "s3:Get*" - - // Get the first matcher - matcher1, err := GetCachedWildcardMatcher(pattern) - if err != nil { - t.Fatalf("Failed to get cached matcher: %v", err) - } - - // Get the second matcher - should be the same instance - matcher2, err := GetCachedWildcardMatcher(pattern) - if err != nil { - t.Fatalf("Failed to get cached matcher: %v", err) - } - - // Check that they're the same instance (same pointer) - if matcher1 != matcher2 { - t.Errorf("Expected same matcher instance, got different instances") - } - - // Test that both matchers work correctly - testInput := "s3:GetObject" - if !matcher1.Match(testInput) { - t.Errorf("First matcher failed to match %s", testInput) - } - if !matcher2.Match(testInput) { - t.Errorf("Second matcher failed to match %s", testInput) - } 
-} - -// TestFastMatchesWildcard verifies that the fast matching function works correctly -func TestFastMatchesWildcard(t *testing.T) { - tests := []struct { - pattern string - input string - want bool - }{ - {"s3:Get*", "s3:GetObject", true}, - {"s3:Put*", "s3:GetObject", false}, - {"arn:aws:s3:::bucket/*", "arn:aws:s3:::bucket/file.txt", true}, - {"user:admin-*", "user:admin-john", true}, - {"user:admin-*", "user:guest-john", false}, - } - - for _, tt := range tests { - t.Run(tt.pattern+"_"+tt.input, func(t *testing.T) { - got := FastMatchesWildcard(tt.pattern, tt.input) - if got != tt.want { - t.Errorf("FastMatchesWildcard(%q, %q) = %v, want %v", tt.pattern, tt.input, got, tt.want) - } - }) - } -} - -// TestWildcardMatcherCacheBounding tests the bounded cache functionality -func TestWildcardMatcherCacheBounding(t *testing.T) { - // Clear cache before test - wildcardMatcherCache.ClearCache() - - // Get original max size - originalMaxSize := wildcardMatcherCache.maxSize - - // Set a small max size for testing - wildcardMatcherCache.maxSize = 3 - defer func() { - wildcardMatcherCache.maxSize = originalMaxSize - wildcardMatcherCache.ClearCache() - }() - - // Add patterns up to max size - patterns := []string{"pattern1", "pattern2", "pattern3"} - for _, pattern := range patterns { - _, err := GetCachedWildcardMatcher(pattern) - if err != nil { - t.Fatalf("Failed to get cached matcher for %s: %v", pattern, err) - } - } - - // Verify cache size - size, maxSize := wildcardMatcherCache.GetCacheStats() - if size != 3 { - t.Errorf("Expected cache size 3, got %d", size) - } - if maxSize != 3 { - t.Errorf("Expected max size 3, got %d", maxSize) - } - - // Add another pattern, should evict the least recently used - _, err := GetCachedWildcardMatcher("pattern4") - if err != nil { - t.Fatalf("Failed to get cached matcher for pattern4: %v", err) - } - - // Cache should still be at max size - size, _ = wildcardMatcherCache.GetCacheStats() - if size != 3 { - t.Errorf("Expected cache size 3 after eviction, got %d", size) - } - - // The first pattern should have been evicted - wildcardMatcherCache.mu.RLock() - if _, exists := wildcardMatcherCache.matchers["pattern1"]; exists { - t.Errorf("Expected pattern1 to be evicted, but it still exists") - } - if _, exists := wildcardMatcherCache.matchers["pattern4"]; !exists { - t.Errorf("Expected pattern4 to be in cache, but it doesn't exist") - } - wildcardMatcherCache.mu.RUnlock() -} - -// TestWildcardMatcherCacheLRU tests the LRU eviction policy -func TestWildcardMatcherCacheLRU(t *testing.T) { - // Clear cache before test - wildcardMatcherCache.ClearCache() - - // Get original max size - originalMaxSize := wildcardMatcherCache.maxSize - - // Set a small max size for testing - wildcardMatcherCache.maxSize = 3 - defer func() { - wildcardMatcherCache.maxSize = originalMaxSize - wildcardMatcherCache.ClearCache() - }() - - // Add patterns to fill cache - patterns := []string{"pattern1", "pattern2", "pattern3"} - for _, pattern := range patterns { - _, err := GetCachedWildcardMatcher(pattern) - if err != nil { - t.Fatalf("Failed to get cached matcher for %s: %v", pattern, err) - } - } - - // Access pattern1 to make it most recently used - _, err := GetCachedWildcardMatcher("pattern1") - if err != nil { - t.Fatalf("Failed to access pattern1: %v", err) - } - - // Add another pattern, should evict pattern2 (now least recently used) - _, err = GetCachedWildcardMatcher("pattern4") - if err != nil { - t.Fatalf("Failed to get cached matcher for pattern4: %v", err) - } - - // 
pattern1 should still be in cache (was accessed recently) - // pattern2 should be evicted (was least recently used) - wildcardMatcherCache.mu.RLock() - if _, exists := wildcardMatcherCache.matchers["pattern1"]; !exists { - t.Errorf("Expected pattern1 to remain in cache (most recently used)") - } - if _, exists := wildcardMatcherCache.matchers["pattern2"]; exists { - t.Errorf("Expected pattern2 to be evicted (least recently used)") - } - if _, exists := wildcardMatcherCache.matchers["pattern3"]; !exists { - t.Errorf("Expected pattern3 to remain in cache") - } - if _, exists := wildcardMatcherCache.matchers["pattern4"]; !exists { - t.Errorf("Expected pattern4 to be in cache") - } - wildcardMatcherCache.mu.RUnlock() -} - -// TestWildcardMatcherCacheClear tests the cache clearing functionality -func TestWildcardMatcherCacheClear(t *testing.T) { - // Add some patterns to cache - patterns := []string{"pattern1", "pattern2", "pattern3"} - for _, pattern := range patterns { - _, err := GetCachedWildcardMatcher(pattern) - if err != nil { - t.Fatalf("Failed to get cached matcher for %s: %v", pattern, err) - } - } - - // Verify cache has patterns - size, _ := wildcardMatcherCache.GetCacheStats() - if size == 0 { - t.Errorf("Expected cache to have patterns before clearing") - } - - // Clear cache - wildcardMatcherCache.ClearCache() - - // Verify cache is empty - size, _ = wildcardMatcherCache.GetCacheStats() - if size != 0 { - t.Errorf("Expected cache to be empty after clearing, got size %d", size) - } -} diff --git a/weed/s3api/s3tables/permissions.go b/weed/s3api/s3tables/permissions.go index e699526bd..4ce198b6d 100644 --- a/weed/s3api/s3tables/permissions.go +++ b/weed/s3api/s3tables/permissions.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" ) // Permission represents a specific action permission @@ -217,7 +218,7 @@ func hasIdentityPermission(operation string, ctx *PolicyContext) bool { if action == candidate { return true } - if strings.ContainsAny(action, "*?") && policy_engine.MatchesWildcard(action, candidate) { + if strings.ContainsAny(action, "*?") && wildcard.MatchesWildcard(action, candidate) { return true } } @@ -238,7 +239,7 @@ func matchesPrincipal(principalSpec interface{}, principal string) bool { return true } // Support wildcard matching for principals (e.g., "arn:aws:iam::*:user/admin") - return policy_engine.MatchesWildcard(p, principal) + return wildcard.MatchesWildcard(p, principal) case []interface{}: // Array of principals for _, item := range p { @@ -247,7 +248,7 @@ func matchesPrincipal(principalSpec interface{}, principal string) bool { return true } // Support wildcard matching - if policy_engine.MatchesWildcard(str, principal) { + if wildcard.MatchesWildcard(str, principal) { return true } } @@ -302,7 +303,7 @@ func matchesActionPattern(pattern, action string) bool { // Wildcard match using policy engine's wildcard matcher // Supports both * (any sequence) and ? 
(single character) anywhere in the pattern - return policy_engine.MatchesWildcard(pattern, action) + return wildcard.MatchesWildcard(pattern, action) } func matchesConditions(conditions map[string]map[string]interface{}, ctx *PolicyContext) bool { @@ -411,7 +412,7 @@ func matchesResourcePattern(pattern, resourceARN string) bool { } // Wildcard match using policy engine's wildcard matcher - return policy_engine.MatchesWildcard(pattern, resourceARN) + return wildcard.MatchesWildcard(pattern, resourceARN) } // Helper functions for specific permissions diff --git a/weed/util/wildcard/filter.go b/weed/util/wildcard/filter.go new file mode 100644 index 000000000..2960bb080 --- /dev/null +++ b/weed/util/wildcard/filter.go @@ -0,0 +1,39 @@ +package wildcard + +import "strings" + +// CompileWildcardMatchers parses comma-separated wildcard patterns and compiles them. +// Empty tokens are ignored. Invalid patterns are skipped. +func CompileWildcardMatchers(filter string) []*WildcardMatcher { + parts := strings.Split(filter, ",") + matchers := make([]*WildcardMatcher, 0, len(parts)) + for _, part := range parts { + trimmed := strings.TrimSpace(part) + if trimmed == "" { + continue + } + matcher, err := NewWildcardMatcher(trimmed) + if err != nil { + continue + } + matchers = append(matchers, matcher) + } + if len(matchers) == 0 { + return nil + } + return matchers +} + +// MatchesAnyWildcard returns true when no matcher is provided, +// or when any matcher matches the given value. +func MatchesAnyWildcard(matchers []*WildcardMatcher, value string) bool { + if len(matchers) == 0 { + return true + } + for _, matcher := range matchers { + if matcher != nil && matcher.Match(value) { + return true + } + } + return false +} diff --git a/weed/s3api/policy_engine/wildcard_matcher.go b/weed/util/wildcard/wildcard_matcher.go similarity index 56% rename from weed/s3api/policy_engine/wildcard_matcher.go rename to weed/util/wildcard/wildcard_matcher.go index 7ba01c52e..0d74ffe6c 100644 --- a/weed/s3api/policy_engine/wildcard_matcher.go +++ b/weed/util/wildcard/wildcard_matcher.go @@ -1,4 +1,4 @@ -package policy_engine +package wildcard import ( "regexp" @@ -8,27 +8,25 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" ) -// WildcardMatcher provides unified wildcard matching functionality +// WildcardMatcher provides unified wildcard matching functionality. type WildcardMatcher struct { - // Use regex for complex patterns with ? wildcards - // Use string manipulation for simple * patterns (better performance) useRegex bool regex *regexp.Regexp pattern string } -// WildcardMatcherCache provides caching for WildcardMatcher instances +// WildcardMatcherCache provides caching for WildcardMatcher instances. type WildcardMatcherCache struct { mu sync.RWMutex matchers map[string]*WildcardMatcher maxSize int - accessOrder []string // For LRU eviction + accessOrder []string } -// NewWildcardMatcherCache creates a new WildcardMatcherCache with a configurable maxSize +// NewWildcardMatcherCache creates a new WildcardMatcherCache with a configurable maxSize. 
func NewWildcardMatcherCache(maxSize int) *WildcardMatcherCache { if maxSize <= 0 { - maxSize = 1000 // Default value + maxSize = 1000 } return &WildcardMatcherCache{ matchers: make(map[string]*WildcardMatcher), @@ -36,12 +34,10 @@ func NewWildcardMatcherCache(maxSize int) *WildcardMatcherCache { } } -// Global cache instance -var wildcardMatcherCache = NewWildcardMatcherCache(1000) // Default maxSize +var wildcardMatcherCache = NewWildcardMatcherCache(1000) -// GetCachedWildcardMatcher gets or creates a cached WildcardMatcher for the given pattern +// GetCachedWildcardMatcher gets or creates a cached WildcardMatcher for the given pattern. func GetCachedWildcardMatcher(pattern string) (*WildcardMatcher, error) { - // Fast path: check if already in cache wildcardMatcherCache.mu.RLock() if matcher, exists := wildcardMatcherCache.matchers[pattern]; exists { wildcardMatcherCache.mu.RUnlock() @@ -50,66 +46,53 @@ func GetCachedWildcardMatcher(pattern string) (*WildcardMatcher, error) { } wildcardMatcherCache.mu.RUnlock() - // Slow path: create new matcher and cache it wildcardMatcherCache.mu.Lock() defer wildcardMatcherCache.mu.Unlock() - // Double-check after acquiring write lock if matcher, exists := wildcardMatcherCache.matchers[pattern]; exists { wildcardMatcherCache.updateAccessOrderLocked(pattern) return matcher, nil } - // Create new matcher matcher, err := NewWildcardMatcher(pattern) if err != nil { return nil, err } - // Evict old entries if cache is full if len(wildcardMatcherCache.matchers) >= wildcardMatcherCache.maxSize { wildcardMatcherCache.evictLeastRecentlyUsed() } - // Cache it wildcardMatcherCache.matchers[pattern] = matcher wildcardMatcherCache.accessOrder = append(wildcardMatcherCache.accessOrder, pattern) return matcher, nil } -// updateAccessOrder updates the access order for LRU eviction (with read lock) func (c *WildcardMatcherCache) updateAccessOrder(pattern string) { c.mu.Lock() defer c.mu.Unlock() c.updateAccessOrderLocked(pattern) } -// updateAccessOrderLocked updates the access order for LRU eviction (without locking) func (c *WildcardMatcherCache) updateAccessOrderLocked(pattern string) { - // Remove pattern from its current position for i, p := range c.accessOrder { if p == pattern { c.accessOrder = append(c.accessOrder[:i], c.accessOrder[i+1:]...) break } } - // Add pattern to the end (most recently used) c.accessOrder = append(c.accessOrder, pattern) } -// evictLeastRecentlyUsed removes the least recently used pattern from the cache func (c *WildcardMatcherCache) evictLeastRecentlyUsed() { if len(c.accessOrder) == 0 { return } - - // Remove the least recently used pattern (first in the list) lruPattern := c.accessOrder[0] c.accessOrder = c.accessOrder[1:] delete(c.matchers, lruPattern) } -// ClearCache clears all cached patterns (useful for testing) func (c *WildcardMatcherCache) ClearCache() { c.mu.Lock() defer c.mu.Unlock() @@ -117,26 +100,19 @@ func (c *WildcardMatcherCache) ClearCache() { c.accessOrder = c.accessOrder[:0] } -// GetCacheStats returns cache statistics func (c *WildcardMatcherCache) GetCacheStats() (size int, maxSize int) { c.mu.RLock() defer c.mu.RUnlock() return len(c.matchers), c.maxSize } -// NewWildcardMatcher creates a new wildcard matcher for the given pattern -// The matcher uses an efficient string-based algorithm that handles both * and ? wildcards -// without requiring regex compilation. +// NewWildcardMatcher creates a new wildcard matcher for the given pattern. 
func NewWildcardMatcher(pattern string) (*WildcardMatcher, error) { - matcher := &WildcardMatcher{ - pattern: pattern, - useRegex: false, // String-based matching now handles both * and ? - } - + matcher := &WildcardMatcher{pattern: pattern, useRegex: false} return matcher, nil } -// Match checks if a string matches the wildcard pattern +// Match checks if a string matches the wildcard pattern. func (m *WildcardMatcher) Match(str string) bool { if m.useRegex { return m.regex.MatchString(str) @@ -144,32 +120,27 @@ func (m *WildcardMatcher) Match(str string) bool { return matchWildcardString(m.pattern, str) } -// MatchesWildcard provides a simple function interface for wildcard matching -// This function consolidates the logic from the previous separate implementations -// -// Rules: -// - '*' matches any sequence of characters (including empty string) -// - '?' matches exactly one character (any character) +// MatchesWildcard provides a simple function interface for wildcard matching. func MatchesWildcard(pattern, str string) bool { - // matchWildcardString now handles both * and ? efficiently without regex return matchWildcardString(pattern, str) } -// CompileWildcardPattern converts a wildcard pattern to a compiled regex -// This replaces the previous compilePattern function +// FastMatchesWildcard uses a cached WildcardMatcher for repeated pattern matching. +func FastMatchesWildcard(pattern, str string) bool { + matcher, err := GetCachedWildcardMatcher(pattern) + if err != nil { + glog.Errorf("Error getting cached WildcardMatcher for pattern %s: %v. Falling back to MatchesWildcard.", pattern, err) + return MatchesWildcard(pattern, str) + } + return matcher.Match(str) +} + +// CompileWildcardPattern converts a wildcard pattern to a compiled regex. func CompileWildcardPattern(pattern string) (*regexp.Regexp, error) { return compileWildcardPattern(pattern) } -// matchWildcardString uses efficient string manipulation for * and ? wildcards -// This implementation uses a backtracking algorithm that handles both wildcard types -// without requiring regex compilation. -// -// Rules: -// - '*' matches any sequence of characters (including empty string) -// - '?' matches exactly one character (any character) func matchWildcardString(pattern, str string) bool { - // Handle simple cases if pattern == "*" { return true } @@ -179,74 +150,37 @@ func matchWildcardString(pattern, str string) bool { targetIndex := 0 patternIndex := 0 - - // Index of the most recent '*' in the pattern (-1 if none) lastStarIndex := -1 - - // Index in target where the last '*' started matching lastStarMatchIndex := 0 for targetIndex < len(str) { switch { - // Case 1: Current characters match directly or '?' matches any single character - case patternIndex < len(pattern) && - (pattern[patternIndex] == '?' || pattern[patternIndex] == str[targetIndex]): - + case patternIndex < len(pattern) && (pattern[patternIndex] == '?' 
|| pattern[patternIndex] == str[targetIndex]): targetIndex++ patternIndex++ - - // Case 2: Wildcard '*' found in pattern - case patternIndex < len(pattern) && - pattern[patternIndex] == '*': - + case patternIndex < len(pattern) && pattern[patternIndex] == '*': lastStarIndex = patternIndex lastStarMatchIndex = targetIndex patternIndex++ - - // Case 3: Previous '*' can absorb one more character case lastStarIndex != -1: - patternIndex = lastStarIndex + 1 lastStarMatchIndex++ targetIndex = lastStarMatchIndex - - // Case 4: No match possible default: return false } } - // Consume any trailing '*' in the pattern for patternIndex < len(pattern) && pattern[patternIndex] == '*' { patternIndex++ } - - // Match is valid only if the entire pattern is consumed return patternIndex == len(pattern) } -// matchWildcardRegex uses WildcardMatcher for patterns with ? wildcards -func matchWildcardRegex(pattern, str string) bool { - matcher, err := GetCachedWildcardMatcher(pattern) - if err != nil { - glog.Errorf("Error getting WildcardMatcher for pattern %s: %v. Falling back to matchWildcardString.", pattern, err) - // Fallback to matchWildcardString - return matchWildcardString(pattern, str) - } - return matcher.Match(str) -} - -// compileWildcardPattern converts a wildcard pattern to regex func compileWildcardPattern(pattern string) (*regexp.Regexp, error) { - // Escape special regex characters except * and ? escaped := regexp.QuoteMeta(pattern) - - // Replace escaped wildcards with regex equivalents escaped = strings.ReplaceAll(escaped, `\*`, `.*`) escaped = strings.ReplaceAll(escaped, `\?`, `.`) - - // Anchor the pattern escaped = "^" + escaped + "$" - return regexp.Compile(escaped) } diff --git a/weed/util/wildcard/wildcard_matcher_test.go b/weed/util/wildcard/wildcard_matcher_test.go new file mode 100644 index 000000000..26f5f43a8 --- /dev/null +++ b/weed/util/wildcard/wildcard_matcher_test.go @@ -0,0 +1,211 @@ +package wildcard + +import "testing" + +func TestMatchesWildcard(t *testing.T) { + tests := []struct { + name string + pattern string + str string + expected bool + }{ + {"Exact match", "test", "test", true}, + {"Single wildcard", "*", "anything", true}, + {"Empty string with wildcard", "*", "", true}, + {"Prefix wildcard", "test*", "test123", true}, + {"Suffix wildcard", "*test", "123test", true}, + {"Middle wildcard", "test*123", "testABC123", true}, + {"Multiple wildcards", "test*abc*123", "testXYZabcDEF123", true}, + {"No match", "test*", "other", false}, + {"Single question mark", "test?", "test1", true}, + {"Multiple question marks", "test??", "test12", true}, + {"Question mark no match", "test?", "test12", false}, + {"Mixed wildcards", "test*abc?def", "testXYZabc1def", true}, + {"Empty pattern", "", "", true}, + {"Empty pattern with string", "", "test", false}, + {"Pattern with string empty", "test", "", false}, + {"Pattern with regex special chars", "test[abc]", "test[abc]", true}, + {"Pattern with dots", "test.txt", "test.txt", true}, + {"Pattern with dots and wildcard", "*.txt", "test.txt", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := MatchesWildcard(tt.pattern, tt.str) + if got != tt.expected { + t.Errorf("MatchesWildcard(%q, %q) = %v, want %v", tt.pattern, tt.str, got, tt.expected) + } + }) + } +} + +func TestWildcardMatcherMatch(t *testing.T) { + tests := []struct { + name string + pattern string + inputs []string + expected []bool + }{ + {"Simple star", "test*", []string{"test", "test123", "testing", "other"}, []bool{true, true, true, 
false}}, + {"Question mark", "test?", []string{"test1", "test2", "test", "test12"}, []bool{true, true, false, false}}, + {"Extension filter", "*.txt", []string{"file.txt", "test.txt", "file.doc", "txt"}, []bool{true, true, false, false}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := NewWildcardMatcher(tt.pattern) + if err != nil { + t.Fatalf("NewWildcardMatcher: %v", err) + } + for i, s := range tt.inputs { + got := m.Match(s) + if got != tt.expected[i] { + t.Errorf("Match(%q) = %v, want %v", s, got, tt.expected[i]) + } + } + }) + } +} + +func TestCompileWildcardPattern(t *testing.T) { + tests := []struct { + pattern string + input string + want bool + }{ + {"s3:Get*", "s3:GetObject", true}, + {"s3:Get?bject", "s3:GetObject", true}, + {"s3:*Object*", "s3:GetObjectAcl", true}, + } + for _, tt := range tests { + t.Run(tt.pattern, func(t *testing.T) { + re, err := CompileWildcardPattern(tt.pattern) + if err != nil { + t.Fatalf("CompileWildcardPattern: %v", err) + } + if got := re.MatchString(tt.input); got != tt.want { + t.Errorf("got %v, want %v", got, tt.want) + } + }) + } +} + +func TestFastMatchesWildcard(t *testing.T) { + tests := []struct { + pattern string + input string + want bool + }{ + {"s3:Get*", "s3:GetObject", true}, + {"s3:Put*", "s3:GetObject", false}, + {"arn:aws:s3:::bucket/*", "arn:aws:s3:::bucket/file.txt", true}, + {"user:admin-*", "user:admin-john", true}, + {"user:admin-*", "user:guest-john", false}, + } + for _, tt := range tests { + t.Run(tt.pattern+"_"+tt.input, func(t *testing.T) { + got := FastMatchesWildcard(tt.pattern, tt.input) + if got != tt.want { + t.Errorf("FastMatchesWildcard(%q, %q) = %v, want %v", tt.pattern, tt.input, got, tt.want) + } + }) + } +} + +func TestWildcardMatcherCaching(t *testing.T) { + m1, err := GetCachedWildcardMatcher("s3:Get*") + if err != nil { + t.Fatal(err) + } + m2, err := GetCachedWildcardMatcher("s3:Get*") + if err != nil { + t.Fatal(err) + } + if m1 != m2 { + t.Error("expected same cached instance") + } + if !m1.Match("s3:GetObject") { + t.Error("expected match") + } +} + +func TestWildcardMatcherCacheBounding(t *testing.T) { + wildcardMatcherCache.ClearCache() + orig := wildcardMatcherCache.maxSize + wildcardMatcherCache.maxSize = 3 + defer func() { + wildcardMatcherCache.maxSize = orig + wildcardMatcherCache.ClearCache() + }() + for _, p := range []string{"p1", "p2", "p3"} { + GetCachedWildcardMatcher(p) + } + size, maxSize := wildcardMatcherCache.GetCacheStats() + if size != 3 { + t.Errorf("expected size 3, got %d", size) + } + if maxSize != 3 { + t.Errorf("expected maxSize 3, got %d", maxSize) + } + GetCachedWildcardMatcher("p4") + size, _ = wildcardMatcherCache.GetCacheStats() + if size != 3 { + t.Errorf("expected size 3 after eviction, got %d", size) + } + wildcardMatcherCache.mu.RLock() + defer wildcardMatcherCache.mu.RUnlock() + if _, ok := wildcardMatcherCache.matchers["p1"]; ok { + t.Error("p1 should have been evicted (LRU)") + } + if _, ok := wildcardMatcherCache.matchers["p4"]; !ok { + t.Error("p4 should be in cache") + } +} + +func TestWildcardMatcherCacheLRU(t *testing.T) { + wildcardMatcherCache.ClearCache() + orig := wildcardMatcherCache.maxSize + wildcardMatcherCache.maxSize = 3 + defer func() { + wildcardMatcherCache.maxSize = orig + wildcardMatcherCache.ClearCache() + }() + for _, p := range []string{"p1", "p2", "p3"} { + GetCachedWildcardMatcher(p) + } + GetCachedWildcardMatcher("p1") // access p1 to make it most-recently used + GetCachedWildcardMatcher("p4") // should 
+func TestWildcardMatcherCacheLRU(t *testing.T) {
+	wildcardMatcherCache.ClearCache()
+	orig := wildcardMatcherCache.maxSize
+	wildcardMatcherCache.maxSize = 3
+	defer func() {
+		wildcardMatcherCache.maxSize = orig
+		wildcardMatcherCache.ClearCache()
+	}()
+	for _, p := range []string{"p1", "p2", "p3"} {
+		GetCachedWildcardMatcher(p)
+	}
+	GetCachedWildcardMatcher("p1") // access p1 to make it most-recently used
+	GetCachedWildcardMatcher("p4") // should evict p2 (now LRU)
+	wildcardMatcherCache.mu.RLock()
+	defer wildcardMatcherCache.mu.RUnlock()
+	if _, ok := wildcardMatcherCache.matchers["p2"]; ok {
+		t.Error("p2 should be evicted (least recently used)")
+	}
+	if _, ok := wildcardMatcherCache.matchers["p1"]; !ok {
+		t.Error("p1 should remain (recently accessed)")
+	}
+	if _, ok := wildcardMatcherCache.matchers["p3"]; !ok {
+		t.Error("p3 should remain")
+	}
+	if _, ok := wildcardMatcherCache.matchers["p4"]; !ok {
+		t.Error("p4 should be in cache")
+	}
+}
+
+func TestWildcardMatcherCacheClear(t *testing.T) {
+	GetCachedWildcardMatcher("test")
+	wildcardMatcherCache.ClearCache()
+	size, _ := wildcardMatcherCache.GetCacheStats()
+	if size != 0 {
+		t.Errorf("expected 0 after clear, got %d", size)
+	}
+}
+
+func BenchmarkMatchesWildcard(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		MatchesWildcard("s3:Get*", "s3:GetObject")
+	}
+}
+
+func BenchmarkFastMatchesWildcard(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		FastMatchesWildcard("s3:Get*", "s3:GetObject")
+	}
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index a0e9784fe..382deccca 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"sort"
-	"strings"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@@ -12,6 +11,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
+	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -51,7 +51,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
 
 	var planner *ecPlacementPlanner
 
-	allowedCollections := ParseCollectionFilter(ecConfig.CollectionFilter)
+	allowedCollections := wildcard.CompileWildcardMatchers(ecConfig.CollectionFilter)
 
 	// Group metrics by VolumeID to handle replicas and select canonical server
 	volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics)
@@ -108,12 +108,9 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
 		}
 
 		// Check collection filter if specified
-		if len(allowedCollections) > 0 {
-			// Skip if volume's collection is not in the allowed list
-			if !allowedCollections[metric.Collection] {
-				skippedCollectionFilter++
-				continue
-			}
+		if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, metric.Collection) {
+			skippedCollectionFilter++
+			continue
 		}
 
 		// Check quiet duration and fullness criteria
@@ -336,20 +333,6 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
 	return results, hasMore, nil
 }
 
-func ParseCollectionFilter(filter string) map[string]bool {
-	allowed := make(map[string]bool)
-	for _, collection := range strings.Split(filter, ",") {
-		trimmed := strings.TrimSpace(collection)
-		if trimmed != "" {
-			allowed[trimmed] = true
-		}
-	}
-	if len(allowed) == 0 {
-		return nil
-	}
-	return allowed
-}
-
 type ecDiskState struct {
 	baseAvailable   int64
 	reservedVolumes int32
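Reviewer note: the CompileWildcardMatchers / MatchesAnyWildcard helpers referenced above live in the new weed/util/wildcard/filter.go, which is outside this excerpt. Based on how detection.go calls them, a plausible sketch is: split the comma-separated collection filter into trimmed patterns and report whether any pattern matches. Treat the signatures below as assumptions, not the shipped code:

	import "strings"

	// CompileWildcardMatchers splits a filter such as "logs,tmp-*" into
	// per-collection matchers. Sketch only; see filter.go for the real version.
	func CompileWildcardMatchers(filter string) []*WildcardMatcher {
		var matchers []*WildcardMatcher
		for _, part := range strings.Split(filter, ",") {
			pattern := strings.TrimSpace(part)
			if pattern == "" {
				continue // ignore empty entries, as the old ParseCollectionFilter did
			}
			if m, err := NewWildcardMatcher(pattern); err == nil {
				matchers = append(matchers, m)
			}
		}
		return matchers
	}

	// MatchesAnyWildcard reports whether s matches at least one matcher.
	func MatchesAnyWildcard(matchers []*WildcardMatcher, s string) bool {
		for _, m := range matchers {
			if m.Match(s) {
				return true
			}
		}
		return false
	}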
diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
index 3613930df..0000e22ca 100644
--- a/weed/worker/tasks/erasure_coding/ec_task.go
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -106,9 +106,10 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 	}
 
 	// Use the working directory from task parameters, or fall back to a default
-	baseWorkDir := t.workDir
-
-	// Create unique working directory for this task
+	baseWorkDir := ecParams.WorkingDir
+	if baseWorkDir == "" {
+		baseWorkDir = t.GetWorkingDir()
+	}
 	taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
 	if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
 		return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
@@ -119,9 +120,9 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 	t.workDir = taskWorkDir
 	glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
 
-	// Ensure cleanup of working directory (but preserve logs)
+	// Ensure cleanup of working directory
 	defer func() {
-		// Clean up volume files and EC shards, but preserve the directory structure and any logs
+		// Clean up volume files and EC shards
 		patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
 		for _, pattern := range patterns {
 			matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
@@ -134,7 +135,12 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 			}
 		}
-		glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
+		// Remove the entire working directory
+		if err := os.RemoveAll(taskWorkDir); err != nil {
+			glog.V(2).Infof("Could not remove working directory %s: %v", taskWorkDir, err)
+		} else {
+			glog.V(1).Infof("Cleaned up working directory: %s", taskWorkDir)
+		}
 	}()
 
 	// Step 1: Mark volume readonly
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
index cd0e44e69..88901ee41 100644
--- a/weed/worker/tasks/vacuum/detection.go
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -98,10 +98,10 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 // This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic
 func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams {
 	// Use configured values or defaults
-	garbageThreshold := 0.3                    // Default 30%
-	verifyChecksum := true                     // Default to verify
-	batchSize := int32(1000)                   // Default batch size
-	workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
+	garbageThreshold := 0.3  // Default 30%
+	verifyChecksum := true   // Default to verify
+	batchSize := int32(1000) // Default batch size
+	workingDir := ""         // Use worker-provided default if empty
 
 	if vacuumConfig != nil {
 		garbageThreshold = vacuumConfig.GarbageThreshold
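Reviewer note: with the vacuum default changed to "", the effective working directory is now resolved the way the EC task does it: an explicit params.WorkingDir wins, otherwise the worker-assigned directory applies, and each run gets a unique subdirectory. A hypothetical helper capturing that order (the patch inlines this logic rather than defining such a function):

	import (
		"fmt"
		"path/filepath"
		"time"
	)

	// resolveTaskWorkDir is illustrative: explicit per-task override first,
	// then the directory the worker assigned via SetWorkingDir.
	func resolveTaskWorkDir(paramsDir, workerDir string, volumeID uint32) string {
		base := paramsDir
		if base == "" {
			base = workerDir
		}
		// One unique subdirectory per run, as in ec_task.go.
		return filepath.Join(base, fmt.Sprintf("vol_%d_%d", volumeID, time.Now().Unix()))
	}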
diff --git a/weed/worker/types/base/task.go b/weed/worker/types/base/task.go
index 243df5630..79ca614de 100644
--- a/weed/worker/types/base/task.go
+++ b/weed/worker/types/base/task.go
@@ -16,6 +16,7 @@ type BaseTask struct {
 	logger       types.Logger
 	cancelled    bool
 	currentStage string
+	workingDir   string
 }
 
 // NewBaseTask creates a new base task
@@ -116,3 +117,13 @@ func (t *BaseTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
 	// Subclasses must implement this
 	return 0
 }
+
+// SetWorkingDir sets the task working directory
+func (t *BaseTask) SetWorkingDir(workingDir string) {
+	t.workingDir = workingDir
+}
+
+// GetWorkingDir returns the task working directory
+func (t *BaseTask) GetWorkingDir() string {
+	return t.workingDir
+}
diff --git a/weed/worker/types/task.go b/weed/worker/types/task.go
index 9106a63e3..7e924453c 100644
--- a/weed/worker/types/task.go
+++ b/weed/worker/types/task.go
@@ -29,6 +29,10 @@ type Task interface {
 	// Progress
 	GetProgress() float64
 	SetProgressCallback(func(float64, string))
+
+	// Working Directory
+	SetWorkingDir(string)
+	GetWorkingDir() string
 }
 
 // TaskWithLogging extends Task with logging capabilities
@@ -131,6 +135,7 @@ type UnifiedBaseTask struct {
 	logger       Logger
 	cancelled    bool
 	currentStage string
+	workingDir   string
 }
 
 // NewBaseTask creates a new base task
@@ -206,3 +211,13 @@ func (t *UnifiedBaseTask) SetLogger(logger Logger) {
 func (t *UnifiedBaseTask) GetLogger() Logger {
 	return t.logger
 }
+
+// SetWorkingDir sets the task working directory
+func (t *UnifiedBaseTask) SetWorkingDir(workingDir string) {
+	t.workingDir = workingDir
+}
+
+// GetWorkingDir returns the task working directory
+func (t *UnifiedBaseTask) GetWorkingDir() string {
+	return t.workingDir
+}
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 035fd659d..0277c78a0 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -172,6 +172,13 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
 	// Use the global unified registry that already has all tasks registered
 	registry := tasks.GetGlobalTaskRegistry()
 
+	// Ensure the base working directory exists
+	if config.BaseWorkingDir != "" {
+		if err := os.MkdirAll(config.BaseWorkingDir, 0755); err != nil {
+			return nil, fmt.Errorf("failed to create base working directory %s: %v", config.BaseWorkingDir, err)
+		}
+	}
+
 	// Initialize task log handler
 	logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
 	// Ensure the base task log directory exists to avoid errors when admin requests logs
@@ -677,6 +684,7 @@ func (w *Worker) executeTask(task *types.TaskInput) {
 	// Task execution uses the new unified Task interface
 	glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)
+	taskInstance.SetWorkingDir(taskWorkingDir)
 
 	// If we have a file logger, adapt it so task WithFields logs are captured into file
 	if fileLogger != nil {