diff --git a/weed/command/backup.go b/weed/command/backup.go index 2f3296bf4..5510a0462 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" @@ -66,6 +68,92 @@ var cmdBackup = &Command{ `, } +// backupFromLocation attempts to backup a volume from a specific volume server location. +// Returns (error, isFatal) where isFatal=true means the error is due to invalid user input +// and should not be retried with other locations. +func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId) (error, bool) { + stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) + if err != nil { + return fmt.Errorf("getting volume status: %w", err), false + } + + // Parse TTL + var ttl *needle.TTL + if *s.ttl != "" { + ttl, err = needle.ReadTTL(*s.ttl) + if err != nil { + // User-provided TTL is invalid - this is fatal + return fmt.Errorf("invalid user-provided ttl %s: %w", *s.ttl, err), true + } + } else { + ttl, err = needle.ReadTTL(stats.Ttl) + if err != nil { + return fmt.Errorf("parsing ttl %s from stats: %w", stats.Ttl, err), false + } + } + + // Parse replication + var replication *super_block.ReplicaPlacement + if *s.replication != "" { + replication, err = super_block.NewReplicaPlacementFromString(*s.replication) + if err != nil { + // User-provided replication is invalid - this is fatal + return fmt.Errorf("invalid user-provided replication %s: %w", *s.replication, err), true + } + } else { + replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) + if err != nil { + return fmt.Errorf("parsing replication %s from stats: %w", stats.Replication, err), false + } + } + + ver := needle.Version(stats.Version) + + // Create or load the volume + v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) + if err != nil { + return fmt.Errorf("creating or reading volume: %w", err), false + } + defer v.Close() + + // Handle compaction if needed + if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { + if err = v.Compact2(0, 0, nil); err != nil { + return fmt.Errorf("compacting volume: %w", err), false + } + if err = v.CommitCompact(); err != nil { + return fmt.Errorf("committing compaction: %w", err), false + } + v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) + if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil { + return fmt.Errorf("writing superblock: %w", err), false + } + } + + datSize, _, _ := v.FileStat() + + // If local volume is larger than remote, recreate it + if datSize > stats.TailOffset { + if err := v.Destroy(false); err != nil { + return fmt.Errorf("destroying volume: %w", err), false + } + v.Close() + // recreate an empty volume + v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) + if err != nil { + return fmt.Errorf("recreating volume: %w", err), false + } + defer v.Close() + } + + // Perform the incremental backup + if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { + return fmt.Errorf("incremental backup: %w", err), false + } + + return nil, false +} + func runBackup(cmd *Command, args []string) bool { util.LoadSecurityConfiguration() @@ -99,105 +187,18 @@ func runBackup(cmd *Command, args []string) bool { volumeServer := location.ServerAddress() fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer) - stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) + err, isFatal := backupFromLocation(volumeServer, grpcDialOption, vid) if err != nil { - fmt.Printf("Error getting volume %d status from %s: %v\n", vid, volumeServer, err) + fmt.Printf("Error backing up volume %d from %s: %v\n", vid, volumeServer, err) lastErr = err - continue - } - - var ttl *needle.TTL - if *s.ttl != "" { - ttl, err = needle.ReadTTL(*s.ttl) - if err != nil { - fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err) - return true - } - } else { - ttl, err = needle.ReadTTL(stats.Ttl) - if err != nil { - fmt.Printf("Error parsing volume %d ttl %s from %s: %v\n", vid, stats.Ttl, volumeServer, err) - lastErr = err - continue - } - } - var replication *super_block.ReplicaPlacement - if *s.replication != "" { - replication, err = super_block.NewReplicaPlacementFromString(*s.replication) - if err != nil { - fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) + // Check if this is a fatal user-input error + if isFatal { return true } - } else { - replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) - if err != nil { - fmt.Printf("Error parsing volume %d replication %s from %s: %v\n", vid, stats.Replication, volumeServer, err) - lastErr = err - continue - } - } - - ver := needle.Version(stats.Version) - - v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) - if err != nil { - fmt.Printf("Error creating or reading from volume %d from %s: %v\n", vid, volumeServer, err) - lastErr = err - continue - } - - if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(0, 0, nil); err != nil { - fmt.Printf("Compact Volume before synchronizing from %s: %v\n", volumeServer, err) - v.Close() - lastErr = err - continue - } - if err = v.CommitCompact(); err != nil { - fmt.Printf("Commit Compact before synchronizing from %s: %v\n", volumeServer, err) - v.Close() - lastErr = err - continue - } - v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) - if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil { - fmt.Printf("Error writing superblock from %s: %v\n", volumeServer, err) - v.Close() - lastErr = err - continue - } - } - - datSize, _, _ := v.FileStat() - - if datSize > stats.TailOffset { - // remove the old data - if err := v.Destroy(false); err != nil { - fmt.Printf("Error destroying volume %d on %s: %v\n", vid, volumeServer, err) - v.Close() - lastErr = err - continue - } - v.Close() // Close the old volume handle before creating a new one - // recreate an empty volume - v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) - if err != nil { - fmt.Printf("Error recreating volume %d from %s: %v\n", vid, volumeServer, err) - lastErr = err - continue - } - } - - // Try the incremental backup - if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { - fmt.Printf("Error synchronizing volume %d from %s: %v\n", vid, volumeServer, err) - v.Close() - lastErr = err continue } // Success! - v.Close() fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer) return true }