Browse Source

s3api/iceberg: wire list pagination tokens and page size

pull/8275/head
Chris Lu 1 day ago
parent
commit
e36e6af711
  1. 63
      weed/s3api/iceberg/iceberg.go
  2. 70
      weed/s3api/iceberg/iceberg_pagination_test.go

63
weed/s3api/iceberg/iceberg.go

@ -10,6 +10,7 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
@ -368,6 +369,38 @@ func buildTableBucketARN(bucketName string) string {
return arn
}
const (
defaultListPageSize = 1000
maxListPageSize = 1000
)
func getPaginationQueryParam(r *http.Request, primary, fallback string) string {
if v := strings.TrimSpace(r.URL.Query().Get(primary)); v != "" {
return v
}
return strings.TrimSpace(r.URL.Query().Get(fallback))
}
func parsePagination(r *http.Request) (pageToken string, pageSize int, err error) {
pageToken = getPaginationQueryParam(r, "pageToken", "page-token")
pageSize = defaultListPageSize
pageSizeValue := getPaginationQueryParam(r, "pageSize", "page-size")
if pageSizeValue == "" {
return pageToken, pageSize, nil
}
parsedPageSize, parseErr := strconv.Atoi(pageSizeValue)
if parseErr != nil || parsedPageSize <= 0 {
return "", 0, fmt.Errorf("invalid pageSize %q: must be a positive integer", pageSizeValue)
}
if parsedPageSize > maxListPageSize {
return "", 0, fmt.Errorf("invalid pageSize %q: must be <= %d", pageSizeValue, maxListPageSize)
}
return pageToken, parsedPageSize, nil
}
// handleConfig returns catalog configuration.
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
@ -388,14 +421,21 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
pageToken, pageSize, err := parsePagination(r)
if err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", err.Error())
return
}
// Use S3 Tables manager to list namespaces
var resp s3tables.ListNamespacesResponse
req := &s3tables.ListNamespacesRequest{
TableBucketARN: bucketARN,
MaxNamespaces: 1000,
TableBucketARN: bucketARN,
ContinuationToken: pageToken,
MaxNamespaces: pageSize,
}
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName)
})
@ -413,6 +453,7 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
}
result := ListNamespacesResponse{
NextPageToken: resp.ContinuationToken,
Namespaces: namespaces,
}
writeJSON(w, http.StatusOK, result)
@ -615,14 +656,21 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
pageToken, pageSize, err := parsePagination(r)
if err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", err.Error())
return
}
listReq := &s3tables.ListTablesRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
MaxTables: 1000,
TableBucketARN: bucketARN,
Namespace: namespace,
ContinuationToken: pageToken,
MaxTables: pageSize,
}
var listResp s3tables.ListTablesResponse
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName)
})
@ -647,6 +695,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
}
result := ListTablesResponse{
NextPageToken: listResp.ContinuationToken,
Identifiers: identifiers,
}
writeJSON(w, http.StatusOK, result)

70
weed/s3api/iceberg/iceberg_pagination_test.go

@ -0,0 +1,70 @@
package iceberg
import (
"net/http/httptest"
"testing"
)
func TestParsePaginationDefaultValues(t *testing.T) {
req := httptest.NewRequest("GET", "/v1/namespaces", nil)
pageToken, pageSize, err := parsePagination(req)
if err != nil {
t.Fatalf("parsePagination() error = %v", err)
}
if pageToken != "" {
t.Fatalf("pageToken = %q, want empty", pageToken)
}
if pageSize != defaultListPageSize {
t.Fatalf("pageSize = %d, want %d", pageSize, defaultListPageSize)
}
}
func TestParsePaginationUsesCamelCaseParameters(t *testing.T) {
req := httptest.NewRequest("GET", "/v1/namespaces?pageToken=abc&pageSize=25", nil)
pageToken, pageSize, err := parsePagination(req)
if err != nil {
t.Fatalf("parsePagination() error = %v", err)
}
if pageToken != "abc" {
t.Fatalf("pageToken = %q, want %q", pageToken, "abc")
}
if pageSize != 25 {
t.Fatalf("pageSize = %d, want %d", pageSize, 25)
}
}
func TestParsePaginationSupportsHyphenatedFallback(t *testing.T) {
req := httptest.NewRequest("GET", "/v1/namespaces?page-token=abc&page-size=17", nil)
pageToken, pageSize, err := parsePagination(req)
if err != nil {
t.Fatalf("parsePagination() error = %v", err)
}
if pageToken != "abc" {
t.Fatalf("pageToken = %q, want %q", pageToken, "abc")
}
if pageSize != 17 {
t.Fatalf("pageSize = %d, want %d", pageSize, 17)
}
}
func TestParsePaginationRejectsInvalidPageSize(t *testing.T) {
testCases := []string{
"/v1/namespaces?pageSize=0",
"/v1/namespaces?pageSize=-1",
"/v1/namespaces?pageSize=foo",
"/v1/namespaces?pageSize=1001",
}
for _, rawURL := range testCases {
t.Run(rawURL, func(t *testing.T) {
req := httptest.NewRequest("GET", rawURL, nil)
if _, _, err := parsePagination(req); err == nil {
t.Fatalf("parsePagination() expected error for url %q", rawURL)
}
})
}
}
Loading…
Cancel
Save