diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 8bffdd1b3..7fd2936d3 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path" + "strconv" "strings" "time" @@ -368,6 +369,38 @@ func buildTableBucketARN(bucketName string) string { return arn } +const ( + defaultListPageSize = 1000 + maxListPageSize = 1000 +) + +func getPaginationQueryParam(r *http.Request, primary, fallback string) string { + if v := strings.TrimSpace(r.URL.Query().Get(primary)); v != "" { + return v + } + return strings.TrimSpace(r.URL.Query().Get(fallback)) +} + +func parsePagination(r *http.Request) (pageToken string, pageSize int, err error) { + pageToken = getPaginationQueryParam(r, "pageToken", "page-token") + pageSize = defaultListPageSize + + pageSizeValue := getPaginationQueryParam(r, "pageSize", "page-size") + if pageSizeValue == "" { + return pageToken, pageSize, nil + } + + parsedPageSize, parseErr := strconv.Atoi(pageSizeValue) + if parseErr != nil || parsedPageSize <= 0 { + return pageToken, 0, fmt.Errorf("invalid pageSize %q: must be a positive integer", pageSizeValue) + } + if parsedPageSize > maxListPageSize { + return pageToken, 0, fmt.Errorf("invalid pageSize %q: must be <= %d", pageSizeValue, maxListPageSize) + } + + return pageToken, parsedPageSize, nil +} + // handleConfig returns catalog configuration. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -388,14 +421,21 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) + pageToken, pageSize, err := parsePagination(r) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) + return + } + // Use S3 Tables manager to list namespaces var resp s3tables.ListNamespacesResponse req := &s3tables.ListNamespacesRequest{ - TableBucketARN: bucketARN, - MaxNamespaces: 1000, + TableBucketARN: bucketARN, + ContinuationToken: pageToken, + MaxNamespaces: pageSize, } - err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName) }) @@ -413,7 +453,8 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { } result := ListNamespacesResponse{ - Namespaces: namespaces, + NextPageToken: resp.ContinuationToken, + Namespaces: namespaces, } writeJSON(w, http.StatusOK, result) } @@ -615,14 +656,21 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { // Extract identity from context identityName := s3_constants.GetIdentityNameFromContext(r) + pageToken, pageSize, err := parsePagination(r) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) + return + } + listReq := &s3tables.ListTablesRequest{ - TableBucketARN: bucketARN, - Namespace: namespace, - MaxTables: 1000, + TableBucketARN: bucketARN, + Namespace: namespace, + ContinuationToken: pageToken, + MaxTables: pageSize, } var listResp s3tables.ListTablesResponse - err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName) }) @@ -647,7 +695,8 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { } result := ListTablesResponse{ - Identifiers: identifiers, + NextPageToken: listResp.ContinuationToken, + Identifiers: identifiers, } writeJSON(w, http.StatusOK, result) } diff --git a/weed/s3api/iceberg/iceberg_pagination_test.go b/weed/s3api/iceberg/iceberg_pagination_test.go new file mode 100644 index 000000000..1cd190727 --- /dev/null +++ b/weed/s3api/iceberg/iceberg_pagination_test.go @@ -0,0 +1,69 @@ +package iceberg + +import ( + "net/http/httptest" + "testing" +) + +func TestParsePaginationDefaultValues(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "" { + t.Fatalf("pageToken = %q, want empty", pageToken) + } + if pageSize != defaultListPageSize { + t.Fatalf("pageSize = %d, want %d", pageSize, defaultListPageSize) + } +} + +func TestParsePaginationUsesCamelCaseParameters(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces?pageToken=abc&pageSize=25", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "abc" { + t.Fatalf("pageToken = %q, want %q", pageToken, "abc") + } + if pageSize != 25 { + t.Fatalf("pageSize = %d, want %d", pageSize, 25) + } +} + +func TestParsePaginationSupportsHyphenatedFallback(t *testing.T) { + req := httptest.NewRequest("GET", "/v1/namespaces?page-token=abc&page-size=17", nil) + + pageToken, pageSize, err := parsePagination(req) + if err != nil { + t.Fatalf("parsePagination() error = %v", err) + } + if pageToken != "abc" { + t.Fatalf("pageToken = %q, want %q", pageToken, "abc") + } + if pageSize != 17 { + t.Fatalf("pageSize = %d, want %d", pageSize, 17) + } +} + +func TestParsePaginationRejectsInvalidPageSize(t *testing.T) { + testCases := []string{ + "/v1/namespaces?pageSize=0", + "/v1/namespaces?pageSize=-1", + "/v1/namespaces?pageSize=foo", + "/v1/namespaces?pageSize=1001", + } + + for _, rawURL := range testCases { + t.Run(rawURL, func(t *testing.T) { + req := httptest.NewRequest("GET", rawURL, nil) + if _, _, err := parsePagination(req); err == nil { + t.Fatalf("parsePagination() expected error for url %q", rawURL) + } + }) + } +} diff --git a/weed/server/volume_grpc_erasure_coding_test.go b/weed/server/volume_grpc_erasure_coding_test.go index 9cce6c14e..dd82734b8 100644 --- a/weed/server/volume_grpc_erasure_coding_test.go +++ b/weed/server/volume_grpc_erasure_coding_test.go @@ -54,4 +54,3 @@ func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) { t.Fatalf("expected shardCount=3, got %d", shardCount) } } -