diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go index 4523af10c..49b6552c2 100644 --- a/weed/plugin/worker/iceberg/detection.go +++ b/weed/plugin/worker/iceberg/detection.go @@ -403,13 +403,13 @@ func needsMaintenance(meta table.Metadata, config Config) bool { } // buildMaintenanceProposal creates a JobProposal for a table needing maintenance. -func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress string) *plugin_pb.JobProposal { +func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress, resourceGroup string) *plugin_pb.JobProposal { dedupeKey := fmt.Sprintf("iceberg_maintenance:%s/%s/%s", t.BucketName, t.Namespace, t.TableName) snapshotCount := len(t.Metadata.Snapshots()) summary := fmt.Sprintf("Maintain %s/%s/%s (%d snapshots)", t.BucketName, t.Namespace, t.TableName, snapshotCount) - return &plugin_pb.JobProposal{ + proposal := &plugin_pb.JobProposal{ ProposalId: fmt.Sprintf("iceberg-%s-%s-%s-%d", t.BucketName, t.Namespace, t.TableName, time.Now().UnixMilli()), DedupeKey: dedupeKey, JobType: jobType, @@ -428,4 +428,9 @@ func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress string) *pl "table": t.TableName, }, } + if resourceGroup != "" { + proposal.Parameters["resource_group"] = &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_StringValue{StringValue: resourceGroup}} + proposal.Labels["resource_group"] = resourceGroup + } + return proposal } diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index 5ef03c185..bf03ab311 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -96,11 +96,36 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, }, + { + SectionId: "resources", + Title: "Resource Groups", + Description: "Controls for fair proposal distribution across buckets or namespaces.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "resource_group_by", + Label: "Group Proposals By", + Description: "When set, detection emits proposals in round-robin order across the selected resource group.", + Placeholder: "none, bucket, namespace, or bucket_namespace", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "max_tables_per_resource_group", + Label: "Max Tables Per Group", + Description: "Optional cap on how many proposals a single resource group can receive in one detection run. Zero disables the cap.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + }, + }, + }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "bucket_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, - "namespace_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, - "table_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "bucket_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "namespace_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "table_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "resource_group_by": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: resourceGroupNone}}, + "max_tables_per_resource_group": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, }, }, WorkerConfigForm: &plugin_pb.ConfigForm{ @@ -283,6 +308,10 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq bucketFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "bucket_filter", "")) namespaceFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "namespace_filter", "")) tableFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "table_filter", "")) + resourceGroups, err := readResourceGroupConfig(request.GetAdminConfigValues()) + if err != nil { + return fmt.Errorf("invalid admin resource group config: %w", err) + } // Connect to filer — try each address until one succeeds. filerAddress, conn, err := h.connectToFiler(ctx, filerAddresses) @@ -293,7 +322,11 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq filerClient := filer_pb.NewSeaweedFilerClient(conn) maxResults := int(request.MaxResults) - tables, err := h.scanTablesForMaintenance(ctx, filerClient, workerConfig, bucketFilter, namespaceFilter, tableFilter, maxResults) + scanLimit := maxResults + if resourceGroups.enabled() { + scanLimit = 0 + } + tables, err := h.scanTablesForMaintenance(ctx, filerClient, workerConfig, bucketFilter, namespaceFilter, tableFilter, scanLimit) if err != nil { _ = sender.SendActivity(pluginworker.BuildDetectorActivity("scan_error", fmt.Sprintf("error scanning tables: %v", err), nil)) return fmt.Errorf("scan tables: %w", err) @@ -305,15 +338,11 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq "tables_found": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(tables))}}, })) - hasMore := false - if maxResults > 0 && len(tables) > maxResults { - hasMore = true - tables = tables[:maxResults] - } + tables, hasMore := selectTablesByResourceGroup(tables, resourceGroups, maxResults) proposals := make([]*plugin_pb.JobProposal, 0, len(tables)) for _, t := range tables { - proposal := h.buildMaintenanceProposal(t, filerAddress) + proposal := h.buildMaintenanceProposal(t, filerAddress, resourceGroupKey(t, resourceGroups.GroupBy)) proposals = append(proposals, proposal) } diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go index 42a9c8b4c..1ee73efb3 100644 --- a/weed/plugin/worker/iceberg/handler_test.go +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -219,7 +219,7 @@ func TestBuildMaintenanceProposal(t *testing.T) { Metadata: meta, } - proposal := handler.buildMaintenanceProposal(info, "localhost:8888") + proposal := handler.buildMaintenanceProposal(info, "localhost:8888", "my-bucket") expectedDedupe := "iceberg_maintenance:my-bucket/analytics/events" if proposal.DedupeKey != expectedDedupe { @@ -241,6 +241,93 @@ func TestBuildMaintenanceProposal(t *testing.T) { if readStringConfig(proposal.Parameters, "filer_address", "") != "localhost:8888" { t.Error("expected filer_address=localhost:8888 in parameters") } + if readStringConfig(proposal.Parameters, "resource_group", "") != "my-bucket" { + t.Error("expected resource_group=my-bucket in parameters") + } + if proposal.Labels["resource_group"] != "my-bucket" { + t.Error("expected resource_group label to be set") + } +} + +func TestReadResourceGroupConfig(t *testing.T) { + cfg, err := readResourceGroupConfig(nil) + if err != nil { + t.Fatalf("readResourceGroupConfig(nil): %v", err) + } + if cfg.GroupBy != resourceGroupNone { + t.Fatalf("expected default groupBy=%q, got %q", resourceGroupNone, cfg.GroupBy) + } + + cfg, err = readResourceGroupConfig(map[string]*plugin_pb.ConfigValue{ + "resource_group_by": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "bucket_namespace"}}, + "max_tables_per_resource_group": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, + }) + if err != nil { + t.Fatalf("readResourceGroupConfig(valid): %v", err) + } + if cfg.GroupBy != resourceGroupBucketNamespace { + t.Fatalf("expected bucket_namespace grouping, got %q", cfg.GroupBy) + } + if cfg.MaxTablesPerGroup != 2 { + t.Fatalf("expected max tables per group=2, got %d", cfg.MaxTablesPerGroup) + } + + if _, err := readResourceGroupConfig(map[string]*plugin_pb.ConfigValue{ + "resource_group_by": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "invalid"}}, + }); err == nil { + t.Fatal("expected invalid resource_group_by to fail") + } + + if _, err := readResourceGroupConfig(map[string]*plugin_pb.ConfigValue{ + "max_tables_per_resource_group": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + }); err == nil { + t.Fatal("expected group cap without grouping to fail") + } +} + +func TestSelectTablesByResourceGroupRoundRobin(t *testing.T) { + tables := []tableInfo{ + {BucketName: "a", Namespace: "ns1", TableName: "t1"}, + {BucketName: "a", Namespace: "ns1", TableName: "t2"}, + {BucketName: "b", Namespace: "ns2", TableName: "t3"}, + {BucketName: "b", Namespace: "ns2", TableName: "t4"}, + } + + selected, hasMore := selectTablesByResourceGroup(tables, resourceGroupConfig{ + GroupBy: resourceGroupBucket, + }, 3) + if !hasMore { + t.Fatal("expected hasMore when maxResults truncates the selection") + } + if len(selected) != 3 { + t.Fatalf("expected 3 selected tables, got %d", len(selected)) + } + if selected[0].BucketName != "a" || selected[1].BucketName != "b" || selected[2].BucketName != "a" { + t.Fatalf("expected round-robin bucket order [a, b, a], got [%s, %s, %s]", selected[0].BucketName, selected[1].BucketName, selected[2].BucketName) + } +} + +func TestSelectTablesByResourceGroupCap(t *testing.T) { + tables := []tableInfo{ + {BucketName: "a", Namespace: "ns1", TableName: "t1"}, + {BucketName: "a", Namespace: "ns1", TableName: "t2"}, + {BucketName: "b", Namespace: "ns2", TableName: "t3"}, + {BucketName: "b", Namespace: "ns2", TableName: "t4"}, + } + + selected, hasMore := selectTablesByResourceGroup(tables, resourceGroupConfig{ + GroupBy: resourceGroupBucket, + MaxTablesPerGroup: 1, + }, 0) + if !hasMore { + t.Fatal("expected hasMore when per-group cap omits tables") + } + if len(selected) != 2 { + t.Fatalf("expected 2 selected tables, got %d", len(selected)) + } + if selected[0].BucketName != "a" || selected[1].BucketName != "b" { + t.Fatalf("expected one table per bucket, got [%s, %s]", selected[0].BucketName, selected[1].BucketName) + } } func TestManifestRewritePathConsistency(t *testing.T) { diff --git a/weed/plugin/worker/iceberg/resource_groups.go b/weed/plugin/worker/iceberg/resource_groups.go new file mode 100644 index 000000000..a8476b1a1 --- /dev/null +++ b/weed/plugin/worker/iceberg/resource_groups.go @@ -0,0 +1,111 @@ +package iceberg + +import ( + "fmt" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" +) + +const ( + resourceGroupNone = "none" + resourceGroupBucket = "bucket" + resourceGroupNamespace = "namespace" + resourceGroupBucketNamespace = "bucket_namespace" +) + +type resourceGroupConfig struct { + GroupBy string + MaxTablesPerGroup int64 +} + +func readResourceGroupConfig(values map[string]*plugin_pb.ConfigValue) (resourceGroupConfig, error) { + groupBy := strings.TrimSpace(strings.ToLower(readStringConfig(values, "resource_group_by", ""))) + if groupBy == "" { + groupBy = resourceGroupNone + } + + switch groupBy { + case resourceGroupNone, resourceGroupBucket, resourceGroupNamespace, resourceGroupBucketNamespace: + default: + return resourceGroupConfig{}, fmt.Errorf("invalid resource_group_by %q (valid: none, bucket, namespace, bucket_namespace)", groupBy) + } + + maxTablesPerGroup := readInt64Config(values, "max_tables_per_resource_group", 0) + if maxTablesPerGroup < 0 { + return resourceGroupConfig{}, fmt.Errorf("max_tables_per_resource_group must be >= 0, got %d", maxTablesPerGroup) + } + if groupBy == resourceGroupNone && maxTablesPerGroup > 0 { + return resourceGroupConfig{}, fmt.Errorf("max_tables_per_resource_group requires resource_group_by to be set") + } + + return resourceGroupConfig{ + GroupBy: groupBy, + MaxTablesPerGroup: maxTablesPerGroup, + }, nil +} + +func (c resourceGroupConfig) enabled() bool { + return c.GroupBy != "" && c.GroupBy != resourceGroupNone +} + +func resourceGroupKey(info tableInfo, groupBy string) string { + switch groupBy { + case resourceGroupBucket: + return info.BucketName + case resourceGroupNamespace: + return info.Namespace + case resourceGroupBucketNamespace: + return info.BucketName + "/" + info.Namespace + default: + return "" + } +} + +func selectTablesByResourceGroup(tables []tableInfo, cfg resourceGroupConfig, maxResults int) ([]tableInfo, bool) { + if !cfg.enabled() { + if maxResults > 0 && len(tables) > maxResults { + return tables[:maxResults], true + } + return tables, false + } + + grouped := make(map[string][]tableInfo) + groupOrder := make([]string, 0) + for _, table := range tables { + key := resourceGroupKey(table, cfg.GroupBy) + if _, ok := grouped[key]; !ok { + groupOrder = append(groupOrder, key) + } + grouped[key] = append(grouped[key], table) + } + + selected := make([]tableInfo, 0, len(tables)) + selectedPerGroup := make(map[string]int64) + for { + progress := false + for _, key := range groupOrder { + if maxResults > 0 && len(selected) >= maxResults { + return selected, len(selected) < len(tables) + } + if cfg.MaxTablesPerGroup > 0 && selectedPerGroup[key] >= cfg.MaxTablesPerGroup { + continue + } + + queue := grouped[key] + if len(queue) == 0 { + continue + } + + selected = append(selected, queue[0]) + grouped[key] = queue[1:] + selectedPerGroup[key]++ + progress = true + } + if !progress { + break + } + } + + return selected, len(selected) < len(tables) +}