|
|
package filer2
import ( "context" "fmt" "io" "math" "strings" "sync"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" )
func VolumeId(fileId string) string { lastCommaIndex := strings.LastIndex(fileId, ",") if lastCommaIndex > 0 { return fileId[:lastCommaIndex] } return fileId }
type FilerClient interface { WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error AdjustedUrl(hostAndPort string) string }
func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { var vids []string for _, chunkView := range chunkViews { vids = append(vids, VolumeId(chunkView.FileId)) }
vid2Locations := make(map[string]*filer_pb.Locations)
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read fh lookup volume id locations: %v", vids) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) if err != nil { return err }
vid2Locations = resp.LocationsMap
return nil })
if err != nil { return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) }
var wg sync.WaitGroup for _, chunkView := range chunkViews { wg.Add(1) go func(chunkView *ChunkView) { defer wg.Done()
glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
locations := vid2Locations[VolumeId(chunkView.FileId)] if locations == nil || len(locations.Locations) == 0 { glog.V(0).Infof("failed to locate %s", chunkView.FileId) err = fmt.Errorf("failed to locate %s", chunkView.FileId) return }
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) var n int64 n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)])
if err != nil {
glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
err = fmt.Errorf("failed to read http://%s/%s: %v", volumeServerAddress, chunkView.FileId, err) return }
glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) totalRead += n
}(chunkView) } wg.Wait() return }
func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
dir, name := fullFilePath.DirAndName()
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, }
// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
resp, err := filer_pb.LookupEntry(client, request) if err != nil { if err == filer_pb.ErrNotFound { return nil } glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err) return err }
if resp.Entry == nil { // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
return nil }
entry = resp.Entry return nil })
return }
func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
lastEntryName := ""
request := &filer_pb.ListEntriesRequest{ Directory: string(fullDirPath), Prefix: prefix, StartFromFileName: lastEntryName, Limit: math.MaxUint32, }
glog.V(3).Infof("read directory: %v", request) stream, err := client.ListEntries(context.Background(), request) if err != nil { return fmt.Errorf("list %s: %v", fullDirPath, err) }
var prevEntry *filer_pb.Entry for { resp, recvErr := stream.Recv() if recvErr != nil { if recvErr == io.EOF { if prevEntry != nil { fn(prevEntry, true) } break } else { return recvErr } } if prevEntry != nil { fn(prevEntry, false) } prevEntry = resp.Entry }
return nil
})
return }
|