From c07bcd5065391441cdc97c05975e447999bab4b1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 00:30:02 -0700 Subject: [PATCH] refactoring --- weed/pb/filer_pb/filer_client.go | 95 ++++++++++++++++++++++++++-- weed/s3api/filer_multipart.go | 4 +- weed/s3api/filer_util.go | 98 ++--------------------------- weed/s3api/s3api_bucket_handlers.go | 6 -- 4 files changed, 97 insertions(+), 106 deletions(-) diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 100e997b2..1a92b452d 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -5,11 +5,18 @@ import ( "fmt" "io" "math" + "os" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) +var ( + OS_UID = uint32(os.Getuid()) + OS_GID = uint32(os.Getgid()) +) + type FilerClient interface { WithFilerClient(fn func(SeaweedFilerClient) error) error AdjustedUrl(hostAndPort string) string @@ -50,15 +57,26 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) { - err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32) + +} + +func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) { + + return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit) + +} - lastEntryName := "" +func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) { + + err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { request := &ListEntriesRequest{ - Directory: string(fullDirPath), - Prefix: prefix, - StartFromFileName: lastEntryName, - Limit: math.MaxUint32, + Directory: string(fullDirPath), + Prefix: prefix, + StartFromFileName: startFrom, + Limit: limit, + InclusiveStartFrom: inclusive, } glog.V(3).Infof("read directory: %v", request) @@ -120,3 +138,68 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin return } + +func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error { + return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + + entry := &Entry{ + Name: dirName, + IsDirectory: true, + Attributes: &FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0777 | os.ModeDir), + Uid: OS_UID, + Gid: OS_GID, + }, + } + + if fn != nil { + fn(entry) + } + + request := &CreateEntryRequest{ + Directory: parentDirectoryPath, + Entry: entry, + } + + glog.V(1).Infof("mkdir: %v", request) + if err := CreateEntry(client, request); err != nil { + glog.V(0).Infof("mkdir %v: %v", request, err) + return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) + } + + return nil + }) +} + +func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error { + return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + + entry := &Entry{ + Name: fileName, + IsDirectory: false, + Attributes: &FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0770), + Uid: OS_UID, + Gid: OS_GID, + }, + Chunks: chunks, + } + + request := &CreateEntryRequest{ + Directory: parentDirectoryPath, + Entry: entry, + } + + glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName) + if err := CreateEntry(client, request); err != nil { + glog.V(0).Infof("create file %v:%v", request, err) + return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err) + } + + return nil + }) +} diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 1350fb18e..e81461dd2 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -155,7 +155,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput }, } - entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads)) + entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, uint32(*input.MaxUploads)) if err != nil { glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) return @@ -190,7 +190,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP }, } - entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts)) + entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts)) if err != nil { glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err) return nil, ErrNoSuchUpload diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 2e738af50..51249cf39 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -3,115 +3,29 @@ package s3api import ( "context" "fmt" - "io" - "os" "strings" - "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - entry := &filer_pb.Entry{ - Name: dirName, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0777 | os.ModeDir), - Uid: OS_UID, - Gid: OS_GID, - }, - } - - if fn != nil { - fn(entry) - } - - request := &filer_pb.CreateEntryRequest{ - Directory: parentDirectoryPath, - Entry: entry, - } - glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %v: %v", request, err) - return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) - } + return filer_pb.Mkdir(s3a, parentDirectoryPath, dirName, fn) - return nil - }) } func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - entry := &filer_pb.Entry{ - Name: fileName, - IsDirectory: false, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0770), - Uid: OS_UID, - Gid: OS_GID, - }, - Chunks: chunks, - } - - request := &filer_pb.CreateEntryRequest{ - Directory: parentDirectoryPath, - Entry: entry, - } - glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName) - if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("create file %v:%v", request, err) - return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err) - } + return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks) - return nil - }) } -func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) { - - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { +func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) { - request := &filer_pb.ListEntriesRequest{ - Directory: parentDirectoryPath, - Prefix: prefix, - StartFromFileName: startFrom, - InclusiveStartFrom: inclusive, - Limit: uint32(limit), - } - - glog.V(4).Infof("read directory: %v", request) - stream, err := client.ListEntries(context.Background(), request) - if err != nil { - glog.V(0).Infof("read directory %v: %v", request, err) - return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err) - } - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - - entries = append(entries, resp.Entry) - - } - - return nil - }) + err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) { + entries = append(entries, entry) + }, startFrom, inclusive, limit) return diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 02a01e74f..f1bfb2156 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "net/http" - "os" "time" "github.com/aws/aws-sdk-go/aws" @@ -17,11 +16,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) -var ( - OS_UID = uint32(os.Getuid()) - OS_GID = uint32(os.Getgid()) -) - type ListAllMyBucketsResult struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"` Owner *s3.Owner