Browse Source

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/3480/head
chrislu 2 years ago
parent
commit
3c8e95eeb3
  1. 1
      weed/pb/volume_server.proto
  2. 1493
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 24
      weed/server/volume_grpc_copy.go
  4. 2
      weed/shell/command_volume_balance.go
  5. 2
      weed/shell/command_volume_copy.go
  6. 16
      weed/shell/command_volume_move.go
  7. 7
      weed/shell/command_volume_tier_move.go

1
weed/pb/volume_server.proto

@ -254,6 +254,7 @@ message VolumeCopyRequest {
string ttl = 4; string ttl = 4;
string source_data_node = 5; string source_data_node = 5;
string disk_type = 6; string disk_type = 6;
int64 io_byte_per_second = 7;
} }
message VolumeCopyResponse { message VolumeCopyResponse {
uint64 last_append_at_ns = 1; uint64 last_append_at_ns = 1;

1493
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

24
weed/server/volume_grpc_copy.go

@ -108,7 +108,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget := reportInterval nextReportTarget := reportInterval
var modifiedTsNs int64 var modifiedTsNs int64
var sendErr error var sendErr error
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
var ioBytePerSecond int64
if req.IoBytePerSecond <= 0 {
ioBytePerSecond = vs.compactionBytePerSecond
} else {
ioBytePerSecond = req.IoBytePerSecond
}
throttler := util.NewWriteThrottler(ioBytePerSecond)
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
if processed > nextReportTarget { if processed > nextReportTarget {
copyResponse.ProcessedBytes = processed copyResponse.ProcessedBytes = processed
if sendErr = stream.Send(copyResponse); sendErr != nil { if sendErr = stream.Send(copyResponse); sendErr != nil {
@ -117,7 +124,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget = processed + reportInterval nextReportTarget = processed + reportInterval
} }
return true return true
}); err != nil {
}, throttler); err != nil {
return err return err
} }
if sendErr != nil { if sendErr != nil {
@ -127,14 +134,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
} }
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil {
return err return err
} }
if modifiedTsNs > 0 { if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
} }
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil {
return err return err
} }
if modifiedTsNs > 0 { if modifiedTsNs > 0 {
@ -184,6 +191,10 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
} }
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
}
func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid, VolumeId: vid,
@ -198,7 +209,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
} }
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn)
if err != nil { if err != nil {
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
} }
@ -207,7 +218,8 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
} }
/**
/*
*
only check the the differ of the file size only check the the differ of the file size
todo: maybe should check the received count and deleted count of the volume todo: maybe should check the received count and deleted count of the volume
*/ */

2
weed/shell/command_volume_balance.go

@ -330,7 +330,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
} }
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange { if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, false)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, false)
} }
return nil return nil
} }

2
weed/shell/command_volume_copy.go

@ -53,6 +53,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!") return fmt.Errorf("source and target volume servers are the same!")
} }
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "")
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0)
return return
} }

16
weed/shell/command_volume_move.go

@ -55,6 +55,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>") sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>") targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
ioBytePerSecond := volMoveCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
if err = volMoveCommand.Parse(args); err != nil { if err = volMoveCommand.Parse(args); err != nil {
return nil return nil
} }
@ -71,14 +72,14 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!") return fmt.Errorf("source and target volume servers are the same!")
} }
return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, false)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, *ioBytePerSecond, false)
} }
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond)
if err != nil { if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
} }
@ -101,7 +102,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil return nil
} }
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need // check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we // to mark it as read-only and then before we return we need to undo what we
@ -142,9 +143,10 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
IoBytePerSecond: ioBytePerSecond,
}) })
if replicateErr != nil { if replicateErr != nil {
return replicateErr return replicateErr

7
weed/shell/command_volume_tier_move.go

@ -58,6 +58,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
target := tierCommand.String("toDiskType", "", "the target disk type") target := tierCommand.String("toDiskType", "", "the target disk type")
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs") parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes") applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
if err = tierCommand.Parse(args); err != nil { if err = tierCommand.Parse(args); err != nil {
return nil return nil
} }
@ -118,7 +119,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src) unlock := c.Lock(job.src)
if applyChanges { if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
} }
} }
@ -219,13 +220,13 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil return nil
} }
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
// mark all replicas as read only // mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable // mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {

Loading…
Cancel
Save