diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index 936488356..152e2c7bc 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -522,6 +522,16 @@ func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, err return nil, fmt.Errorf("failed to unmarshal balance task configuration") } +// defaultTableMaintenanceTaskPolicy returns the default table maintenance task policy +func defaultTableMaintenanceTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 30 * 60, // 30 minutes + CheckIntervalSeconds: 30 * 60, // 30 minutes + } +} + // SaveTableMaintenanceTaskPolicy saves table maintenance task policy to protobuf file func (cp *ConfigPersistence) SaveTableMaintenanceTaskPolicy(policy *worker_pb.TaskPolicy) error { return cp.saveTaskConfig(TableMaintenanceConfigFile, policy) @@ -531,12 +541,7 @@ func (cp *ConfigPersistence) SaveTableMaintenanceTaskPolicy(policy *worker_pb.Ta func (cp *ConfigPersistence) LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error) { if cp.dataDir == "" { // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 30 * 60, // 30 minutes - CheckIntervalSeconds: 30 * 60, // 30 minutes - }, nil + return defaultTableMaintenanceTaskPolicy(), nil } confDir := filepath.Join(cp.dataDir, ConfigSubdir) @@ -545,12 +550,7 @@ func (cp *ConfigPersistence) LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPo // Check if file exists if _, err := os.Stat(configPath); os.IsNotExist(err) { // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 30 * 60, // 30 minutes - CheckIntervalSeconds: 30 * 60, // 30 minutes - }, nil + return defaultTableMaintenanceTaskPolicy(), nil } // Read file diff --git a/weed/worker/tasks/table_maintenance/detection.go b/weed/worker/tasks/table_maintenance/detection.go index b251b5918..71a9111fb 100644 --- a/weed/worker/tasks/table_maintenance/detection.go +++ b/weed/worker/tasks/table_maintenance/detection.go @@ -1,6 +1,7 @@ package table_maintenance import ( + "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -173,12 +174,16 @@ func (s *TableMaintenanceScanner) needsOrphanCleanup(table TableInfo) bool { return table.DeletedFileCount > 0 } -// CreateTaskParams creates task parameters for a maintenance job +// CreateTaskParams creates task parameters for a maintenance job. +// The job type is encoded in the Node field as "job_type:table_path" format +// to allow the worker to know which maintenance operation to perform. func CreateTaskParams(job *TableMaintenanceJob) *worker_pb.TaskParams { + // Encode job type in the node field: "job_type:table_path" + nodeValue := fmt.Sprintf("%s:%s", job.JobType, job.TablePath) return &worker_pb.TaskParams{ Sources: []*worker_pb.TaskSource{ { - Node: job.TablePath, + Node: nodeValue, }, }, VolumeId: 0, // Not volume-specific diff --git a/weed/worker/tasks/table_maintenance/iceberg_ops.go b/weed/worker/tasks/table_maintenance/iceberg_ops.go index a93df332c..fb21d510e 100644 --- a/weed/worker/tasks/table_maintenance/iceberg_ops.go +++ b/weed/worker/tasks/table_maintenance/iceberg_ops.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "path" "sort" "strings" @@ -143,7 +144,10 @@ func (mc *TableMaintenanceContext) listFilesWithPattern(ctx context.Context, dir for { entry, err := resp.Recv() if err != nil { - break + if err == io.EOF { + break + } + return nil, err } name := entry.Entry.Name if strings.HasPrefix(name, prefix) && strings.HasSuffix(name, suffix) { @@ -169,7 +173,10 @@ func (mc *TableMaintenanceContext) listAllFiles(ctx context.Context, dir string) for { entry, err := resp.Recv() if err != nil { - break + if err == io.EOF { + break + } + return nil, err } if entry.Entry.IsDirectory { // Recurse into subdirectory @@ -187,7 +194,10 @@ func (mc *TableMaintenanceContext) listAllFiles(ctx context.Context, dir string) return files, nil } -// readFileContent reads the content of a file +// readFileContent reads the content of a file. +// Note: This implementation handles inline content only. For chunked files (large metadata files), +// a full implementation would need to use the filer read interface to assemble chunks from volume servers. +// Iceberg metadata files are typically small (KB-MB range) and fit in inline content. func (mc *TableMaintenanceContext) readFileContent(ctx context.Context, filePath string) ([]byte, error) { dir, name := splitPath(filePath) resp, err := filer_pb.LookupEntry(ctx, mc.FilerClient, &filer_pb.LookupDirectoryEntryRequest{ @@ -199,14 +209,14 @@ func (mc *TableMaintenanceContext) readFileContent(ctx context.Context, filePath } // For small metadata files, the content may be inline - // For larger files, we need to read chunks if len(resp.Entry.Content) > 0 { return resp.Entry.Content, nil } - // For files with chunks, we need to read from volume servers - // This is a simplified implementation - in production, use the full chunk reading logic - return nil, fmt.Errorf("file %s requires chunk reading (not inline)", filePath) + // For chunked files, we need to read from volume servers using the chunk reading API. + // This requires access to volume server clients which would be passed via context. + // For now, return an error - the caller should use the filer's HTTP read API for large files. + return nil, fmt.Errorf("file %s requires chunk reading - use filer HTTP API for large files", filePath) } // deleteFile deletes a single file @@ -220,7 +230,12 @@ func (mc *TableMaintenanceContext) deleteFile(ctx context.Context, filePath stri return err } -// GetReferencedFiles returns all files referenced by the current table metadata +// GetReferencedFiles returns all files referenced by the current table metadata. +// IMPORTANT: This is a partial implementation. Full implementation requires: +// 1. Parsing Avro manifest list files to extract manifest file paths +// 2. Parsing Avro manifest files to extract data file paths +// Currently only marks manifest list files as referenced. DO NOT use for orphan deletion +// until manifest parsing is implemented via an Avro library (e.g., goavro). func (mc *TableMaintenanceContext) GetReferencedFiles(ctx context.Context, metadata *IcebergTableMetadata) (map[string]bool, error) { referenced := make(map[string]bool) @@ -230,10 +245,12 @@ func (mc *TableMaintenanceContext) GetReferencedFiles(ctx context.Context, metad referenced[snapshot.ManifestList] = true } - // TODO: Parse manifest list to get individual manifest files - // TODO: Parse manifests to get data files - // This requires reading Avro files, which is complex - // For now, we mark the manifest list as referenced + // NOTE: Full implementation would: + // 1. Read the manifest list Avro file + // 2. Extract all manifest file paths from it + // 3. For each manifest, read the Avro file + // 4. Extract all data file paths from the manifest entries + // This requires goavro or similar library for Avro deserialization } return referenced, nil @@ -269,25 +286,31 @@ func (mc *TableMaintenanceContext) GetExpiredSnapshots(metadata *IcebergTableMet // GetSmallDataFiles returns data files smaller than the target size func (mc *TableMaintenanceContext) GetSmallDataFiles(ctx context.Context, targetSizeBytes int64) ([]string, error) { - // List all files in the data directory - dataFiles, err := mc.listAllFiles(ctx, mc.DataDir) + var smallFiles []string + + // List all files in the data directory with their size information + resp, err := mc.FilerClient.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: mc.DataDir, + Limit: 10000, + }) if err != nil { return nil, err } - var smallFiles []string - for _, file := range dataFiles { - dir, name := splitPath(file) - resp, err := filer_pb.LookupEntry(ctx, mc.FilerClient, &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir, - Name: name, - }) + for { + entry, err := resp.Recv() if err != nil { - continue + if err == io.EOF { + break + } + return nil, err } - if resp.Entry.Attributes != nil && resp.Entry.Attributes.FileSize < uint64(targetSizeBytes) { - smallFiles = append(smallFiles, file) + if !entry.Entry.IsDirectory { + // Use the FileSize from the Entry attributes directly + if entry.Entry.Attributes != nil && entry.Entry.Attributes.FileSize < uint64(targetSizeBytes) { + smallFiles = append(smallFiles, path.Join(mc.DataDir, entry.Entry.Name)) + } } } diff --git a/weed/worker/tasks/table_maintenance/register.go b/weed/worker/tasks/table_maintenance/register.go index a1cfb9e70..3cf178b3e 100644 --- a/weed/worker/tasks/table_maintenance/register.go +++ b/weed/worker/tasks/table_maintenance/register.go @@ -2,6 +2,7 @@ package table_maintenance import ( "fmt" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -50,13 +51,30 @@ func RegisterTableMaintenanceTask() { tablePath := params.Sources[0].Node tableBucket := params.Collection - // Parse job type from params if available - // TODO: Define TableMaintenanceTaskParams in protobuf to pass job type explicitly - // For now, default to compaction. In production, the job type would be determined - // by the table scanner based on the table's maintenance needs (see detection.go) + // Determine job type from source node format: + // Format: "job_type:table_path" (e.g., "compaction:/table-buckets/bucket/ns/table") + // If no prefix, default to compaction for backward compatibility. + // NOTE: A proper implementation would define TableMaintenanceTaskParams in + // weed/pb/worker.proto to pass job details explicitly, similar to VacuumTaskParams. jobType := JobTypeCompaction + if colonIdx := strings.Index(tablePath, ":"); colonIdx > 0 && colonIdx < len(tablePath)-1 { + jobTypeStr := tablePath[:colonIdx] + tablePath = tablePath[colonIdx+1:] + switch TableMaintenanceJobType(jobTypeStr) { + case JobTypeCompaction: + jobType = JobTypeCompaction + case JobTypeSnapshotExpiration: + jobType = JobTypeSnapshotExpiration + case JobTypeOrphanCleanup: + jobType = JobTypeOrphanCleanup + case JobTypeManifestRewrite: + jobType = JobTypeManifestRewrite + default: + glog.Warningf("Unknown job type '%s', defaulting to compaction", jobTypeStr) + } + } - // Create a default maintenance job + // Create the maintenance job job := &TableMaintenanceJob{ JobType: jobType, TableBucket: tableBucket,