From e36e6af71118dd9157691dd65c803bb5731ac8d3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Feb 2026 21:12:17 -0800 Subject: [PATCH] s3api/iceberg: wire list pagination tokens and page size --- weed/s3api/iceberg/iceberg.go | 63 +++++++++++++++-- weed/s3api/iceberg/iceberg_pagination_test.go | 70 +++++++++++++++++++ 2 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 weed/s3api/iceberg/iceberg_pagination_test.go diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 8bffdd1b3..81417034a 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path" + "strconv" "strings" "time" @@ -368,6 +369,38 @@ func buildTableBucketARN(bucketName string) string { return arn } +const ( + defaultListPageSize = 1000 + maxListPageSize = 1000 +) + +func getPaginationQueryParam(r *http.Request, primary, fallback string) string { + if v := strings.TrimSpace(r.URL.Query().Get(primary)); v != "" { + return v + } + return strings.TrimSpace(r.URL.Query().Get(fallback)) +} + +func parsePagination(r *http.Request) (pageToken string, pageSize int, err error) { + pageToken = getPaginationQueryParam(r, "pageToken", "page-token") + pageSize = defaultListPageSize + + pageSizeValue := getPaginationQueryParam(r, "pageSize", "page-size") + if pageSizeValue == "" { + return pageToken, pageSize, nil + } + + parsedPageSize, parseErr := strconv.Atoi(pageSizeValue) + if parseErr != nil || parsedPageSize <= 0 { + return "", 0, fmt.Errorf("invalid pageSize %q: must be a positive integer", pageSizeValue) + } + if parsedPageSize > maxListPageSize { + return "", 0, fmt.Errorf("invalid pageSize %q: must be <= %d", pageSizeValue, maxListPageSize) + } + + return pageToken, parsedPageSize, nil +} + // handleConfig returns catalog configuration. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -388,14 +421,21 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) + pageToken, pageSize, err := parsePagination(r) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) + return + } + // Use S3 Tables manager to list namespaces var resp s3tables.ListNamespacesResponse req := &s3tables.ListNamespacesRequest{ - TableBucketARN: bucketARN, - MaxNamespaces: 1000, + TableBucketARN: bucketARN, + ContinuationToken: pageToken, + MaxNamespaces: pageSize, } - err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName) }) @@ -413,6 +453,7 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { } result := ListNamespacesResponse{ + NextPageToken: resp.ContinuationToken, Namespaces: namespaces, } writeJSON(w, http.StatusOK, result) @@ -615,14 +656,21 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) + pageToken, pageSize, err := parsePagination(r) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) + return + } + listReq := &s3tables.ListTablesRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - MaxTables: 1000, + TableBucketARN: bucketARN, + Namespace: namespace, + ContinuationToken: pageToken, + MaxTables: pageSize, } var listResp s3tables.ListTablesResponse - err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName) }) @@ -647,6 +695,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { } result := ListTablesResponse{ + NextPageToken: listResp.ContinuationToken, Identifiers: identifiers, } writeJSON(w, http.StatusOK, result) diff --git a/weed/s3api/iceberg/iceberg_pagination_test.go b/weed/s3api/iceberg/iceberg_pagination_test.go new file mode 100644 index 000000000..507770992 --- /dev/null +++ b/weed/s3api/iceberg/iceberg_pagination_test.go @@ -0,0 +1,70 @@ +package iceberg + +import ( + "net/http/httptest" + "testing" +) + +func TestParsePaginationDefaultValues(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "" { + t.Fatalf("pageToken = %q, want empty", pageToken) + } + if pageSize != defaultListPageSize { + t.Fatalf("pageSize = %d, want %d", pageSize, defaultListPageSize) + } +} + +func TestParsePaginationUsesCamelCaseParameters(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces?pageToken=abc&pageSize=25", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "abc" { + t.Fatalf("pageToken = %q, want %q", pageToken, "abc") + } + if pageSize != 25 { + t.Fatalf("pageSize = %d, want %d", pageSize, 25) + } +} + +func TestParsePaginationSupportsHyphenatedFallback(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces?page-token=abc&page-size=17", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "abc" { + t.Fatalf("pageToken = %q, want %q", pageToken, "abc") + } + if pageSize != 17 { + t.Fatalf("pageSize = %d, want %d", pageSize, 17) + } +} + +func TestParsePaginationRejectsInvalidPageSize(t *testing.T) { + testCases := []string{ + "/v1/namespaces?pageSize=0", + "/v1/namespaces?pageSize=-1", + "/v1/namespaces?pageSize=foo", + "/v1/namespaces?pageSize=1001", + } + + for _, rawURL := range testCases { + t.Run(rawURL, func(t *testing.T) { + req := httptest.NewRequest("GET", rawURL, nil) + if _, _, err := parsePagination(req); err == nil { + t.Fatalf("parsePagination() expected error for url %q", rawURL) + } + }) + } +} +