Browse Source

fix: address additional Gemini review feedback (round 2)

- High: Fix listFilesWithPattern and listAllFiles to properly check for io.EOF
  instead of silently swallowing all errors
- High: Fix GetSmallDataFiles N+1 query problem - use size from ListEntries
  response instead of making separate LookupEntry calls per file
- Critical: Add job type encoding in CreateTaskParams (format: 'job_type:path')
  and parsing in CreateTask to support all maintenance job types
- Medium: Extract duplicated default TaskPolicy into helper function
- Improve documentation for incomplete implementations (readFileContent,
  GetReferencedFiles) with clear notes about Avro library requirements
pull/8177/head
Chris Lu 2 weeks ago
parent
commit
121f54952d
  1. 24
      weed/admin/dash/config_persistence.go
  2. 9
      weed/worker/tasks/table_maintenance/detection.go
  3. 71
      weed/worker/tasks/table_maintenance/iceberg_ops.go
  4. 28
      weed/worker/tasks/table_maintenance/register.go

24
weed/admin/dash/config_persistence.go

@@ -522,6 +522,16 @@ func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, err
return nil, fmt.Errorf("failed to unmarshal balance task configuration")
}
// defaultTableMaintenanceTaskPolicy returns the default table maintenance task policy.
// It is the fallback used by LoadTableMaintenanceTaskPolicy when no data directory is
// configured or when the persisted config file does not exist on disk.
func defaultTableMaintenanceTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 30 * 60, // 30 minutes
CheckIntervalSeconds: 30 * 60, // 30 minutes
}
}
// SaveTableMaintenanceTaskPolicy saves table maintenance task policy to protobuf file
func (cp *ConfigPersistence) SaveTableMaintenanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
return cp.saveTaskConfig(TableMaintenanceConfigFile, policy)
@@ -531,12 +541,7 @@ func (cp *ConfigPersistence) SaveTableMaintenanceTaskPolicy(policy *worker_pb.Ta
func (cp *ConfigPersistence) LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
if cp.dataDir == "" {
// Return default policy if no data directory
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 30 * 60, // 30 minutes
CheckIntervalSeconds: 30 * 60, // 30 minutes
}, nil
return defaultTableMaintenanceTaskPolicy(), nil
}
confDir := filepath.Join(cp.dataDir, ConfigSubdir)
@@ -545,12 +550,7 @@ func (cp *ConfigPersistence) LoadTableMaintenanceTaskPolicy() (*worker_pb.TaskPo
// Check if file exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
// Return default policy if file doesn't exist
return &worker_pb.TaskPolicy{
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 30 * 60, // 30 minutes
CheckIntervalSeconds: 30 * 60, // 30 minutes
}, nil
return defaultTableMaintenanceTaskPolicy(), nil
}
// Read file

9
weed/worker/tasks/table_maintenance/detection.go

@@ -1,6 +1,7 @@
package table_maintenance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -173,12 +174,16 @@ func (s *TableMaintenanceScanner) needsOrphanCleanup(table TableInfo) bool {
return table.DeletedFileCount > 0
}
// CreateTaskParams creates task parameters for a maintenance job
// CreateTaskParams creates task parameters for a maintenance job.
// The job type is encoded in the Node field as "job_type:table_path" format
// to allow the worker to know which maintenance operation to perform.
func CreateTaskParams(job *TableMaintenanceJob) *worker_pb.TaskParams {
// Encode job type in the node field: "job_type:table_path"
nodeValue := fmt.Sprintf("%s:%s", job.JobType, job.TablePath)
return &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: job.TablePath,
Node: nodeValue,
},
},
VolumeId: 0, // Not volume-specific

