|
|
|
@ -88,83 +88,107 @@ func runBackup(cmd *Command, args []string) bool { |
|
|
|
fmt.Printf("Error looking up volume %d: %v\n", vid, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
volumeServer := lookup.Locations[0].ServerAddress() |
|
|
|
|
|
|
|
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error get volume %d status: %v\n", vid, err) |
|
|
|
if len(lookup.Locations) == 0 { |
|
|
|
fmt.Printf("Error: volume %d has no locations available\n", vid) |
|
|
|
return true |
|
|
|
} |
|
|
|
var ttl *needle.TTL |
|
|
|
if *s.ttl != "" { |
|
|
|
ttl, err = needle.ReadTTL(*s.ttl) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} else { |
|
|
|
ttl, err = needle.ReadTTL(stats.Ttl) |
|
|
|
|
|
|
|
// Try each available location until one succeeds
|
|
|
|
var lastErr error |
|
|
|
for i, location := range lookup.Locations { |
|
|
|
volumeServer := location.ServerAddress() |
|
|
|
fmt.Printf("Attempting to backup volume %d from location %d/%d: %s\n", vid, i+1, len(lookup.Locations), volumeServer) |
|
|
|
|
|
|
|
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) |
|
|
|
return true |
|
|
|
fmt.Printf("Error getting volume %d status from %s: %v\n", vid, volumeServer, err) |
|
|
|
lastErr = err |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
var replication *super_block.ReplicaPlacement |
|
|
|
if *s.replication != "" { |
|
|
|
replication, err = super_block.NewReplicaPlacementFromString(*s.replication) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) |
|
|
|
return true |
|
|
|
|
|
|
|
var ttl *needle.TTL |
|
|
|
if *s.ttl != "" { |
|
|
|
ttl, err = needle.ReadTTL(*s.ttl) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} else { |
|
|
|
ttl, err = needle.ReadTTL(stats.Ttl) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) |
|
|
|
return true |
|
|
|
var replication *super_block.ReplicaPlacement |
|
|
|
if *s.replication != "" { |
|
|
|
replication, err = super_block.NewReplicaPlacementFromString(*s.replication) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} else { |
|
|
|
replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
ver := needle.Version(stats.Version) |
|
|
|
ver := needle.Version(stats.Version) |
|
|
|
|
|
|
|
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { |
|
|
|
if err = v.Compact2(0, 0, nil); err != nil { |
|
|
|
fmt.Printf("Compact Volume before synchronizing %v\n", err) |
|
|
|
return true |
|
|
|
} |
|
|
|
if err = v.CommitCompact(); err != nil { |
|
|
|
fmt.Printf("Commit Compact before synchronizing %v\n", err) |
|
|
|
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) |
|
|
|
v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) |
|
|
|
} |
|
|
|
|
|
|
|
datSize, _, _ := v.FileStat() |
|
|
|
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { |
|
|
|
if err = v.Compact2(0, 0, nil); err != nil { |
|
|
|
fmt.Printf("Compact Volume before synchronizing %v\n", err) |
|
|
|
v.Close() |
|
|
|
return true |
|
|
|
} |
|
|
|
if err = v.CommitCompact(); err != nil { |
|
|
|
fmt.Printf("Commit Compact before synchronizing %v\n", err) |
|
|
|
v.Close() |
|
|
|
return true |
|
|
|
} |
|
|
|
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) |
|
|
|
v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) |
|
|
|
} |
|
|
|
|
|
|
|
if datSize > stats.TailOffset { |
|
|
|
// remove the old data
|
|
|
|
if err := v.Destroy(false); err != nil { |
|
|
|
fmt.Printf("Error destroying volume: %v\n", err) |
|
|
|
datSize, _, _ := v.FileStat() |
|
|
|
|
|
|
|
if datSize > stats.TailOffset { |
|
|
|
// remove the old data
|
|
|
|
if err := v.Destroy(false); err != nil { |
|
|
|
fmt.Printf("Error destroying volume: %v\n", err) |
|
|
|
} |
|
|
|
// recreate an empty volume
|
|
|
|
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
// recreate an empty volume
|
|
|
|
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) |
|
|
|
return true |
|
|
|
|
|
|
|
// Try the incremental backup
|
|
|
|
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { |
|
|
|
fmt.Printf("Error synchronizing volume %d from %s: %v\n", vid, volumeServer, err) |
|
|
|
v.Close() |
|
|
|
lastErr = err |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
defer v.Close() |
|
|
|
|
|
|
|
if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { |
|
|
|
fmt.Printf("Error synchronizing volume %d: %v\n", vid, err) |
|
|
|
// Success!
|
|
|
|
v.Close() |
|
|
|
fmt.Printf("Successfully backed up volume %d from %s\n", vid, volumeServer) |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
// All locations failed
|
|
|
|
fmt.Printf("Failed to backup volume %d after trying all %d locations. Last error: %v\n", vid, len(lookup.Locations), lastErr) |
|
|
|
|
|
|
|
return true |
|
|
|
} |