add fs.mergeVolumes command into shell (#4907)
* add fs.compact command into shell
* rename fs.compact to fs.mergeVolumes
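A quick usage sketch (the volume ids and directory below are made up): without -apply the command only prints the chunk moves it would perform; adding -apply downloads each chunk from the source volume, re-uploads it into the target volume, and updates the filer entries so the emptied volume can later be reclaimed by vacuum.

	> fs.mergeVolumes -toVolumeId=5 /buckets/data/
	> fs.mergeVolumes -fromVolumeId=3 -toVolumeId=5 -apply /buckets/data/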
1 changed file with 270 additions and 0 deletions
@@ -0,0 +1,270 @@
package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

var (
	client *http.Client
)

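// init registers fs.mergeVolumes in the shell command table and creates the
// shared HTTP client used to copy chunk data between volume servers.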
func init() {
	client = &http.Client{}
	Commands = append(Commands, &commandFsMergeVolumes{})
}

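// commandFsMergeVolumes caches the volume list from the master so that
// volume compatibility can be checked without a master lookup per chunk.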
type commandFsMergeVolumes struct {
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
}

func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}

func (c *commandFsMergeVolumes) Help() string {
	return `re-locate chunks into target volumes and try to clear lighter volumes.

	This would help clear half-full volumes and let the vacuum system delete them later.

	fs.mergeVolumes -toVolumeId=y [-fromVolumeId=x] [-apply] /dir/
`
}

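// Do walks the directory tree under the given path and, for every chunk stored
// in a compatible source volume, copies the chunk data into the target volume
// and updates the owning filer entry. Without -apply it only prints the
// planned moves.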
func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	dir, err := commandEnv.parseUrl(findInputDirectory(args))
	if err != nil {
		return err
	}
	dir = strings.TrimRight(dir, "/")
	fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
	toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
	apply := fsMergeVolumesCommand.Bool("apply", false, "apply the metadata changes")
	if err = fsMergeVolumesCommand.Parse(args); err != nil {
		return err
	}
	fromVolumeId := needle.VolumeId(*fromVolumeArg)
	toVolumeId := needle.VolumeId(*toVolumeArg)

	if toVolumeId == 0 {
		return fmt.Errorf("volume id can not be zero")
	}

	// refresh the volume list from the master before checking compatibility
	if err = c.reloadVolumesInfo(commandEnv.MasterClient); err != nil {
		return err
	}

	toVolumeInfo, err := c.getVolumeInfoById(toVolumeId)
	if err != nil {
		return err
	}
	if toVolumeInfo.ReadOnly {
		return fmt.Errorf("volume is readonly: %d", toVolumeId)
	}

	if fromVolumeId != 0 {
		if fromVolumeId == toVolumeId {
			return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
		}
		compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
		if err != nil {
			return fmt.Errorf("cannot determine whether volumes %d and %d are compatible", fromVolumeId, toVolumeId)
		}
		if !compatible {
			return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
		}
	}
	defer client.CloseIdleConnections()

	compatibility := make(map[string]bool)

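	// walk the tree breadth-first; compatibility results are cached per
	// (source, target) volume pair so each pair is checked at most once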
	return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
			if !entry.IsDirectory {
				for _, chunk := range entry.Chunks {
					if chunk.IsChunkManifest {
						fmt.Printf("changing volume id for large files is not implemented yet: %s/%s\n", parentPath, entry.Name)
						continue
					}
					chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
					if chunkVolumeId == toVolumeId || (fromVolumeId != 0 && fromVolumeId != chunkVolumeId) {
						continue
					}
					cacheKey := fmt.Sprintf("%d-%d", chunkVolumeId, toVolumeId)
					compatible, cached := compatibility[cacheKey]
					if !cached {
						compatible, err = c.volumesAreCompatible(chunkVolumeId, toVolumeId)
						if err != nil {
							fmt.Printf("cannot determine whether volumes %d and %d are compatible: %v\n", chunkVolumeId, toVolumeId, err)
							return
						}
						compatibility[cacheKey] = compatible
					}
					if !compatible {
						if fromVolumeId != 0 {
							fmt.Printf("volumes are incompatible: %d and %d\n", fromVolumeId, toVolumeId)
							return
						}
						continue
					}
					path := parentPath.Child(entry.Name)

					fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
					if !*apply {
						continue
					}
					if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
						fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
						continue
					}

					if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
						Directory: string(parentPath),
						Entry:     entry,
					}); err != nil {
						fmt.Printf("failed to update %s: %v\n", path, err)
					}
				}
			}
		})
	})
}

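// getVolumeInfoById returns the cached volume information, or an error if the
// volume id is unknown.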
func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
	info := c.volumes[vid]
	var err error
	if info == nil {
		err = errors.New("cannot find volume")
	}
	return info, err
}

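// volumesAreCompatible reports whether two volumes share the same collection,
// TTL, and replica placement, i.e. whether chunks may safely move between them.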
func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
	srcInfo, err := c.getVolumeInfoById(src)
	if err != nil {
		return false, err
	}
	destInfo, err := c.getVolumeInfoById(dest)
	if err != nil {
		return false, err
	}
	return (srcInfo.Collection == destInfo.Collection &&
		srcInfo.Ttl == destInfo.Ttl &&
		srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
}

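// reloadVolumesInfo fetches the cluster topology from the master and indexes
// every volume by id, keeping the first replica seen for each volume.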
func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
	c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)

	return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}
		for _, dc := range volumes.TopologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, node := range rack.DataNodeInfos {
					for _, disk := range node.DiskInfos {
						for _, volume := range disk.VolumeInfos {
							vid := needle.VolumeId(volume.Id)
							if found := c.volumes[vid]; found == nil {
								c.volumes[vid] = volume
							}
						}
					}
				}
			}
		}
		return nil
	})
}

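// moveChunk copies one chunk from its current volume into toVolumeId, keeping
// the same file key and cookie, and rewrites the chunk's fid in place. The
// caller is responsible for persisting the updated entry to the filer.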
func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
	fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
	toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)

	downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
	if err != nil {
		return err
	}

	downloadURL := fmt.Sprintf("http://%s/%s", downloadURLs[0], fromFid.String())

	uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
	if err != nil {
		return err
	}
	uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())

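	// stream the chunk from the source volume server to the target one,
	// preserving the filename, mime type, compression, and MD5 reported
	// by the download response headers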
	resp, reader, err := readUrl(downloadURL)
	if err != nil {
		return err
	}
	defer util.CloseResponse(resp)
	defer reader.Close()

	var filename string

	contentDisposition := resp.Header.Get("Content-Disposition")
	if len(contentDisposition) > 0 {
		idx := strings.Index(contentDisposition, "filename=")
		if idx != -1 {
			filename = contentDisposition[idx+len("filename="):]
			filename = strings.Trim(filename, "\"")
		}
	}

	contentType := resp.Header.Get("Content-Type")
	isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
	md5 := resp.Header.Get("Content-MD5")

	_, err, _ = operation.Upload(reader, &operation.UploadOption{
		UploadUrl:         uploadURL,
		Filename:          filename,
		IsInputCompressed: isCompressed,
		Cipher:            false,
		MimeType:          contentType,
		PairMap:           nil,
		Md5:               md5,
	})
	if err != nil {
		return err
	}
	chunk.Fid.VolumeId = uint32(toVolumeId)
	chunk.FileId = ""

	return nil
}

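// readUrl issues a GET against a volume server, accepting gzip so compressed
// chunks can be re-uploaded without being decompressed first. The caller must
// close both the response and the returned body.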
func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {

	req, err := http.NewRequest("GET", fileUrl, nil)
	if err != nil {
		return nil, nil, err
	}
	req.Header.Add("Accept-Encoding", "gzip")

	r, err := client.Do(req)
	if err != nil {
		return nil, nil, err
	}
	if r.StatusCode >= 400 {
		util.CloseResponse(r)
		return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
	}

	return r, r.Body, nil
}