Browse Source

refactor

pull/7465/head
chrislu 4 weeks ago
parent
commit
e53d698728
  1. 149
      weed/command/backup.go

149
weed/command/backup.go

@ -4,6 +4,8 @@ import (
"context" "context"
"fmt" "fmt"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
@ -66,138 +68,137 @@ var cmdBackup = &Command{
`, `,
} }
func runBackup(cmd *Command, args []string) bool {
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
// Backward compatibility: if -server is provided, use it
masterServer := *s.master
if *s.server != "" {
masterServer = *s.server
}
if *s.volumeId == -1 {
return false
}
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
if len(lookup.Locations) == 0 {
fmt.Printf("Error: volume %d has no locations available\n", vid)
return true
}
// Try each available location until one succeeds
var lastErr error
for i, location := range lookup.Locations {
volumeServer := location.ServerAddress()
fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer)
// backupFromLocation attempts to backup a volume from a specific volume server location.
// Returns (error, isFatal) where isFatal=true means the error is due to invalid user input
// and should not be retried with other locations.
func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId) (error, bool) {
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil { if err != nil {
fmt.Printf("Error getting volume %d status from %s: %v\n", vid, volumeServer, err)
lastErr = err
continue
return fmt.Errorf("getting volume status: %w", err), false
} }
// Parse TTL
var ttl *needle.TTL var ttl *needle.TTL
if *s.ttl != "" { if *s.ttl != "" {
ttl, err = needle.ReadTTL(*s.ttl) ttl, err = needle.ReadTTL(*s.ttl)
if err != nil { if err != nil {
fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
return true
// User-provided TTL is invalid - this is fatal
return fmt.Errorf("invalid user-provided ttl %s: %w", *s.ttl, err), true
} }
} else { } else {
ttl, err = needle.ReadTTL(stats.Ttl) ttl, err = needle.ReadTTL(stats.Ttl)
if err != nil { if err != nil {
fmt.Printf("Error parsing volume %d ttl %s from %s: %v\n", vid, stats.Ttl, volumeServer, err)
lastErr = err
continue
return fmt.Errorf("parsing ttl %s from stats: %w", stats.Ttl, err), false
} }
} }
// Parse replication
var replication *super_block.ReplicaPlacement var replication *super_block.ReplicaPlacement
if *s.replication != "" { if *s.replication != "" {
replication, err = super_block.NewReplicaPlacementFromString(*s.replication) replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
if err != nil { if err != nil {
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
return true
// User-provided replication is invalid - this is fatal
return fmt.Errorf("invalid user-provided replication %s: %w", *s.replication, err), true
} }
} else { } else {
replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
if err != nil { if err != nil {
fmt.Printf("Error parsing volume %d replication %s from %s: %v\n", vid, stats.Replication, volumeServer, err)
lastErr = err
continue
return fmt.Errorf("parsing replication %s from stats: %w", stats.Replication, err), false
} }
} }
ver := needle.Version(stats.Version) ver := needle.Version(stats.Version)
// Create or load the volume
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error creating or reading from volume %d from %s: %v\n", vid, volumeServer, err)
lastErr = err
continue
return fmt.Errorf("creating or reading volume: %w", err), false
} }
defer v.Close()
// Handle compaction if needed
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
if err = v.Compact2(0, 0, nil); err != nil { if err = v.Compact2(0, 0, nil); err != nil {
fmt.Printf("Compact Volume before synchronizing from %s: %v\n", volumeServer, err)
v.Close()
lastErr = err
continue
return fmt.Errorf("compacting volume: %w", err), false
} }
if err = v.CommitCompact(); err != nil { if err = v.CommitCompact(); err != nil {
fmt.Printf("Commit Compact before synchronizing from %s: %v\n", volumeServer, err)
v.Close()
lastErr = err
continue
return fmt.Errorf("committing compaction: %w", err), false
} }
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil { if _, err = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); err != nil {
fmt.Printf("Error writing superblock from %s: %v\n", volumeServer, err)
v.Close()
lastErr = err
continue
return fmt.Errorf("writing superblock: %w", err), false
} }
} }
datSize, _, _ := v.FileStat() datSize, _, _ := v.FileStat()
// If local volume is larger than remote, recreate it
if datSize > stats.TailOffset { if datSize > stats.TailOffset {
// remove the old data
if err := v.Destroy(false); err != nil { if err := v.Destroy(false); err != nil {
fmt.Printf("Error destroying volume %d on %s: %v\n", vid, volumeServer, err)
v.Close()
lastErr = err
continue
return fmt.Errorf("destroying volume: %w", err), false
} }
v.Close() // Close the old volume handle before creating a new one
v.Close()
// recreate an empty volume // recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error recreating volume %d from %s: %v\n", vid, volumeServer, err)
lastErr = err
continue
return fmt.Errorf("recreating volume: %w", err), false
} }
defer v.Close()
} }
// Try the incremental backup
// Perform the incremental backup
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
fmt.Printf("Error synchronizing volume %d from %s: %v\n", vid, volumeServer, err)
v.Close()
return fmt.Errorf("incremental backup: %w", err), false
}
return nil, false
}
func runBackup(cmd *Command, args []string) bool {
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
// Backward compatibility: if -server is provided, use it
masterServer := *s.master
if *s.server != "" {
masterServer = *s.server
}
if *s.volumeId == -1 {
return false
}
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(masterServer) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
if len(lookup.Locations) == 0 {
fmt.Printf("Error: volume %d has no locations available\n", vid)
return true
}
// Try each available location until one succeeds
var lastErr error
for i, location := range lookup.Locations {
volumeServer := location.ServerAddress()
fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer)
err, isFatal := backupFromLocation(volumeServer, grpcDialOption, vid)
if err != nil {
fmt.Printf("Error backing up volume %d from %s: %v\n", vid, volumeServer, err)
lastErr = err lastErr = err
// Check if this is a fatal user-input error
if isFatal {
return true
}
continue continue
} }
// Success! // Success!
v.Close()
fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer) fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer)
return true return true
} }

Loading…
Cancel
Save