From 8cb610e18daf3940dd47e464926f8c6b69dea15a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Feb 2026 10:02:16 -0800 Subject: [PATCH] s3tables: prevent shared table-location bucket mapping overwrite --- weed/s3api/bucket_metadata.go | 2 +- weed/s3api/bucket_paths.go | 82 +++++++++++++++++- weed/s3api/bucket_paths_test.go | 45 ++++++++++ weed/s3api/s3tables/handler_table.go | 83 +++++++++++++++++-- .../s3tables/table_location_mapping_test.go | 77 +++++++++++++++++ weed/s3api/s3tables/utils.go | 28 +++++++ 6 files changed, 307 insertions(+), 10 deletions(-) create mode 100644 weed/s3api/bucket_paths_test.go create mode 100644 weed/s3api/s3tables/table_location_mapping_test.go diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go index 07db3dc1e..80e764ca0 100644 --- a/weed/s3api/bucket_metadata.go +++ b/weed/s3api/bucket_metadata.go @@ -49,7 +49,7 @@ type BucketRegistry struct { metadataCache map[string]*BucketMetaData metadataCacheLock sync.RWMutex - tableLocationCache map[string]string // Cache for table location mappings (bucket -> table path) + tableLocationCache map[string]string // Cache for table location mappings (external bucket -> mapped root path) tableLocationLock sync.RWMutex notFound map[string]struct{} diff --git a/weed/s3api/bucket_paths.go b/weed/s3api/bucket_paths.go index ccd49e539..79f8b92e5 100644 --- a/weed/s3api/bucket_paths.go +++ b/weed/s3api/bucket_paths.go @@ -1,7 +1,9 @@ package s3api import ( + "context" "errors" + "io" "path" "strings" @@ -58,8 +60,13 @@ func (s3a *S3ApiServer) tableLocationDir(bucket string) (string, bool) { entry, err := s3a.getEntry(s3tables.GetTableLocationMappingDir(), bucket) tablePath := "" - if err == nil && entry != nil && len(entry.Content) > 0 { - tablePath = strings.TrimSpace(string(entry.Content)) + if err == nil && entry != nil { + if entry.IsDirectory { + tablePath, err = s3a.readTableLocationMappingFromDirectory(bucket) + } else if len(entry.Content) > 0 { + // Backward compatibility with legacy single-file mappings. + tablePath = normalizeTableLocationMappingPath(string(entry.Content)) + } } // Only cache definitive results: successful lookup (tablePath set) or definitive not-found (ErrNotFound) @@ -82,6 +89,77 @@ func (s3a *S3ApiServer) tableLocationDir(bucket string) (string, bool) { return tablePath, true } +func (s3a *S3ApiServer) readTableLocationMappingFromDirectory(bucket string) (string, error) { + mappingDir := s3tables.GetTableLocationMappingPath(bucket) + var mappedPath string + conflict := false + + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: mappingDir, + Limit: 1024, + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr == io.EOF { + return nil + } + if recvErr != nil { + return recvErr + } + if resp == nil || resp.Entry == nil || resp.Entry.IsDirectory || len(resp.Entry.Content) == 0 { + continue + } + + candidate := normalizeTableLocationMappingPath(string(resp.Entry.Content)) + if candidate == "" { + continue + } + if mappedPath == "" { + mappedPath = candidate + continue + } + if mappedPath != candidate { + conflict = true + return nil + } + } + }) + if err != nil { + return "", err + } + + if conflict { + glog.V(1).Infof("table location mapping conflict for %s: multiple mapped roots found", bucket) + return "", nil + } + return mappedPath, nil +} + +func normalizeTableLocationMappingPath(rawPath string) string { + rawPath = strings.TrimSpace(rawPath) + if rawPath == "" { + return "" + } + + normalized := path.Clean("/" + strings.TrimPrefix(rawPath, "/")) + tablesPrefix := strings.TrimSuffix(s3tables.TablesPath, "/") + "/" + if !strings.HasPrefix(normalized, tablesPrefix) { + return normalized + } + + remaining := strings.TrimPrefix(normalized, tablesPrefix) + bucketName, _, _ := strings.Cut(remaining, "/") + if bucketName == "" { + return "" + } + return path.Join(s3tables.TablesPath, bucketName) +} + func (s3a *S3ApiServer) bucketRoot(bucket string) string { // Returns the unified buckets root path for all bucket types return s3a.option.BucketsPath diff --git a/weed/s3api/bucket_paths_test.go b/weed/s3api/bucket_paths_test.go new file mode 100644 index 000000000..cfee593a9 --- /dev/null +++ b/weed/s3api/bucket_paths_test.go @@ -0,0 +1,45 @@ +package s3api + +import "testing" + +func TestNormalizeTableLocationMappingPath(t *testing.T) { + testCases := []struct { + name string + raw string + want string + }{ + { + name: "legacy table path maps to bucket root", + raw: "/buckets/warehouse/analytics/orders", + want: "/buckets/warehouse", + }, + { + name: "already bucket root", + raw: "/buckets/warehouse", + want: "/buckets/warehouse", + }, + { + name: "relative buckets path normalized and reduced", + raw: "buckets/warehouse/analytics/orders", + want: "/buckets/warehouse", + }, + { + name: "non buckets path preserved", + raw: "/tmp/custom/path", + want: "/tmp/custom/path", + }, + { + name: "empty path", + raw: "", + want: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if got := normalizeTableLocationMappingPath(tc.raw); got != tc.want { + t.Fatalf("normalizeTableLocationMappingPath(%q)=%q, want %q", tc.raw, got, tc.want) + } + }) + } +} diff --git a/weed/s3api/s3tables/handler_table.go b/weed/s3api/s3tables/handler_table.go index 48b9b78c0..54c2773dd 100644 --- a/weed/s3api/s3tables/handler_table.go +++ b/weed/s3api/s3tables/handler_table.go @@ -929,7 +929,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque if err := h.deleteDirectory(r.Context(), client, tablePath); err != nil { return err } - if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation); err != nil { + if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation, tablePath); err != nil { glog.V(1).Infof("failed to delete table location mapping for %s: %v", metadata.MetadataLocation, err) } return nil @@ -1139,29 +1139,98 @@ func (h *S3TablesHandler) updateTableLocationMapping(ctx context.Context, client if !ok { return nil } + tableBucketPath, ok := tableBucketPathFromTablePath(tablePath) + if !ok { + return fmt.Errorf("invalid table path for location mapping: %s", tablePath) + } if err := h.ensureDirectory(ctx, client, GetTableLocationMappingDir()); err != nil { return err } + if err := h.ensureTableLocationMappingBucketDir(ctx, client, newTableLocationBucket); err != nil { + return err + } - // If the metadata location changed, delete the stale mapping for the old bucket + // If the metadata location changed, remove this table's stale mapping entry from the old bucket. if oldMetadataLocation != "" && oldMetadataLocation != newMetadataLocation { oldTableLocationBucket, ok := parseTableLocationBucket(oldMetadataLocation) if ok && oldTableLocationBucket != newTableLocationBucket { - oldMappingPath := GetTableLocationMappingPath(oldTableLocationBucket) - if err := h.deleteEntryIfExists(ctx, client, oldMappingPath); err != nil { + if err := h.removeTableLocationMappingEntry(ctx, client, oldTableLocationBucket, tablePath); err != nil { glog.V(1).Infof("failed to delete stale mapping for %s: %v", oldTableLocationBucket, err) } } } - return h.upsertFile(ctx, client, GetTableLocationMappingPath(newTableLocationBucket), []byte(tablePath)) + return h.upsertFile(ctx, client, GetTableLocationMappingEntryPath(newTableLocationBucket, tablePath), []byte(tableBucketPath)) } -func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation string) error { +func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation, tablePath string) error { tableLocationBucket, ok := parseTableLocationBucket(metadataLocation) if !ok { return nil } - return h.deleteEntryIfExists(ctx, client, GetTableLocationMappingPath(tableLocationBucket)) + return h.removeTableLocationMappingEntry(ctx, client, tableLocationBucket, tablePath) +} + +func (h *S3TablesHandler) ensureTableLocationMappingBucketDir(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket string) error { + mappingDir := GetTableLocationMappingDir() + bucketMappingPath := GetTableLocationMappingPath(tableLocationBucket) + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: mappingDir, + Name: tableLocationBucket, + }) + if err == nil { + if resp != nil && resp.Entry != nil && resp.Entry.IsDirectory { + return nil + } + if removeErr := h.deleteEntryIfExists(ctx, client, bucketMappingPath); removeErr != nil && !errors.Is(removeErr, filer_pb.ErrNotFound) { + return removeErr + } + } else if !errors.Is(err, filer_pb.ErrNotFound) { + return err + } + + return h.ensureDirectory(ctx, client, bucketMappingPath) +} + +func (h *S3TablesHandler) removeTableLocationMappingEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket, tablePath string) error { + entryPath := GetTableLocationMappingEntryPath(tableLocationBucket, tablePath) + if err := h.deleteEntryIfExists(ctx, client, entryPath); err != nil && !errors.Is(err, filer_pb.ErrNotFound) { + return err + } + return h.removeTableLocationMappingBucketDirIfEmpty(ctx, client, tableLocationBucket) +} + +func (h *S3TablesHandler) removeTableLocationMappingBucketDirIfEmpty(ctx context.Context, client filer_pb.SeaweedFilerClient, tableLocationBucket string) error { + bucketMappingPath := GetTableLocationMappingPath(tableLocationBucket) + + stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: bucketMappingPath, + Limit: 1, + }) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + return nil + } + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + return recvErr + } + if resp != nil && resp.Entry != nil { + return nil + } + } + + if err := h.deleteEntryIfExists(ctx, client, bucketMappingPath); err != nil && !errors.Is(err, filer_pb.ErrNotFound) { + return err + } + return nil } diff --git a/weed/s3api/s3tables/table_location_mapping_test.go b/weed/s3api/s3tables/table_location_mapping_test.go new file mode 100644 index 000000000..e4026d42f --- /dev/null +++ b/weed/s3api/s3tables/table_location_mapping_test.go @@ -0,0 +1,77 @@ +package s3tables + +import ( + "strings" + "testing" +) + +func TestGetTableLocationMappingEntryPathPerTable(t *testing.T) { + tableLocationBucket := "shared-location--table-s3" + tablePathA := GetTablePath("warehouse", "analytics", "orders") + tablePathB := GetTablePath("warehouse", "analytics", "customers") + + entryPathA := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathA) + entryPathARepeat := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathA) + entryPathB := GetTableLocationMappingEntryPath(tableLocationBucket, tablePathB) + + if entryPathA != entryPathARepeat { + t.Fatalf("mapping entry path should be deterministic: %q != %q", entryPathA, entryPathARepeat) + } + if entryPathA == entryPathB { + t.Fatalf("mapping entry path should differ per table path: %q == %q", entryPathA, entryPathB) + } + + expectedPrefix := GetTableLocationMappingPath(tableLocationBucket) + "/" + if !strings.HasPrefix(entryPathA, expectedPrefix) { + t.Fatalf("mapping entry path %q should start with %q", entryPathA, expectedPrefix) + } + if strings.TrimPrefix(entryPathA, expectedPrefix) == "" { + t.Fatalf("mapping entry name should not be empty: %q", entryPathA) + } +} + +func TestTableBucketPathFromTablePath(t *testing.T) { + testCases := []struct { + name string + tablePath string + expected string + ok bool + }{ + { + name: "valid table path", + tablePath: GetTablePath("warehouse", "analytics", "orders"), + expected: GetTableBucketPath("warehouse"), + ok: true, + }, + { + name: "valid table bucket root", + tablePath: GetTableBucketPath("warehouse"), + expected: GetTableBucketPath("warehouse"), + ok: true, + }, + { + name: "invalid non-tables path", + tablePath: "/tmp/warehouse/analytics/orders", + expected: "", + ok: false, + }, + { + name: "invalid empty bucket segment", + tablePath: "/buckets/", + expected: "", + ok: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual, ok := tableBucketPathFromTablePath(tc.tablePath) + if ok != tc.ok { + t.Fatalf("tableBucketPathFromTablePath(%q) ok=%v, want %v", tc.tablePath, ok, tc.ok) + } + if actual != tc.expected { + t.Fatalf("tableBucketPathFromTablePath(%q)=%q, want %q", tc.tablePath, actual, tc.expected) + } + }) + } +} diff --git a/weed/s3api/s3tables/utils.go b/weed/s3api/s3tables/utils.go index 762e51cb1..1d8357040 100644 --- a/weed/s3api/s3tables/utils.go +++ b/weed/s3api/s3tables/utils.go @@ -2,6 +2,7 @@ package s3tables import ( "crypto/rand" + "crypto/sha1" "encoding/hex" "fmt" "net/url" @@ -118,6 +119,33 @@ func GetTableLocationMappingPath(tableLocationBucket string) string { return path.Join(GetTableLocationMappingDir(), tableLocationBucket) } +// GetTableLocationMappingEntryPath returns the filer path for a table-specific mapping entry. +// Each table gets its own entry so multiple tables can share the same external table-location bucket. +func GetTableLocationMappingEntryPath(tableLocationBucket, tablePath string) string { + return path.Join(GetTableLocationMappingPath(tableLocationBucket), tableLocationMappingEntryName(tablePath)) +} + +func tableLocationMappingEntryName(tablePath string) string { + normalized := path.Clean("/" + strings.TrimSpace(strings.TrimPrefix(tablePath, "/"))) + sum := sha1.Sum([]byte(normalized)) + return hex.EncodeToString(sum[:]) +} + +func tableBucketPathFromTablePath(tablePath string) (string, bool) { + normalized := path.Clean("/" + strings.TrimSpace(strings.TrimPrefix(tablePath, "/"))) + tablesPrefix := strings.TrimSuffix(TablesPath, "/") + "/" + if !strings.HasPrefix(normalized, tablesPrefix) { + return "", false + } + + remaining := strings.TrimPrefix(normalized, tablesPrefix) + bucketName, _, _ := strings.Cut(remaining, "/") + if bucketName == "" { + return "", false + } + return path.Join(TablesPath, bucketName), true +} + // Metadata structures type tableBucketMetadata struct {