Browse Source

refactoring

pull/1255/head
Chris Lu 5 years ago
parent
commit
c07bcd5065
  1. 91
      weed/pb/filer_pb/filer_client.go
  2. 4
      weed/s3api/filer_multipart.go
  3. 98
      weed/s3api/filer_util.go
  4. 6
      weed/s3api/s3api_bucket_handlers.go

91
weed/pb/filer_pb/filer_client.go

@ -5,11 +5,18 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
type FilerClient interface { type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error WithFilerClient(fn func(SeaweedFilerClient) error) error
AdjustedUrl(hostAndPort string) string AdjustedUrl(hostAndPort string) string
@ -50,15 +57,26 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) { func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) {
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
}
lastEntryName := ""
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
request := &ListEntriesRequest{ request := &ListEntriesRequest{
Directory: string(fullDirPath), Directory: string(fullDirPath),
Prefix: prefix, Prefix: prefix,
StartFromFileName: lastEntryName,
Limit: math.MaxUint32,
StartFromFileName: startFrom,
Limit: limit,
InclusiveStartFrom: inclusive,
} }
glog.V(3).Infof("read directory: %v", request) glog.V(3).Infof("read directory: %v", request)
@ -120,3 +138,68 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin
return return
} }
func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
entry := &Entry{
Name: dirName,
IsDirectory: true,
Attributes: &FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0777 | os.ModeDir),
Uid: OS_UID,
Gid: OS_GID,
},
}
if fn != nil {
fn(entry)
}
request := &CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("mkdir: %v", request)
if err := CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %v: %v", request, err)
return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
}
return nil
})
}
func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
entry := &Entry{
Name: fileName,
IsDirectory: false,
Attributes: &FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0770),
Uid: OS_UID,
Gid: OS_GID,
},
Chunks: chunks,
}
request := &CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
if err := CreateEntry(client, request); err != nil {
glog.V(0).Infof("create file %v:%v", request, err)
return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
}
return nil
})
}

4
weed/s3api/filer_multipart.go

@ -155,7 +155,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
}, },
} }
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, uint32(*input.MaxUploads))
if err != nil { if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return return
@ -190,7 +190,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
}, },
} }
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts))
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
if err != nil { if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err) glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, ErrNoSuchUpload return nil, ErrNoSuchUpload

98
weed/s3api/filer_util.go

@ -3,115 +3,29 @@ package s3api
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"os"
"strings" "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
) )
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error { func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
entry := &filer_pb.Entry{
Name: dirName,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0777 | os.ModeDir),
Uid: OS_UID,
Gid: OS_GID,
},
}
if fn != nil {
fn(entry)
}
request := &filer_pb.CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %v: %v", request, err)
return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
}
return filer_pb.Mkdir(s3a, parentDirectoryPath, dirName, fn)
return nil
})
} }
func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error { func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
entry := &filer_pb.Entry{
Name: fileName,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0770),
Uid: OS_UID,
Gid: OS_GID,
},
Chunks: chunks,
}
request := &filer_pb.CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}
glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("create file %v:%v", request, err)
return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
}
return nil
})
}
func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) {
err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
Directory: parentDirectoryPath,
Prefix: prefix,
StartFromFileName: startFrom,
InclusiveStartFrom: inclusive,
Limit: uint32(limit),
}
glog.V(4).Infof("read directory: %v", request)
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
glog.V(0).Infof("read directory %v: %v", request, err)
return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err)
}
return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks)
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
} }
entries = append(entries, resp.Entry)
func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) {
}
return nil
})
err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) {
entries = append(entries, entry)
}, startFrom, inclusive, limit)
return return

6
weed/s3api/s3api_bucket_handlers.go

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"os"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -17,11 +16,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
) )
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
type ListAllMyBucketsResult struct { type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"` XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"`
Owner *s3.Owner Owner *s3.Owner

Loading…
Cancel
Save