You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							95 lines
						
					
					
						
							2.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							95 lines
						
					
					
						
							2.8 KiB
						
					
					
				| package weed_server | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"os" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/backend" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/needle" | |
| ) | |
| 
 | |
| // VolumeTierMoveDatToRemote copy dat file to a remote tier | |
| func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error { | |
| 
 | |
| 	// find existing volume | |
| 	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) | |
| 	if v == nil { | |
| 		return fmt.Errorf("volume %d not found", req.VolumeId) | |
| 	} | |
| 
 | |
| 	// verify the collection | |
| 	if v.Collection != req.Collection { | |
| 		return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) | |
| 	} | |
| 
 | |
| 	// locate the disk file | |
| 	diskFile, ok := v.DataBackend.(*backend.DiskFile) | |
| 	if !ok { | |
| 		return nil // already copied to remove. fmt.Errorf("volume %d is not on local disk", req.VolumeId) | |
| 	} | |
| 
 | |
| 	// check valid storage backend type | |
| 	backendStorage, found := backend.BackendStorages[req.DestinationBackendName] | |
| 	if !found { | |
| 		var keys []string | |
| 		for key := range backend.BackendStorages { | |
| 			keys = append(keys, key) | |
| 		} | |
| 		return fmt.Errorf("destination %s not found, supported: %v", req.DestinationBackendName, keys) | |
| 	} | |
| 
 | |
| 	// check whether the existing backend storage is the same as requested | |
| 	// if same, skip | |
| 	backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName) | |
| 	for _, remoteFile := range v.GetVolumeInfo().GetFiles() { | |
| 		if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId { | |
| 			return fmt.Errorf("destination %s already exists", req.DestinationBackendName) | |
| 		} | |
| 	} | |
| 
 | |
| 	startTime := time.Now() | |
| 	fn := func(progressed int64, percentage float32) error { | |
| 		now := time.Now() | |
| 		if now.Sub(startTime) < time.Second { | |
| 			return nil | |
| 		} | |
| 		startTime = now | |
| 		return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{ | |
| 			Processed:           progressed, | |
| 			ProcessedPercentage: percentage, | |
| 		}) | |
| 	} | |
| 
 | |
| 	// copy the data file | |
| 	key, size, err := backendStorage.CopyFile(diskFile.File, fn) | |
| 	if err != nil { | |
| 		return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err) | |
| 	} | |
| 
 | |
| 	// save the remote file to volume tier info | |
| 	v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{ | |
| 		BackendType:  backendType, | |
| 		BackendId:    backendId, | |
| 		Key:          key, | |
| 		Offset:       0, | |
| 		FileSize:     uint64(size), | |
| 		ModifiedTime: uint64(time.Now().Unix()), | |
| 		Extension:    ".dat", | |
| 	}) | |
| 
 | |
| 	if err := v.SaveVolumeInfo(); err != nil { | |
| 		return fmt.Errorf("volume %d failed to save remote file info: %v", v.Id, err) | |
| 	} | |
| 
 | |
| 	if err := v.LoadRemoteFile(); err != nil { | |
| 		return fmt.Errorf("volume %d failed to load remote file: %v", v.Id, err) | |
| 	} | |
| 
 | |
| 	if !req.KeepLocalDatFile { | |
| 		os.Remove(v.FileName(".dat")) | |
| 	} | |
| 
 | |
| 	return nil | |
| }
 |