Browse Source

Revert "mount: when outside cluster network, use filer as proxy to access volume servers"

This reverts commit 096e088d7b.
pull/1759/head
Chris Lu 4 years ago
parent
commit
6ca10725b8
  1. 1
      weed/command/mount_std.go
  2. 6
      weed/filer/reader_at.go
  3. 4
      weed/filesys/file.go
  4. 6
      weed/filesys/filehandle.go
  5. 13
      weed/filesys/wfs.go
  6. 4
      weed/filesys/wfs_deletion.go
  7. 7
      weed/filesys/wfs_filer_client.go
  8. 5
      weed/filesys/wfs_write.go
  9. 4
      weed/messaging/broker/broker_append.go
  10. 1
      weed/pb/filer_pb/filer_client.go
  11. 3
      weed/replication/sink/filersink/fetch_write.go
  12. 4
      weed/replication/source/filer_source.go
  13. 3
      weed/s3api/s3api_handlers.go
  14. 5
      weed/server/webdav_server.go
  15. 4
      weed/shell/commands.go

1
weed/command/mount_std.go

@ -169,7 +169,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
} }
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
FilerAddress: filer,
FilerGrpcAddress: filerGrpcAddress, FilerGrpcAddress: filerGrpcAddress,
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot, FilerMountRootPath: mountRoot,

6
weed/filer/reader_at.go

@ -71,7 +71,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
} }
for _, loc := range locations.Locations { for _, loc := range locations.Locations {
volumeServerAddress := loc.Url
volumeServerAddress := filerClient.AdjustedUrl(loc)
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
targetUrls = append(targetUrls, targetUrl) targetUrls = append(targetUrls, targetUrl)
} }
@ -85,11 +85,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
} }
} }
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{ return &ChunkReadAt{
chunkViews: chunkViews, chunkViews: chunkViews,
lookupFileId: lookupFn,
lookupFileId: LookupFn(filerClient),
chunkCache: chunkCache, chunkCache: chunkCache,
fileSize: fileSize, fileSize: fileSize,
} }

4
weed/filesys/file.go

@ -147,7 +147,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
} }
} }
file.entry.Chunks = chunks file.entry.Chunks = chunks
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
file.reader = nil file.reader = nil
file.wfs.deleteFileChunks(truncatedChunks) file.wfs.deleteFileChunks(truncatedChunks)
} }
@ -329,7 +329,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) { func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry file.entry = entry
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
file.reader = nil file.reader = nil
} }

6
weed/filesys/filehandle.go

@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error var chunkResolveErr error
if fh.f.entryViewCache == nil { if fh.f.entryViewCache == nil {
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
if chunkResolveErr != nil { if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
} }
@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
if fh.f.reader == nil { if fh.f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
} }
totalRead, err := fh.f.reader.ReadAt(buff, offset) totalRead, err := fh.f.reader.ReadAt(buff, offset)
@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil { if manifestErr != nil {
// not good, but should be ok // not good, but should be ok

13
weed/filesys/wfs.go

@ -3,8 +3,6 @@ package filesys
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math" "math"
"os" "os"
"path" "path"
@ -26,7 +24,6 @@ import (
) )
type Option struct { type Option struct {
FilerAddress string
FilerGrpcAddress string FilerGrpcAddress string
GrpcDialOption grpc.DialOption GrpcDialOption grpc.DialOption
FilerMountRootPath string FilerMountRootPath string
@ -240,13 +237,3 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
} }
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
} }
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.OutsideContainerClusterMode {
return func(fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
}

4
weed/filesys/wfs_deletion.go

@ -22,7 +22,7 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
fileIds = append(fileIds, chunk.GetFileIdString()) fileIds = append(fileIds, chunk.GetFileIdString())
continue continue
} }
dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(wfs.LookupFn(), chunk)
dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
if manifestResolveErr != nil { if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
} }
@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
} }
for _, loc := range locations.Locations { for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{ lr.Locations = append(lr.Locations, operation.Location{
Url: loc.Url,
Url: wfs.AdjustedUrl(loc),
PublicUrl: loc.PublicUrl, PublicUrl: loc.PublicUrl,
}) })
} }

7
weed/filesys/wfs_filer_client.go

@ -25,3 +25,10 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return err return err
} }
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
if wfs.option.OutsideContainerClusterMode {
return location.PublicUrl
}
return location.Url
}

5
weed/filesys/wfs_write.go

@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
Url: resp.Url, Url: resp.Url,
PublicUrl: resp.PublicUrl, PublicUrl: resp.PublicUrl,
} }
host = loc.Url
host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication collection, replication = resp.Collection, resp.Replication
return nil return nil
@ -53,9 +53,6 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
} }
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.OutsideContainerClusterMode {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
}
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil { if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)

4
weed/messaging/broker/broker_append.go

@ -107,3 +107,7 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
return return
} }
func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}

1
weed/pb/filer_pb/filer_client.go

@ -20,6 +20,7 @@ var (
type FilerClient interface { type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error WithFilerClient(fn func(SeaweedFilerClient) error) error
AdjustedUrl(location *Location) string
} }
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) { func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {

3
weed/replication/sink/filersink/fetch_write.go

@ -128,3 +128,6 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption) }, fs.grpcAddress, fs.grpcDialOption)
} }
func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}

4
weed/replication/source/filer_source.go

@ -124,6 +124,10 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro
} }
func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func volumeId(fileId string) string { func volumeId(fileId string) string {
lastCommaIndex := strings.LastIndex(fileId, ",") lastCommaIndex := strings.LastIndex(fileId, ",")
if lastCommaIndex > 0 { if lastCommaIndex > 0 {

3
weed/s3api/s3api_handlers.go

@ -50,6 +50,9 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
} }
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
// If none of the http routes match respond with MethodNotAllowed // If none of the http routes match respond with MethodNotAllowed
func notFoundHandler(w http.ResponseWriter, r *http.Request) { func notFoundHandler(w http.ResponseWriter, r *http.Request) {

5
weed/server/webdav_server.go

@ -123,6 +123,9 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
} }
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func clearName(name string) (string, error) { func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/") slashed := strings.HasSuffix(name, "/")
@ -520,7 +523,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
} }
if f.reader == nil { if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
} }
readSize, err = f.reader.ReadAt(p, f.off) readSize, err = f.reader.ReadAt(p, f.off)

4
weed/shell/commands.go

@ -102,6 +102,10 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error
} }
func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
if strings.HasPrefix(entryPath, "http") { if strings.HasPrefix(entryPath, "http") {
var u *url.URL var u *url.URL

Loading…
Cancel
Save