You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1172 lines
34 KiB

package iceberg
import (
"bytes"
"context"
"fmt"
"io"
"path"
"testing"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
// TestParseConfig verifies that ParseConfig falls back to every documented
// default when given a nil configuration map.
func TestParseConfig(t *testing.T) {
	cfg := ParseConfig(nil)
	if cfg.SnapshotRetentionHours != defaultSnapshotRetentionHours {
		t.Errorf("expected SnapshotRetentionHours=%d, got %d", defaultSnapshotRetentionHours, cfg.SnapshotRetentionHours)
	}
	if cfg.MaxSnapshotsToKeep != defaultMaxSnapshotsToKeep {
		t.Errorf("expected MaxSnapshotsToKeep=%d, got %d", defaultMaxSnapshotsToKeep, cfg.MaxSnapshotsToKeep)
	}
	if cfg.OrphanOlderThanHours != defaultOrphanOlderThanHours {
		t.Errorf("expected OrphanOlderThanHours=%d, got %d", defaultOrphanOlderThanHours, cfg.OrphanOlderThanHours)
	}
	if cfg.MaxCommitRetries != defaultMaxCommitRetries {
		t.Errorf("expected MaxCommitRetries=%d, got %d", defaultMaxCommitRetries, cfg.MaxCommitRetries)
	}
	if cfg.Operations != defaultOperations {
		t.Errorf("expected Operations=%q, got %q", defaultOperations, cfg.Operations)
	}
}
// TestParseOperations exercises operation-list parsing: "all" and the empty
// string expand to the full operation set, comma-separated lists (with
// optional whitespace) are accepted in the canonical order shown below, and
// any unknown operation name yields an error.
func TestParseOperations(t *testing.T) {
	cases := []struct {
		input    string
		expected []string
		wantErr  bool
	}{
		{"all", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
		{"", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
		{"expire_snapshots", []string{"expire_snapshots"}, false},
		{"compact", []string{"compact"}, false},
		{"rewrite_manifests,expire_snapshots", []string{"expire_snapshots", "rewrite_manifests"}, false},
		{"compact,expire_snapshots", []string{"compact", "expire_snapshots"}, false},
		{"remove_orphans, rewrite_manifests", []string{"remove_orphans", "rewrite_manifests"}, false},
		{"expire_snapshots,remove_orphans,rewrite_manifests", []string{"expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
		{"compact,expire_snapshots,remove_orphans,rewrite_manifests", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
		{"unknown_op", nil, true},
		{"expire_snapshots,bad_op", nil, true},
	}
	for _, tt := range cases {
		got, err := parseOperations(tt.input)
		switch {
		case tt.wantErr:
			if err == nil {
				t.Errorf("parseOperations(%q) expected error, got %v", tt.input, got)
			}
		case err != nil:
			t.Errorf("parseOperations(%q) unexpected error: %v", tt.input, err)
		case len(got) != len(tt.expected):
			t.Errorf("parseOperations(%q) = %v, want %v", tt.input, got, tt.expected)
		default:
			for i, op := range got {
				if op != tt.expected[i] {
					t.Errorf("parseOperations(%q)[%d] = %q, want %q", tt.input, i, op, tt.expected[i])
				}
			}
		}
	}
}
// TestExtractMetadataVersion checks version extraction from metadata file
// names: plain "vN.metadata.json", suffixed "vN-<anything>.metadata.json",
// and malformed names (which map to 0).
func TestExtractMetadataVersion(t *testing.T) {
	cases := []struct {
		in   string
		want int
	}{
		{"v1.metadata.json", 1},
		{"v5.metadata.json", 5},
		{"v100.metadata.json", 100},
		{"v0.metadata.json", 0},
		{"invalid.metadata.json", 0},
		{"metadata.json", 0},
		{"", 0},
		{"v.metadata.json", 0},
		{"v7-1709766000.metadata.json", 7},
		{"v42-abc123.metadata.json", 42},
		{"v5-.metadata.json", 5},
	}
	for _, c := range cases {
		if got := extractMetadataVersion(c.in); got != c.want {
			t.Errorf("extractMetadataVersion(%q) = %d, want %d", c.in, got, c.want)
		}
	}
}
// TestNeedsMaintenanceNoSnapshots: a table without any snapshots never
// requires maintenance.
func TestNeedsMaintenanceNoSnapshots(t *testing.T) {
	cfg := Config{
		SnapshotRetentionHours: 24,
		MaxSnapshotsToKeep:     2,
	}
	if needsMaintenance(buildTestMetadata(t, nil), cfg) {
		t.Error("expected no maintenance for table with no snapshots")
	}
}
// TestNeedsMaintenanceExceedsMaxSnapshots: three snapshots against a limit
// of two triggers maintenance even when age-based retention never fires.
func TestNeedsMaintenanceExceedsMaxSnapshots(t *testing.T) {
	cfg := Config{
		SnapshotRetentionHours: 24 * 365, // effectively disables age-based expiry
		MaxSnapshotsToKeep:     2,
	}
	base := time.Now().UnixMilli()
	var snaps []table.Snapshot
	for i := int64(0); i < 3; i++ {
		snaps = append(snaps, table.Snapshot{
			SnapshotID:   i + 1,
			TimestampMs:  base + i,
			ManifestList: fmt.Sprintf("metadata/snap-%d.avro", i+1),
		})
	}
	if !needsMaintenance(buildTestMetadata(t, snaps), cfg) {
		t.Error("expected maintenance for table exceeding max snapshots")
	}
}
// TestNeedsMaintenanceWithinLimits: one fresh snapshot inside both the
// retention window and the count limit requires no maintenance.
func TestNeedsMaintenanceWithinLimits(t *testing.T) {
	cfg := Config{
		SnapshotRetentionHours: 24 * 365, // very long retention
		MaxSnapshotsToKeep:     5,
	}
	snaps := []table.Snapshot{
		{SnapshotID: 1, TimestampMs: time.Now().UnixMilli(), ManifestList: "metadata/snap-1.avro"},
	}
	if needsMaintenance(buildTestMetadata(t, snaps), cfg) {
		t.Error("expected no maintenance for table within limits")
	}
}
// TestNeedsMaintenanceOldSnapshot: with a zero-hour retention window any
// snapshot timestamped before "now" counts as expired, so maintenance is
// required.
func TestNeedsMaintenanceOldSnapshot(t *testing.T) {
	cfg := Config{
		SnapshotRetentionHours: 0, // instant expiry
		MaxSnapshotsToKeep:     10,
	}
	snaps := []table.Snapshot{
		{SnapshotID: 1, TimestampMs: time.Now().UnixMilli() - 1, ManifestList: "metadata/snap-1.avro"},
	}
	// With 0 retention, any snapshot with timestamp < now should need maintenance.
	if !needsMaintenance(buildTestMetadata(t, snaps), cfg) {
		t.Error("expected maintenance for table with expired snapshot")
	}
}
// TestCapabilityAndDescriptor checks the handler's advertised capability
// (job type, detect/execute support) and its descriptor (config forms plus
// admin runtime defaults, which must start disabled).
//
// Fixes over the previous version: the local was renamed from "cap" (which
// shadowed the builtin), and the nil check on AdminRuntimeDefaults now uses
// t.Fatal — the old t.Error continued and dereferenced a nil pointer,
// turning a clean failure into a panic.
func TestCapabilityAndDescriptor(t *testing.T) {
	handler := NewHandler(nil)
	capability := handler.Capability()
	if capability.JobType != jobType {
		t.Errorf("expected job type %q, got %q", jobType, capability.JobType)
	}
	if !capability.CanDetect {
		t.Error("expected CanDetect=true")
	}
	if !capability.CanExecute {
		t.Error("expected CanExecute=true")
	}
	desc := handler.Descriptor()
	if desc.JobType != jobType {
		t.Errorf("expected job type %q, got %q", jobType, desc.JobType)
	}
	if desc.AdminConfigForm == nil {
		t.Error("expected admin config form")
	}
	if desc.WorkerConfigForm == nil {
		t.Error("expected worker config form")
	}
	// Must stop here on nil: the next check dereferences the pointer.
	if desc.AdminRuntimeDefaults == nil {
		t.Fatal("expected admin runtime defaults")
	}
	if desc.AdminRuntimeDefaults.Enabled {
		t.Error("expected disabled by default")
	}
}
// TestBuildMaintenanceProposal verifies the proposal built for a table:
// its dedupe key, job type, and the string parameters handed to the worker.
func TestBuildMaintenanceProposal(t *testing.T) {
	handler := NewHandler(nil)
	base := time.Now().UnixMilli()
	meta := buildTestMetadata(t, []table.Snapshot{
		{SnapshotID: 1, TimestampMs: base},
		{SnapshotID: 2, TimestampMs: base + 1},
	})
	info := tableInfo{
		BucketName: "my-bucket",
		Namespace:  "analytics",
		TableName:  "events",
		TablePath:  "analytics/events",
		Metadata:   meta,
	}
	proposal := handler.buildMaintenanceProposal(info, "localhost:8888")
	expectedDedupe := "iceberg_maintenance:my-bucket/analytics/events"
	if proposal.DedupeKey != expectedDedupe {
		t.Errorf("expected dedupe key %q, got %q", expectedDedupe, proposal.DedupeKey)
	}
	if proposal.JobType != jobType {
		t.Errorf("expected job type %q, got %q", jobType, proposal.JobType)
	}
	// Every worker parameter must round-trip through readStringConfig.
	for _, p := range []struct{ key, want string }{
		{"bucket_name", "my-bucket"},
		{"namespace", "analytics"},
		{"table_name", "events"},
		{"filer_address", "localhost:8888"},
	} {
		if readStringConfig(proposal.Parameters, p.key, "") != p.want {
			t.Errorf("expected %s=%s in parameters", p.key, p.want)
		}
	}
}
// TestManifestRewritePathConsistency checks that a manifest written via
// WriteManifest reports exactly the path it was given, so a filename
// computed once can safely be reused for both the write and the filer save.
func TestManifestRewritePathConsistency(t *testing.T) {
	// Verify that WriteManifest returns a ManifestFile whose FilePath()
	// matches the path we pass in. This ensures the pattern used in
	// rewriteManifests (compute filename once, pass to both WriteManifest
	// and saveFilerFile) produces consistent references.
	schema := newTestSchema()
	spec := *iceberg.UnpartitionedSpec
	snapshotID := int64(42)
	// Filename mirrors the merged-<snapshot>-<timestamp> convention.
	manifestFileName := fmt.Sprintf("merged-%d-%d.avro", snapshotID, int64(1700000000000))
	manifestPath := "metadata/" + manifestFileName
	// Create a minimal manifest entry to write
	dfBuilder, err := iceberg.NewDataFileBuilder(
		spec,
		iceberg.EntryContentData,
		"data/test.parquet",
		iceberg.ParquetFile,
		map[int]any{},
		nil, nil,
		1,    // recordCount
		1024, // fileSizeBytes
	)
	if err != nil {
		t.Fatalf("failed to build data file: %v", err)
	}
	entry := iceberg.NewManifestEntry(
		iceberg.EntryStatusADDED,
		&snapshotID,
		nil, nil,
		dfBuilder.Build(),
	)
	// Write to an in-memory buffer; only the returned ManifestFile's
	// metadata matters for this test, not the Avro bytes.
	var buf bytes.Buffer
	mf, err := iceberg.WriteManifest(
		manifestPath,
		&buf,
		2, // version
		spec,
		schema,
		snapshotID,
		[]iceberg.ManifestEntry{entry},
	)
	if err != nil {
		t.Fatalf("WriteManifest failed: %v", err)
	}
	if mf.FilePath() != manifestPath {
		t.Errorf("manifest FilePath() = %q, want %q", mf.FilePath(), manifestPath)
	}
	// Verify the filename we'd use for saveFilerFile matches
	if path.Base(mf.FilePath()) != manifestFileName {
		t.Errorf("manifest base name = %q, want %q", path.Base(mf.FilePath()), manifestFileName)
	}
}
// TestManifestRewriteNestedPathConsistency checks that nested manifest
// paths survive WriteManifest unchanged, and that normalizeIcebergPath maps
// both the relative and the s3://-prefixed form of each path back to the
// same relative path.
func TestManifestRewriteNestedPathConsistency(t *testing.T) {
	// Verify that WriteManifest with nested paths preserves the full path
	// and that loadFileByIcebergPath (via normalizeIcebergPath) would
	// resolve them correctly.
	schema := newTestSchema()
	spec := *iceberg.UnpartitionedSpec
	snapshotID := int64(42)
	testCases := []struct {
		name         string
		manifestPath string
	}{
		{"nested two levels", "metadata/a/b/merged-42-1700000000000.avro"},
		{"nested one level", "metadata/subdir/manifest-42.avro"},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Minimal single-entry manifest; only path handling matters here.
			dfBuilder, err := iceberg.NewDataFileBuilder(
				spec,
				iceberg.EntryContentData,
				"data/test.parquet",
				iceberg.ParquetFile,
				map[int]any{},
				nil, nil,
				1, 1024,
			)
			if err != nil {
				t.Fatalf("failed to build data file: %v", err)
			}
			entry := iceberg.NewManifestEntry(
				iceberg.EntryStatusADDED,
				&snapshotID,
				nil, nil,
				dfBuilder.Build(),
			)
			var buf bytes.Buffer
			mf, err := iceberg.WriteManifest(
				tc.manifestPath, &buf, 2, spec, schema, snapshotID,
				[]iceberg.ManifestEntry{entry},
			)
			if err != nil {
				t.Fatalf("WriteManifest failed: %v", err)
			}
			if mf.FilePath() != tc.manifestPath {
				t.Errorf("FilePath() = %q, want %q", mf.FilePath(), tc.manifestPath)
			}
			// normalizeIcebergPath should return the path unchanged when already relative
			normalized := normalizeIcebergPath(tc.manifestPath, "bucket", "ns/table")
			if normalized != tc.manifestPath {
				t.Errorf("normalizeIcebergPath(%q) = %q, want %q", tc.manifestPath, normalized, tc.manifestPath)
			}
			// Verify normalization strips S3 scheme prefix correctly
			s3Path := "s3://bucket/ns/table/" + tc.manifestPath
			normalized = normalizeIcebergPath(s3Path, "bucket", "ns/table")
			if normalized != tc.manifestPath {
				t.Errorf("normalizeIcebergPath(%q) = %q, want %q", s3Path, normalized, tc.manifestPath)
			}
		})
	}
}
// TestNormalizeIcebergPath covers the three input shapes the normalizer
// accepts: already-relative paths (returned unchanged), s3:// URLs, and
// absolute /buckets/... filer paths (both stripped down to the relative
// table-local path).
func TestNormalizeIcebergPath(t *testing.T) {
	cases := []struct {
		name     string
		in       string
		bucket   string
		tableDir string
		want     string
	}{
		{
			"relative metadata path",
			"metadata/snap-1.avro",
			"mybucket", "ns/table",
			"metadata/snap-1.avro",
		},
		{
			"relative data path",
			"data/file.parquet",
			"mybucket", "ns/table",
			"data/file.parquet",
		},
		{
			"S3 URL",
			"s3://mybucket/ns/table/metadata/snap-1.avro",
			"mybucket", "ns/table",
			"metadata/snap-1.avro",
		},
		{
			"absolute filer path",
			"/buckets/mybucket/ns/table/data/file.parquet",
			"mybucket", "ns/table",
			"data/file.parquet",
		},
		{
			"nested data path",
			"data/region=us/city=sf/file.parquet",
			"mybucket", "ns/table",
			"data/region=us/city=sf/file.parquet",
		},
		{
			"S3 URL nested",
			"s3://mybucket/ns/table/data/region=us/file.parquet",
			"mybucket", "ns/table",
			"data/region=us/file.parquet",
		},
	}
	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			if got := normalizeIcebergPath(tt.in, tt.bucket, tt.tableDir); got != tt.want {
				t.Errorf("normalizeIcebergPath(%q, %q, %q) = %q, want %q",
					tt.in, tt.bucket, tt.tableDir, got, tt.want)
			}
		})
	}
}
// TestPartitionKey checks the canonical partition grouping key: empty and
// nil partitions share a sentinel value, and multi-field keys are emitted
// in ascending field-ID order joined with a NUL byte.
func TestPartitionKey(t *testing.T) {
	cases := []struct {
		name      string
		partition map[int]any
		want      string
	}{
		{"empty partition", map[int]any{}, "__unpartitioned__"},
		{"nil partition", nil, "__unpartitioned__"},
		{"single field", map[int]any{1: "us-east"}, "1=\"us-east\""},
		{"multiple fields sorted", map[int]any{3: "2024", 1: "us-east"}, "1=\"us-east\"\x003=\"2024\""},
	}
	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			if got := partitionKey(tt.partition); got != tt.want {
				t.Errorf("partitionKey(%v) = %q, want %q", tt.partition, got, tt.want)
			}
		})
	}
}
// TestBuildCompactionBins: several small files in the same partition are
// grouped into one bin when they collectively fit under the target size.
func TestBuildCompactionBins(t *testing.T) {
	const minFiles = 3
	targetSize := int64(256 * 1024 * 1024) // 256MB
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/f1.parquet", size: 1024, partition: map[int]any{}},
		{path: "data/f2.parquet", size: 2048, partition: map[int]any{}},
		{path: "data/f3.parquet", size: 4096, partition: map[int]any{}},
	})
	bins := buildCompactionBins(entries, targetSize, minFiles)
	if len(bins) != 1 {
		t.Fatalf("expected 1 bin, got %d", len(bins))
	}
	if got := len(bins[0].Entries); got != 3 {
		t.Errorf("expected 3 entries in bin, got %d", got)
	}
}
// TestBuildCompactionBinsFiltersLargeFiles: a file larger than the target
// size is excluded from binning; only the small files are grouped.
func TestBuildCompactionBinsFiltersLargeFiles(t *testing.T) {
	const minFiles = 2
	targetSize := int64(4000)
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/small1.parquet", size: 1024, partition: map[int]any{}},
		{path: "data/small2.parquet", size: 2048, partition: map[int]any{}},
		{path: "data/large.parquet", size: 5000, partition: map[int]any{}},
	})
	bins := buildCompactionBins(entries, targetSize, minFiles)
	if len(bins) != 1 {
		t.Fatalf("expected 1 bin, got %d", len(bins))
	}
	if got := len(bins[0].Entries); got != 2 {
		t.Errorf("expected 2 entries (large excluded), got %d", got)
	}
}
// TestBuildCompactionBinsMinFilesThreshold: fewer candidate files than
// minFiles yields no bins at all.
func TestBuildCompactionBinsMinFilesThreshold(t *testing.T) {
	const minFiles = 5
	targetSize := int64(256 * 1024 * 1024)
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/f1.parquet", size: 1024, partition: map[int]any{}},
		{path: "data/f2.parquet", size: 2048, partition: map[int]any{}},
	})
	if bins := buildCompactionBins(entries, targetSize, minFiles); len(bins) != 0 {
		t.Errorf("expected 0 bins (below min threshold), got %d", len(bins))
	}
}
// TestBuildCompactionBinsMultiplePartitions: files from different partition
// values never share a bin, even when each group fits under the target.
func TestBuildCompactionBinsMultiplePartitions(t *testing.T) {
	const minFiles = 2
	targetSize := int64(256 * 1024 * 1024)
	regionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
		SourceID:  1,
		FieldID:   1000,
		Name:      "region",
		Transform: iceberg.IdentityTransform{},
	})
	east := map[int]any{1: "us-east"}
	west := map[int]any{1: "eu-west"}
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/a1.parquet", size: 1024, partition: east, partitionSpec: &regionSpec},
		{path: "data/a2.parquet", size: 2048, partition: east, partitionSpec: &regionSpec},
		{path: "data/b1.parquet", size: 1024, partition: west, partitionSpec: &regionSpec},
		{path: "data/b2.parquet", size: 2048, partition: west, partitionSpec: &regionSpec},
		{path: "data/b3.parquet", size: 4096, partition: west, partitionSpec: &regionSpec},
	})
	bins := buildCompactionBins(entries, targetSize, minFiles)
	if len(bins) != 2 {
		t.Fatalf("expected 2 bins (one per partition), got %d", len(bins))
	}
}
// TestSplitOversizedBinRespectsTargetSize: splitting an oversized bin must
// only produce bins that are under the target size AND meet the minimum
// file count.
func TestSplitOversizedBinRespectsTargetSize(t *testing.T) {
	const (
		targetSize = int64(100)
		minFiles   = 2
	)
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/f1.parquet", size: 80, partition: map[int]any{}},
		{path: "data/f2.parquet", size: 80, partition: map[int]any{}},
		{path: "data/f3.parquet", size: 10, partition: map[int]any{}},
		{path: "data/f4.parquet", size: 10, partition: map[int]any{}},
	})
	oversized := compactionBin{
		PartitionKey: "__unpartitioned__",
		Partition:    map[int]any{},
		Entries:      entries,
		TotalSize:    180,
	}
	bins := splitOversizedBin(oversized, targetSize, minFiles)
	if len(bins) == 0 {
		t.Fatal("expected at least one valid bin")
	}
	for i, bin := range bins {
		if bin.TotalSize > targetSize {
			t.Fatalf("bin %d exceeds target size: got %d want <= %d", i, bin.TotalSize, targetSize)
		}
		if len(bin.Entries) < minFiles {
			t.Fatalf("bin %d does not meet minFiles: got %d want >= %d", i, len(bin.Entries), minFiles)
		}
	}
}
// TestSplitOversizedBinDropsImpossibleRunts: when no pair of files can fit
// under the target size together, splitting yields no valid bins at all
// rather than emitting undersized or oversized groups.
func TestSplitOversizedBinDropsImpossibleRunts(t *testing.T) {
	const (
		targetSize = int64(100)
		minFiles   = 2
	)
	entries := makeTestEntries(t, []testEntrySpec{
		{path: "data/f1.parquet", size: 60, partition: map[int]any{}},
		{path: "data/f2.parquet", size: 60, partition: map[int]any{}},
		{path: "data/f3.parquet", size: 60, partition: map[int]any{}},
	})
	oversized := compactionBin{
		PartitionKey: "__unpartitioned__",
		Partition:    map[int]any{},
		Entries:      entries,
		TotalSize:    180,
	}
	if bins := splitOversizedBin(oversized, targetSize, minFiles); len(bins) != 0 {
		t.Fatalf("expected no valid bins, got %d", len(bins))
	}
}
// testEntrySpec describes one synthetic data file used by the test helpers
// below to build manifest entries.
type testEntrySpec struct {
	path          string                 // data file path, relative to the table root
	size          int64                  // file size in bytes
	partition     map[int]any            // partition values keyed by field ID; nil/empty means unpartitioned
	partitionSpec *iceberg.PartitionSpec // required by buildTestDataFile when partition is non-empty
	specID        int32                  // partition spec ID; 0 uses UnpartitionedSpec
}
// buildTestDataFile converts a testEntrySpec into an iceberg.DataFile,
// defaulting to the unpartitioned spec. A partitioned entry must carry an
// explicit partitionSpec; otherwise the test fails immediately.
func buildTestDataFile(t *testing.T, spec testEntrySpec) iceberg.DataFile {
	t.Helper()
	ps := iceberg.UnpartitionedSpec
	switch {
	case spec.partitionSpec != nil:
		ps = spec.partitionSpec
	case len(spec.partition) > 0:
		t.Fatalf("partition spec is required for partitioned test entry %s", spec.path)
	}
	builder, err := iceberg.NewDataFileBuilder(
		*ps,
		iceberg.EntryContentData,
		spec.path,
		iceberg.ParquetFile,
		spec.partition,
		nil, nil,
		1, // recordCount (must be > 0)
		spec.size,
	)
	if err != nil {
		t.Fatalf("failed to build data file %s: %v", spec.path, err)
	}
	return builder.Build()
}
// makeManifestEntriesWithSnapshot builds one manifest entry per spec, all
// attributed to snapshotID with the given entry status.
func makeManifestEntriesWithSnapshot(
	t *testing.T,
	specs []testEntrySpec,
	snapshotID int64,
	status iceberg.ManifestEntryStatus,
) []iceberg.ManifestEntry {
	t.Helper()
	out := make([]iceberg.ManifestEntry, len(specs))
	for i, s := range specs {
		out[i] = iceberg.NewManifestEntry(status, &snapshotID, nil, nil, buildTestDataFile(t, s))
	}
	return out
}
// makeTestEntries creates manifest entries using the default unpartitioned
// spec, snapshot ID 1, and ADDED status. For multi-spec testing, use
// makeTestEntriesWithSpec instead.
func makeTestEntries(t *testing.T, specs []testEntrySpec) []iceberg.ManifestEntry {
	t.Helper()
	const snapshotID = 1
	return makeManifestEntriesWithSnapshot(t, specs, snapshotID, iceberg.EntryStatusADDED)
}
// makeTestEntriesWithSpec creates manifest entries using specific partition
// specs. Each testEntrySpec selects its spec via specID, which must be a
// key in partSpecs; a missing key fails the test immediately.
func makeTestEntriesWithSpec(t *testing.T, specs []testEntrySpec, partSpecs map[int32]iceberg.PartitionSpec) []iceberg.ManifestEntry {
	t.Helper()
	out := make([]iceberg.ManifestEntry, 0, len(specs))
	for _, entrySpec := range specs {
		partSpec, ok := partSpecs[entrySpec.specID]
		if !ok {
			t.Fatalf("spec ID %d not found in partSpecs map", entrySpec.specID)
		}
		builder, err := iceberg.NewDataFileBuilder(
			partSpec,
			iceberg.EntryContentData,
			entrySpec.path,
			iceberg.ParquetFile,
			entrySpec.partition,
			nil, nil,
			1, // recordCount (must be > 0)
			entrySpec.size,
		)
		if err != nil {
			t.Fatalf("failed to build data file %s: %v", entrySpec.path, err)
		}
		snapID := int64(1)
		out = append(out, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, builder.Build()))
	}
	return out
}
// TestBuildCompactionBinsMultipleSpecs: entries written under different
// partition specs land in separate bins even when their partition values
// are identical, and each bin only holds entries of its own spec.
func TestBuildCompactionBinsMultipleSpecs(t *testing.T) {
	const minFiles = 2
	targetSize := int64(256 * 1024 * 1024)
	partSpecs := map[int32]iceberg.PartitionSpec{
		0: iceberg.NewPartitionSpecID(0),
		1: iceberg.NewPartitionSpecID(1),
	}
	entries := makeTestEntriesWithSpec(t, []testEntrySpec{
		{path: "data/s0-f1.parquet", size: 1024, partition: map[int]any{}, specID: 0},
		{path: "data/s0-f2.parquet", size: 2048, partition: map[int]any{}, specID: 0},
		{path: "data/s1-f1.parquet", size: 1024, partition: map[int]any{}, specID: 1},
		{path: "data/s1-f2.parquet", size: 2048, partition: map[int]any{}, specID: 1},
	}, partSpecs)
	bins := buildCompactionBins(entries, targetSize, minFiles)
	if len(bins) != 2 {
		t.Fatalf("expected 2 bins (one per spec), got %d", len(bins))
	}
	// Every entry in a bin must carry that bin's spec ID.
	seen := make(map[int32]bool)
	for _, bin := range bins {
		seen[bin.SpecID] = true
		for _, entry := range bin.Entries {
			if entry.DataFile().SpecID() != bin.SpecID {
				t.Errorf("bin specID=%d contains entry with specID=%d", bin.SpecID, entry.DataFile().SpecID())
			}
		}
	}
	if !seen[0] || !seen[1] {
		t.Errorf("expected bins for spec 0 and 1, got specs %v", seen)
	}
}
// TestBuildCompactionBinsSingleSpec: all entries sharing spec 0 still group
// into a single bin tagged with that spec ID (pre-multi-spec behavior).
func TestBuildCompactionBinsSingleSpec(t *testing.T) {
	const minFiles = 2
	targetSize := int64(256 * 1024 * 1024)
	partSpecs := map[int32]iceberg.PartitionSpec{0: iceberg.NewPartitionSpecID(0)}
	entries := makeTestEntriesWithSpec(t, []testEntrySpec{
		{path: "data/f1.parquet", size: 1024, partition: map[int]any{}, specID: 0},
		{path: "data/f2.parquet", size: 2048, partition: map[int]any{}, specID: 0},
		{path: "data/f3.parquet", size: 4096, partition: map[int]any{}, specID: 0},
	}, partSpecs)
	bins := buildCompactionBins(entries, targetSize, minFiles)
	if len(bins) != 1 {
		t.Fatalf("expected 1 bin, got %d", len(bins))
	}
	if got := len(bins[0].Entries); got != 3 {
		t.Errorf("expected 3 entries in bin, got %d", got)
	}
	if bins[0].SpecID != 0 {
		t.Errorf("expected specID=0, got %d", bins[0].SpecID)
	}
}
// TestParseConfigApplyDeletes: ApplyDeletes defaults to true and can be
// disabled either with a protobuf bool value or with the string "false".
func TestParseConfigApplyDeletes(t *testing.T) {
	// Default: enabled.
	if cfg := ParseConfig(nil); !cfg.ApplyDeletes {
		t.Error("expected ApplyDeletes=true by default")
	}
	// Explicit boolean false.
	boolCfg := map[string]*plugin_pb.ConfigValue{
		"apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: false}},
	}
	if cfg := ParseConfig(boolCfg); cfg.ApplyDeletes {
		t.Error("expected ApplyDeletes=false when explicitly set")
	}
	// The string form "false" is honored as well.
	strCfg := map[string]*plugin_pb.ConfigValue{
		"apply_deletes": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "false"}},
	}
	if cfg := ParseConfig(strCfg); cfg.ApplyDeletes {
		t.Error("expected ApplyDeletes=false when set via string 'false'")
	}
}
// TestCollectPositionDeletes writes a position-delete Parquet file
// (file_path + pos columns) into a fake filer, registers it via a
// pos-delete manifest entry, and checks that collectPositionDeletes groups
// the positions per target data file and returns them sorted ascending.
func TestCollectPositionDeletes(t *testing.T) {
	fs, client := startFakeFiler(t)
	// Create a position delete Parquet file with file_path + pos columns
	type posDeleteRow struct {
		FilePath string `parquet:"file_path"`
		Pos      int64  `parquet:"pos"`
	}
	var buf bytes.Buffer
	writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(posDeleteRow)))
	// Three deletes target file1, one targets file2.
	rows := []posDeleteRow{
		{"data/file1.parquet", 0},
		{"data/file1.parquet", 5},
		{"data/file1.parquet", 10},
		{"data/file2.parquet", 3},
	}
	for _, r := range rows {
		if err := writer.Write(&r); err != nil {
			t.Fatalf("write pos delete row: %v", err)
		}
	}
	if err := writer.Close(); err != nil {
		t.Fatalf("close pos delete writer: %v", err)
	}
	// Store in fake filer
	dataDir := "/buckets/test-bucket/ns/tbl/data"
	fs.putEntry(dataDir, "pos-delete-1.parquet", &filer_pb.Entry{
		Name:    "pos-delete-1.parquet",
		Content: buf.Bytes(),
	})
	// Create a manifest entry for the position delete file
	spec := *iceberg.UnpartitionedSpec
	dfBuilder, err := iceberg.NewDataFileBuilder(
		spec,
		iceberg.EntryContentPosDeletes,
		"data/pos-delete-1.parquet",
		iceberg.ParquetFile,
		map[int]any{},
		nil, nil,
		int64(len(rows)),
		int64(buf.Len()),
	)
	if err != nil {
		t.Fatalf("build pos delete data file: %v", err)
	}
	snapID := int64(1)
	entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
	result, err := collectPositionDeletes(context.Background(), client, "test-bucket", "ns/tbl", []iceberg.ManifestEntry{entry})
	if err != nil {
		t.Fatalf("collectPositionDeletes: %v", err)
	}
	// Verify results: positions grouped per data-file path.
	if len(result["data/file1.parquet"]) != 3 {
		t.Errorf("expected 3 positions for file1, got %d", len(result["data/file1.parquet"]))
	}
	if len(result["data/file2.parquet"]) != 1 {
		t.Errorf("expected 1 position for file2, got %d", len(result["data/file2.parquet"]))
	}
	// Verify sorted (strict inequality also rules out duplicate positions)
	positions := result["data/file1.parquet"]
	for i := 1; i < len(positions); i++ {
		if positions[i] <= positions[i-1] {
			t.Errorf("positions not sorted: %v", positions)
			break
		}
	}
}
// TestCollectEqualityDeletes writes an equality-delete Parquet file (id +
// name columns) into a fake filer, registers it via an eq-delete manifest
// entry carrying equality field IDs 1 and 2, and checks that
// collectEqualityDeletes produces one group with all three delete keys and
// both field IDs.
func TestCollectEqualityDeletes(t *testing.T) {
	fs, client := startFakeFiler(t)
	// Create an equality delete Parquet file with id + name columns
	type eqDeleteRow struct {
		ID   int64  `parquet:"id"`
		Name string `parquet:"name"`
	}
	var buf bytes.Buffer
	writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(eqDeleteRow)))
	deleteRows := []eqDeleteRow{
		{1, "alice"},
		{2, "bob"},
		{3, "charlie"},
	}
	for _, r := range deleteRows {
		if err := writer.Write(&r); err != nil {
			t.Fatalf("write eq delete row: %v", err)
		}
	}
	if err := writer.Close(); err != nil {
		t.Fatalf("close eq delete writer: %v", err)
	}
	dataDir := "/buckets/test-bucket/ns/tbl/data"
	fs.putEntry(dataDir, "eq-delete-1.parquet", &filer_pb.Entry{
		Name:    "eq-delete-1.parquet",
		Content: buf.Bytes(),
	})
	// Create manifest entry with equality field IDs
	schema := newTestSchema() // has field 1=id, 2=name
	spec := *iceberg.UnpartitionedSpec
	dfBuilder, err := iceberg.NewDataFileBuilder(
		spec,
		iceberg.EntryContentEqDeletes,
		"data/eq-delete-1.parquet",
		iceberg.ParquetFile,
		map[int]any{},
		nil, nil,
		int64(len(deleteRows)),
		int64(buf.Len()),
	)
	if err != nil {
		t.Fatalf("build eq delete data file: %v", err)
	}
	dfBuilder.EqualityFieldIDs([]int{1, 2}) // id + name
	snapID := int64(1)
	entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build())
	groups, err := collectEqualityDeletes(context.Background(), client, "test-bucket", "ns/tbl", []iceberg.ManifestEntry{entry}, schema)
	if err != nil {
		t.Fatalf("collectEqualityDeletes: %v", err)
	}
	if len(groups) != 1 {
		t.Fatalf("expected 1 equality delete group, got %d", len(groups))
	}
	if len(groups[0].Keys) != 3 {
		t.Errorf("expected 3 equality delete keys, got %d", len(groups[0].Keys))
	}
	if len(groups[0].FieldIDs) != 2 {
		t.Errorf("expected 2 field IDs, got %d", len(groups[0].FieldIDs))
	}
}
// TestMergeParquetFilesWithPositionDeletes merges two data files while
// applying position deletes (row indices 1 and 3 of file1) and verifies the
// merged output is valid Parquet containing exactly the surviving rows.
func TestMergeParquetFilesWithPositionDeletes(t *testing.T) {
	fs, client := startFakeFiler(t)
	// Create two data files with known rows
	type dataRow struct {
		ID   int64  `parquet:"id"`
		Name string `parquet:"name"`
	}
	// writeDataFile stores a Parquet-encoded row set in the fake filer.
	writeDataFile := func(name string, rows []dataRow) {
		var buf bytes.Buffer
		w := parquet.NewWriter(&buf, parquet.SchemaOf(new(dataRow)))
		for _, r := range rows {
			if err := w.Write(&r); err != nil {
				t.Fatalf("write row: %v", err)
			}
		}
		if err := w.Close(); err != nil {
			t.Fatalf("close: %v", err)
		}
		dataDir := "/buckets/test-bucket/ns/tbl/data"
		fs.putEntry(dataDir, name, &filer_pb.Entry{
			Name:    name,
			Content: buf.Bytes(),
		})
	}
	writeDataFile("file1.parquet", []dataRow{
		{1, "alice"}, {2, "bob"}, {3, "charlie"}, {4, "dave"}, {5, "eve"},
	})
	writeDataFile("file2.parquet", []dataRow{
		{6, "frank"}, {7, "grace"},
	})
	spec := *iceberg.UnpartitionedSpec
	// makeEntry wraps a data-file path/size in an ADDED manifest entry.
	makeEntry := func(path string, size int64) iceberg.ManifestEntry {
		dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, path, iceberg.ParquetFile, map[int]any{}, nil, nil, 1, size)
		if err != nil {
			t.Fatalf("build: %v", err)
		}
		snapID := int64(1)
		return iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build())
	}
	entries := []iceberg.ManifestEntry{
		makeEntry("data/file1.parquet", 1024),
		makeEntry("data/file2.parquet", 512),
	}
	// Delete rows 1 (bob) and 3 (dave) from file1 — positions are
	// zero-based row indices within that file.
	posDeletes := map[string][]int64{
		"data/file1.parquet": {1, 3},
	}
	merged, count, err := mergeParquetFiles(
		context.Background(), client, "test-bucket", "ns/tbl",
		entries, posDeletes, nil, nil,
	)
	if err != nil {
		t.Fatalf("mergeParquetFiles: %v", err)
	}
	// Original: 5 + 2 = 7 rows, deleted 2 = 5 rows
	if count != 5 {
		t.Errorf("expected 5 rows after position deletes, got %d", count)
	}
	// Verify merged output is valid Parquet
	reader := parquet.NewReader(bytes.NewReader(merged))
	defer reader.Close()
	var outputRows []dataRow
	for {
		var r dataRow
		err := reader.Read(&r)
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatalf("read merged row: %v", err)
		}
		outputRows = append(outputRows, r)
	}
	if len(outputRows) != 5 {
		t.Fatalf("expected 5 output rows, got %d", len(outputRows))
	}
	// Verify bob (id=2) and dave (id=4) are NOT in output
	for _, r := range outputRows {
		if r.ID == 2 || r.ID == 4 {
			t.Errorf("row with id=%d should have been deleted", r.ID)
		}
	}
}
// TestMergeParquetFilesWithEqualityDeletes merges a data file while applying
// equality deletes on the "name" column (field ID 2). The delete groups are
// built through collectEqualityDeletes so that the key encoding matches what
// mergeParquetFiles expects; rows "bob" and "dave" must be dropped.
func TestMergeParquetFilesWithEqualityDeletes(t *testing.T) {
	fs, client := startFakeFiler(t)
	type dataRow struct {
		ID   int64  `parquet:"id"`
		Name string `parquet:"name"`
	}
	var buf bytes.Buffer
	w := parquet.NewWriter(&buf, parquet.SchemaOf(new(dataRow)))
	dataRows := []dataRow{
		{1, "alice"}, {2, "bob"}, {3, "charlie"}, {4, "dave"},
	}
	for _, r := range dataRows {
		if err := w.Write(&r); err != nil {
			t.Fatalf("write: %v", err)
		}
	}
	if err := w.Close(); err != nil {
		t.Fatalf("close: %v", err)
	}
	dataDir := "/buckets/test-bucket/ns/tbl/data"
	fs.putEntry(dataDir, "data1.parquet", &filer_pb.Entry{
		Name:    "data1.parquet",
		Content: buf.Bytes(),
	})
	spec := *iceberg.UnpartitionedSpec
	dfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, "data/data1.parquet", iceberg.ParquetFile, map[int]any{}, nil, nil, 4, int64(buf.Len()))
	if err != nil {
		t.Fatalf("build: %v", err)
	}
	snapID := int64(1)
	entries := []iceberg.ManifestEntry{
		iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfb.Build()),
	}
	// Build equality deletes via the collect pipeline so keys match the
	// encoding used by mergeParquetFiles.
	schema := newTestSchema() // field 1=id, field 2=name
	type eqDeleteRow struct {
		ID   int64  `parquet:"id"`
		Name string `parquet:"name"`
	}
	var eqBuf bytes.Buffer
	eqWriter := parquet.NewWriter(&eqBuf, parquet.SchemaOf(new(eqDeleteRow)))
	// Only the name column participates (see EqualityFieldIDs below), so the
	// ID values here are placeholders.
	for _, r := range []eqDeleteRow{{0, "bob"}, {0, "dave"}} {
		if err := eqWriter.Write(&r); err != nil {
			t.Fatalf("write eq delete: %v", err)
		}
	}
	if err := eqWriter.Close(); err != nil {
		t.Fatalf("close eq writer: %v", err)
	}
	eqDataDir := "/buckets/test-bucket/ns/tbl/data"
	fs.putEntry(eqDataDir, "eq-del.parquet", &filer_pb.Entry{
		Name: "eq-del.parquet", Content: eqBuf.Bytes(),
	})
	eqDfb, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentEqDeletes,
		"data/eq-del.parquet", iceberg.ParquetFile, map[int]any{}, nil, nil, 2, int64(eqBuf.Len()))
	if err != nil {
		t.Fatalf("build eq delete: %v", err)
	}
	eqDfb.EqualityFieldIDs([]int{2}) // field 2 = name
	eqEntry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, eqDfb.Build())
	eqGroups, err := collectEqualityDeletes(context.Background(), client, "test-bucket", "ns/tbl",
		[]iceberg.ManifestEntry{eqEntry}, schema)
	if err != nil {
		t.Fatalf("collectEqualityDeletes: %v", err)
	}
	merged, count, err := mergeParquetFiles(
		context.Background(), client, "test-bucket", "ns/tbl",
		entries, nil, eqGroups, schema,
	)
	if err != nil {
		t.Fatalf("mergeParquetFiles: %v", err)
	}
	// 4 rows minus the two name matches.
	if count != 2 {
		t.Errorf("expected 2 rows after equality deletes, got %d", count)
	}
	reader := parquet.NewReader(bytes.NewReader(merged))
	defer reader.Close()
	var outputRows []dataRow
	for {
		var r dataRow
		err := reader.Read(&r)
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatalf("read: %v", err)
		}
		outputRows = append(outputRows, r)
	}
	// Neither deleted name may survive the merge.
	for _, r := range outputRows {
		if r.Name == "bob" || r.Name == "dave" {
			t.Errorf("row %q should have been deleted", r.Name)
		}
	}
}
// TestDetectNilRequest ensures Detect rejects a nil request with an error
// instead of panicking.
func TestDetectNilRequest(t *testing.T) {
	h := NewHandler(nil)
	if err := h.Detect(nil, nil, nil); err == nil {
		t.Error("expected error for nil request")
	}
}
// TestExecuteNilRequest ensures Execute rejects a nil request with an error
// instead of panicking.
func TestExecuteNilRequest(t *testing.T) {
	h := NewHandler(nil)
	if err := h.Execute(nil, nil, nil); err == nil {
		t.Error("expected error for nil request")
	}
}
// ---------------------------------------------------------------------------
// Test helpers
// ---------------------------------------------------------------------------
// buildTestMetadata creates a minimal Iceberg metadata for testing.
// When snapshots is nil or empty, the metadata has no snapshots; otherwise
// each snapshot is added in order and the main branch ref points at the
// last one.
func buildTestMetadata(t *testing.T, snapshots []table.Snapshot) table.Metadata {
	t.Helper()
	const location = "s3://test-bucket/test-table"
	base, err := table.NewMetadata(newTestSchema(), iceberg.UnpartitionedSpec, table.UnsortedSortOrder, location, nil)
	if err != nil {
		t.Fatalf("failed to create test metadata: %v", err)
	}
	if len(snapshots) == 0 {
		return base
	}
	builder, err := table.MetadataBuilderFromBase(base, location)
	if err != nil {
		t.Fatalf("failed to create metadata builder: %v", err)
	}
	for _, snap := range snapshots {
		snap := snap // copy so the builder receives a pointer to a stable value
		if err := builder.AddSnapshot(&snap); err != nil {
			t.Fatalf("failed to add snapshot %d: %v", snap.SnapshotID, err)
		}
	}
	head := snapshots[len(snapshots)-1].SnapshotID
	if err := builder.SetSnapshotRef(table.MainBranch, head, table.BranchRef); err != nil {
		t.Fatalf("failed to set snapshot ref: %v", err)
	}
	meta, err := builder.Build()
	if err != nil {
		t.Fatalf("failed to build metadata: %v", err)
	}
	return meta
}
// newTestSchema returns the two-column schema (field 1 "id": required
// int64, field 2 "name": optional string) shared by the tests in this file.
func newTestSchema() *iceberg.Schema {
	fields := []iceberg.NestedField{
		{ID: 1, Type: iceberg.PrimitiveTypes.Int64, Name: "id", Required: true},
		{ID: 2, Type: iceberg.PrimitiveTypes.String, Name: "name", Required: false},
	}
	return iceberg.NewSchema(0, fields...)
}