71
weed/worker/tasks/table_maintenance/iceberg_ops.go

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"path"
"sort"
"strings"
@@ -143,7 +144,10 @@ func (mc *TableMaintenanceContext) listFilesWithPattern(ctx context.Context, dir
for {
entry, err := resp.Recv()
if err != nil {
break
if err == io.EOF {
break
}
return nil, err
}
name := entry.Entry.Name
if strings.HasPrefix(name, prefix) && strings.HasSuffix(name, suffix) {
@@ -169,7 +173,10 @@ func (mc *TableMaintenanceContext) listAllFiles(ctx context.Context, dir string)
for {
entry, err := resp.Recv()
if err != nil {
break
if err == io.EOF {
break
}
return nil, err
}
if entry.Entry.IsDirectory {
// Recurse into subdirectory
@@ -187,7 +194,10 @@ func (mc *TableMaintenanceContext) listAllFiles(ctx context.Context, dir string)
return files, nil
}
// readFileContent reads the content of a file
// readFileContent reads the content of a file.
// Note: This implementation handles inline content only. For chunked files (large metadata files),
// a full implementation would need to use the filer read interface to assemble chunks from volume servers.
// Iceberg metadata files are typically small (KB-MB range) and fit in inline content.
func (mc *TableMaintenanceContext) readFileContent(ctx context.Context, filePath string) ([]byte, error) {
dir, name := splitPath(filePath)
resp, err := filer_pb.LookupEntry(ctx, mc.FilerClient, &filer_pb.LookupDirectoryEntryRequest{
@@ -199,14 +209,14 @@ func (mc *TableMaintenanceContext) readFileContent(ctx context.Context, filePath
}
// For small metadata files, the content may be inline
// For larger files, we need to read chunks
if len(resp.Entry.Content) > 0 {
return resp.Entry.Content, nil
}
// For files with chunks, we need to read from volume servers
// This is a simplified implementation - in production, use the full chunk reading logic
return nil, fmt.Errorf("file %s requires chunk reading (not inline)", filePath)
// For chunked files, we need to read from volume servers using the chunk reading API.
// This requires access to volume server clients which would be passed via context.
// For now, return an error - the caller should use the filer's HTTP read API for large files.
return nil, fmt.Errorf("file %s requires chunk reading - use filer HTTP API for large files", filePath)
}
// deleteFile deletes a single file
@@ -220,7 +230,12 @@ func (mc *TableMaintenanceContext) deleteFile(ctx context.Context, filePath stri
return err
}
// GetReferencedFiles returns all files referenced by the current table metadata
// GetReferencedFiles returns all files referenced by the current table metadata.
// IMPORTANT: This is a partial implementation. Full implementation requires:
// 1. Parsing Avro manifest list files to extract manifest file paths
// 2. Parsing Avro manifest files to extract data file paths
// Currently only marks manifest list files as referenced. DO NOT use for orphan deletion
// until manifest parsing is implemented via an Avro library (e.g., goavro).
func (mc *TableMaintenanceContext) GetReferencedFiles(ctx context.Context, metadata *IcebergTableMetadata) (map[string]bool, error) {
referenced := make(map[string]bool)
@@ -230,10 +245,12 @@ func (mc *TableMaintenanceContext) GetReferencedFiles(ctx context.Context, metad
referenced[snapshot.ManifestList] = true
}
// TODO: Parse manifest list to get individual manifest files
// TODO: Parse manifests to get data files
// This requires reading Avro files, which is complex
// For now, we mark the manifest list as referenced
// NOTE: Full implementation would:
// 1. Read the manifest list Avro file
// 2. Extract all manifest file paths from it
// 3. For each manifest, read the Avro file
// 4. Extract all data file paths from the manifest entries
// This requires goavro or similar library for Avro deserialization
}
return referenced, nil
@@ -269,25 +286,31 @@ func (mc *TableMaintenanceContext) GetExpiredSnapshots(metadata *IcebergTableMet
// GetSmallDataFiles returns data files smaller than the target size
func (mc *TableMaintenanceContext) GetSmallDataFiles(ctx context.Context, targetSizeBytes int64) ([]string, error) {
// List all files in the data directory
dataFiles, err := mc.listAllFiles(ctx, mc.DataDir)
var smallFiles []string
// List all files in the data directory with their size information
resp, err := mc.FilerClient.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: mc.DataDir,
Limit: 10000,
})
if err != nil {
return nil, err
}
var smallFiles []string
for _, file := range dataFiles {
dir, name := splitPath(file)
resp, err := filer_pb.LookupEntry(ctx, mc.FilerClient, &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
})
for {
entry, err := resp.Recv()
if err != nil {
continue
if err == io.EOF {
break
}
return nil, err
}
if resp.Entry.Attributes != nil && resp.Entry.Attributes.FileSize < uint64(targetSizeBytes) {
smallFiles = append(smallFiles, file)
if !entry.Entry.IsDirectory {
// Use the FileSize from the Entry attributes directly
if entry.Entry.Attributes != nil && entry.Entry.Attributes.FileSize < uint64(targetSizeBytes) {
smallFiles = append(smallFiles, path.Join(mc.DataDir, entry.Entry.Name))
}
}
}

28
weed/worker/tasks/table_maintenance/register.go

@@ -2,6 +2,7 @@ package table_maintenance
import (
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -50,13 +51,30 @@ func RegisterTableMaintenanceTask() {
tablePath := params.Sources[0].Node
tableBucket := params.Collection
// Parse job type from params if available
// TODO: Define TableMaintenanceTaskParams in protobuf to pass job type explicitly
// For now, default to compaction. In production, the job type would be determined
// by the table scanner based on the table's maintenance needs (see detection.go)
// Determine job type from source node format:
// Format: "job_type:table_path" (e.g., "compaction:/table-buckets/bucket/ns/table")
// If no prefix, default to compaction for backward compatibility.
// NOTE: A proper implementation would define TableMaintenanceTaskParams in
// weed/pb/worker.proto to pass job details explicitly, similar to VacuumTaskParams.
jobType := JobTypeCompaction
if colonIdx := strings.Index(tablePath, ":"); colonIdx > 0 && colonIdx < len(tablePath)-1 {
jobTypeStr := tablePath[:colonIdx]
tablePath = tablePath[colonIdx+1:]
switch TableMaintenanceJobType(jobTypeStr) {
case JobTypeCompaction:
jobType = JobTypeCompaction
case JobTypeSnapshotExpiration:
jobType = JobTypeSnapshotExpiration
case JobTypeOrphanCleanup:
jobType = JobTypeOrphanCleanup
case JobTypeManifestRewrite:
jobType = JobTypeManifestRewrite
default:
glog.Warningf("Unknown job type '%s', defaulting to compaction", jobTypeStr)
}
}
// Create a default maintenance job
// Create the maintenance job
job := &TableMaintenanceJob{
JobType: jobType,
TableBucket: tableBucket,

Loading…
Cancel
Save