
Merge pull request #5 from chrislusf/master

sync
hilimd committed 4 years ago (committed by GitHub)
commit f9ba5cd986
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
1. README.md (10 changes)
2. go.mod (7 changes)
3. go.sum (2 changes)
4. test/s3/basic/basic_test.go (106 changes)
5. weed/filesys/dir.go (18 changes)
6. weed/filesys/dir_rename.go (34 changes)
7. weed/filesys/file.go (1 change)
8. weed/filesys/filehandle.go (8 changes)
9. weed/filesys/fscache.go (207 changes)
10. weed/filesys/fscache_test.go (96 changes)
11. weed/filesys/wfs.go (2 changes)
12. weed/s3api/auth_credentials.go (6 changes)
13. weed/s3api/s3api_bucket_handlers.go (10 changes)
14. weed/s3api/s3api_object_copy_handlers.go (14 changes)
15. weed/s3api/s3api_object_handlers.go (35 changes)
16. weed/s3api/s3api_object_multipart_handlers.go (36 changes)
17. weed/s3api/s3api_objects_list_handlers.go (10 changes)
18. weed/server/common.go (2 changes)
19. weed/server/filer_server_handlers_read.go (6 changes)
20. weed/server/volume_server_handlers_read.go (4 changes)

README.md (10 changes)

@@ -4,7 +4,7 @@
[![Build Status](https://travis-ci.org/chrislusf/seaweedfs.svg?branch=master)](https://travis-ci.org/chrislusf/seaweedfs)
[![GoDoc](https://godoc.org/github.com/chrislusf/seaweedfs/weed?status.svg)](https://godoc.org/github.com/chrislusf/seaweedfs/weed)
[![Wiki](https://img.shields.io/badge/docs-wiki-blue.svg)](https://github.com/chrislusf/seaweedfs/wiki)
[![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs.svg?maxAge=86400)](https://hub.docker.com/r/chrislusf/seaweedfs/)
[![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs/)
![SeaweedFS Logo](https://raw.githubusercontent.com/chrislusf/seaweedfs/master/note/seaweedfs.png)
@@ -112,17 +112,19 @@ On top of the object store, optional [Filer] can support directories and POSIX a
[Back to TOC](#table-of-contents)
## Filer Features ##
* [filer server][Filer] provide "normal" directories and files via http.
* [mount filer][Mount] to read and write files directly as a local directory via FUSE.
* [Filer server][Filer] provide "normal" directories and files via http.
* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
* [Mount filer][Mount] to read and write files directly as a local directory via FUSE.
* [Amazon S3 compatible API][AmazonS3API] to access files with S3 tooling.
* [Hadoop Compatible File System][Hadoop] to access files from Hadoop/Spark/Flink/etc jobs.
* [Async Backup To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze.
* [WebDAV] access as a mapped drive on Mac and Windows, or from mobile devices.
* [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data.
* [File TTL][FilerTTL] automatically purge file metadata and actual file data.
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=604800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
[Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
[SuperLargeFiles]: https://github.com/chrislusf/seaweedfs/wiki/Data-Structure-for-Large-Files
[Mount]: https://github.com/chrislusf/seaweedfs/wiki/FUSE-Mount
[AmazonS3API]: https://github.com/chrislusf/seaweedfs/wiki/Amazon-S3-API
[BackupToCloud]: https://github.com/chrislusf/seaweedfs/wiki/Backup-to-Cloud

go.mod (7 changes)

@@ -32,7 +32,7 @@ require (
    github.com/golang/protobuf v1.4.2
    github.com/google/btree v1.0.0
    github.com/google/uuid v1.1.1
    github.com/gorilla/mux v1.7.3
    github.com/gorilla/mux v1.7.4
    github.com/gorilla/websocket v1.4.1 // indirect
    github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
    github.com/hashicorp/golang-lru v0.5.3 // indirect
@@ -90,7 +90,4 @@ require (
    gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
)
replace (
    github.com/satori/go.uuid v1.2.0 => github.com/satori/go.uuid v0.0.0-20181028125025-b2ce2384e17b
    go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547
)
replace go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547

go.sum (2 changes)

@@ -236,6 +236,8 @@ github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=

test/s3/basic/basic_test.go (106 changes)

@@ -2,14 +2,14 @@ package basic

import (
    "fmt"
    "os"
    "strings"
    "testing"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "io/ioutil"
    "os"
    "strings"
    "testing"
)

var (
@@ -109,3 +109,101 @@ func exitErrorf(msg string, args ...interface{}) {
    fmt.Fprintf(os.Stderr, msg+"\n", args...)
    os.Exit(1)
}
const (
    Bucket = "theBucket"
    object = "foo/bar"
    Data   = "<data>"
)

func TestObjectOp(t *testing.T) {
    _, err := svc.CreateBucket(&s3.CreateBucketInput{
        Bucket: aws.String(Bucket),
    })
    if err != nil {
        exitErrorf("Unable to create bucket, %v", err)
    }

    _, err = svc.PutObject(&s3.PutObjectInput{
        Bucket: aws.String(Bucket),
        Key:    aws.String(object),
        Body:   strings.NewReader(Data),
    })
    if err != nil {
        exitErrorf("Unable to put object, %v", err)
    }

    dest := fmt.Sprintf("%s_bak", object)
    copyObj, err := svc.CopyObject(&s3.CopyObjectInput{
        Bucket:     aws.String(Bucket),
        CopySource: aws.String(fmt.Sprintf("%s/%s", Bucket, object)),
        Key:        aws.String(dest),
    })
    if err != nil {
        exitErrorf("Unable to copy object, %v", err)
    }
    t.Log("copy object result -> ", copyObj.CopyObjectResult)

    getObj, err := svc.GetObject(&s3.GetObjectInput{
        Bucket: aws.String(Bucket),
        Key:    aws.String(dest),
    })
    if err != nil {
        exitErrorf("Unable to get copy object, %v", err)
    }

    data, err := ioutil.ReadAll(getObj.Body)
    if err != nil {
        exitErrorf("Unable to read object data, %v", err)
    }
    if string(data) != Data {
        t.Error("object data -> ", string(data))
    }

    listObj, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{
        Bucket: aws.String(Bucket),
        Prefix: aws.String("foo/"),
    })
    if err != nil {
        exitErrorf("Unable to list objects, %v", err)
    }
    count := 0
    for _, content := range listObj.Contents {
        key := aws.StringValue(content.Key)
        if key == dest {
            count++
        } else if key == object {
            count++
        }
        if count == 2 {
            break
        }
    }
    if count != 2 {
        exitErrorf("Unable to find two objects, %v", listObj.Contents)
    }

    _, err = svc.DeleteObject(&s3.DeleteObjectInput{
        Bucket: aws.String(Bucket),
        Key:    aws.String(object),
    })
    if err != nil {
        exitErrorf("Unable to delete source object, %v", err)
    }

    _, err = svc.DeleteObject(&s3.DeleteObjectInput{
        Bucket: aws.String(Bucket),
        Key:    aws.String(dest),
    })
    if err != nil {
        exitErrorf("Unable to delete object, %v", err)
    }

    _, err = svc.DeleteBucket(&s3.DeleteBucketInput{
        Bucket: aws.String(Bucket),
    })
    if err != nil {
        exitErrorf("Unable to delete bucket, %v", err)
    }
}
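
For context, the `svc` client used throughout this test comes from the package's var block, which this diff does not show. A minimal sketch of how an aws-sdk-go S3 client is typically pointed at a local SeaweedFS S3 gateway; the endpoint, region, and credentials below are illustrative assumptions, not values from this repository:

package basic

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

// svc is a hypothetical reconstruction of the elided client setup.
var svc = s3.New(session.Must(session.NewSession(&aws.Config{
    Region:           aws.String("us-east-1"),                            // assumed
    Endpoint:         aws.String("http://localhost:8333"),                // assumed weed s3 port
    Credentials:      credentials.NewStaticCredentials("any", "any", ""), // assumed test keys
    S3ForcePathStyle: aws.Bool(true),                                     // path-style addressing for S3-compatible stores
})))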

weed/filesys/dir.go (18 changes)

@@ -27,6 +27,7 @@ type Dir struct {
var _ = fs.Node(&Dir{})
var _ = fs.NodeCreater(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
var _ = fs.NodeFsyncer(&Dir{})
var _ = fs.NodeRequestLookuper(&Dir{})
var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{})
@@ -90,8 +91,15 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
    attr.BlockSize = 1024 * 1024
}

func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
    // fsync works at OS level
    // write the file chunks to the filerGrpcAddress
    glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req)
    return nil
}

func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
    return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
    return &File{
        Name:           name,
        dir:            dir,
@@ -99,14 +107,11 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
        entry:          entry,
        entryViewCache: nil,
    }
    })
}

func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
    return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
    return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
    })
}
@@ -306,8 +311,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
    dir.wfs.deleteFileChunks(entry.Chunks)
    dir.wfs.fsNodeCache.DeleteFsNode(filePath)
    dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
    glog.V(3).Infof("remove file: %v", req)
@@ -324,7 +327,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
    t := util.NewFullPath(dir.FullPath(), req.Name)
    dir.wfs.fsNodeCache.DeleteFsNode(t)
    dir.wfs.metaCache.DeleteEntry(context.Background(), t)
@@ -417,8 +419,6 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
func (dir *Dir) Forget() {
    glog.V(3).Infof("Forget dir %s", dir.FullPath())
    dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}

func (dir *Dir) maybeLoadEntry() error {
weed/filesys/dir_rename.go (34 changes)

@@ -3,11 +3,12 @@ package filesys

import (
    "context"
    "github.com/seaweedfs/fuse"
    "github.com/seaweedfs/fuse/fs"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/util"
    "github.com/seaweedfs/fuse"
    "github.com/seaweedfs/fuse/fs"
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
@@ -19,7 +20,15 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
    glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)

    err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
    // find local old entry
    oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
    if err != nil {
        glog.V(0).Infof("dir Rename can not find source %s : %v", oldPath, err)
        return fuse.ENOENT
    }

    // update remote filer
    err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
        request := &filer_pb.AtomicRenameEntryRequest{
            OldDirectory: dir.FullPath(),
@@ -30,21 +39,30 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
        _, err := client.AtomicRenameEntry(context.Background(), request)
        if err != nil {
            glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
            return fuse.EIO
        }
        return nil
    })
    if err != nil {
        glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
        return fuse.EIO
    }
    if err == nil {
        // TODO: replicate renaming logic on filer
        if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
            glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
            return fuse.EIO
        }
        oldEntry.FullPath = newPath
        if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
            glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
            return fuse.EIO
        }
        // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
        dir.wfs.fsNodeCache.Move(oldPath, newPath)
        delete(dir.wfs.handles, oldPath.AsInode())
    }

    return err
}
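
The rewritten Rename runs in three steps: look the entry up in the local meta cache, perform the atomic rename on the remote filer, then mirror it locally by deleting the entry at its old path and reinserting it at the new one, since the cache is keyed by full path. A small stand-alone sketch of that delete-then-insert move on a path-keyed store (types and names are illustrative):

package main

import (
    "errors"
    "fmt"
)

// pathStore is an illustrative stand-in for a cache keyed by full path.
type pathStore map[string]string

// move re-keys an entry: with the path as the key, a rename cannot be
// done in place, so it is a delete under the old key plus an insert
// under the new one.
func (s pathStore) move(oldPath, newPath string) error {
    v, ok := s[oldPath]
    if !ok {
        return errors.New("source not found")
    }
    delete(s, oldPath)
    s[newPath] = v
    return nil
}

func main() {
    s := pathStore{"/a/b.txt": "entry-1"}
    if err := s.move("/a/b.txt", "/c/b.txt"); err != nil {
        panic(err)
    }
    fmt.Println(s) // map[/c/b.txt:entry-1]
}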

weed/filesys/file.go (1 change)

@@ -213,7 +213,6 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
func (file *File) Forget() {
    t := util.NewFullPath(file.dir.FullPath(), file.Name)
    glog.V(3).Infof("Forget file %s", t)
    file.wfs.fsNodeCache.DeleteFsNode(t)
}
func (file *File) maybeLoadEntry(ctx context.Context) error {

weed/filesys/filehandle.go (8 changes)

@@ -191,10 +191,16 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
    if fh.f.entry.Attributes != nil {
        fh.f.entry.Attributes.Mime = fh.contentType
        if fh.f.entry.Attributes.Uid == 0 {
            fh.f.entry.Attributes.Uid = req.Uid
        }
        if fh.f.entry.Attributes.Gid == 0 {
            fh.f.entry.Attributes.Gid = req.Gid
            fh.f.entry.Attributes.Mtime = time.Now().Unix()
        }
        if fh.f.entry.Attributes.Crtime == 0 {
            fh.f.entry.Attributes.Crtime = time.Now().Unix()
        }
        fh.f.entry.Attributes.Mtime = time.Now().Unix()
        fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
        fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
        fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
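
The Flush change stamps Uid, Gid, and Crtime only while they are still zero, so values recorded on the first flush survive later ones; Mtime, by contrast, is refreshed on every flush. A small sketch of the set-if-unset idiom (the Attr type is illustrative):

package main

import (
    "fmt"
    "time"
)

// Attr is an illustrative subset of a file's stored attributes.
type Attr struct {
    Uid    uint32
    Crtime int64
    Mtime  int64
}

// stamp fills identity and creation time only on the first write-back,
// but always bumps the modification time.
func stamp(a *Attr, uid uint32) {
    if a.Uid == 0 {
        a.Uid = uid
    }
    if a.Crtime == 0 {
        a.Crtime = time.Now().Unix()
    }
    a.Mtime = time.Now().Unix()
}

func main() {
    a := &Attr{}
    stamp(a, 1000)
    stamp(a, 2000) // Uid stays 1000; only Mtime moves
    fmt.Println(a.Uid)
}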

weed/filesys/fscache.go (207 changes)

@@ -1,207 +0,0 @@
package filesys

import (
    "sync"

    "github.com/chrislusf/seaweedfs/weed/util"
    "github.com/seaweedfs/fuse/fs"
)

type FsCache struct {
    root *FsNode
    sync.RWMutex
}

type FsNode struct {
    parent       *FsNode
    node         fs.Node
    name         string
    childrenLock sync.RWMutex
    children     map[string]*FsNode
}

func newFsCache(root fs.Node) *FsCache {
    return &FsCache{
        root: &FsNode{
            node: root,
        },
    }
}

func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
    c.RLock()
    defer c.RUnlock()
    return c.doGetFsNode(path)
}

func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
    t := c.root
    for _, p := range path.Split() {
        t = t.findChild(p)
        if t == nil {
            return nil
        }
    }
    return t.node
}

func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
    c.Lock()
    defer c.Unlock()
    c.doSetFsNode(path, node)
}

func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
    t := c.root
    for _, p := range path.Split() {
        t = t.ensureChild(p)
    }
    t.node = node
}

func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
    c.Lock()
    defer c.Unlock()
    t := c.doGetFsNode(path)
    if t != nil {
        return t
    }
    t = genNodeFn()
    c.doSetFsNode(path, t)
    return t
}

func (c *FsCache) DeleteFsNode(path util.FullPath) {
    c.Lock()
    defer c.Unlock()
    t := c.root
    for _, p := range path.Split() {
        t = t.findChild(p)
        if t == nil {
            return
        }
    }
    if t.parent != nil {
        t.parent.disconnectChild(t)
    }
    t.deleteSelf()
}

// oldPath and newPath are full path including the new name
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
    c.Lock()
    defer c.Unlock()

    // find old node
    src := c.root
    for _, p := range oldPath.Split() {
        src = src.findChild(p)
        if src == nil {
            return src
        }
    }
    if src.parent != nil {
        src.parent.disconnectChild(src)
    }

    // find new node
    target := c.root
    for _, p := range newPath.Split() {
        target = target.ensureChild(p)
    }
    parent := target.parent
    src.name = target.name
    if dir, ok := src.node.(*Dir); ok {
        dir.name = target.name // target is not Dir, but a shortcut
    }
    if f, ok := src.node.(*File); ok {
        f.Name = target.name
        if f.entry != nil {
            f.entry.Name = f.Name
        }
    }
    parent.disconnectChild(target)
    target.deleteSelf()

    src.connectToParent(parent)

    return src
}

func (n *FsNode) connectToParent(parent *FsNode) {
    n.parent = parent
    oldNode := parent.findChild(n.name)
    if oldNode != nil {
        oldNode.deleteSelf()
    }
    if dir, ok := n.node.(*Dir); ok {
        dir.parent = parent.node.(*Dir)
    }
    if f, ok := n.node.(*File); ok {
        f.dir = parent.node.(*Dir)
    }
    n.childrenLock.Lock()
    parent.children[n.name] = n
    n.childrenLock.Unlock()
}

func (n *FsNode) findChild(name string) *FsNode {
    n.childrenLock.RLock()
    defer n.childrenLock.RUnlock()
    child, found := n.children[name]
    if found {
        return child
    }
    return nil
}

func (n *FsNode) ensureChild(name string) *FsNode {
    n.childrenLock.Lock()
    defer n.childrenLock.Unlock()
    if n.children == nil {
        n.children = make(map[string]*FsNode)
    }
    child, found := n.children[name]
    if found {
        return child
    }
    t := &FsNode{
        parent:   n,
        node:     nil,
        name:     name,
        children: nil,
    }
    n.children[name] = t
    return t
}

func (n *FsNode) disconnectChild(child *FsNode) {
    n.childrenLock.Lock()
    delete(n.children, child.name)
    n.childrenLock.Unlock()
    child.parent = nil
}

func (n *FsNode) deleteSelf() {
    n.childrenLock.Lock()
    for _, child := range n.children {
        child.deleteSelf()
    }
    n.children = nil
    n.childrenLock.Unlock()

    n.node = nil
    n.parent = nil
}
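
FsCache is a trie keyed by path segments: a cache-wide RWMutex serializes structural changes, per-node children locks guard each child map, and EnsureFsNode is the get-or-create entry point that only invokes the generator on a miss. A small stand-alone sketch of that get-or-create contract (types are illustrative):

package main

import (
    "fmt"
    "sync"
)

// cache mirrors EnsureFsNode's contract: return the cached value when
// present, otherwise build it while holding the lock so two concurrent
// callers never construct two values for the same path.
type cache struct {
    mu sync.Mutex
    m  map[string]int
}

func (c *cache) ensure(key string, gen func() int) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    if v, ok := c.m[key]; ok {
        return v
    }
    v := gen()
    c.m[key] = v
    return v
}

func main() {
    c := &cache{m: map[string]int{}}
    fmt.Println(c.ensure("/a/b", func() int { return 1 })) // 1: generated
    fmt.Println(c.ensure("/a/b", func() int { return 2 })) // 1: generator skipped on hit
}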

weed/filesys/fscache_test.go (96 changes)

@@ -1,96 +0,0 @@
package filesys

import (
    "testing"

    "github.com/chrislusf/seaweedfs/weed/util"
)

func TestPathSplit(t *testing.T) {
    parts := util.FullPath("/").Split()
    if len(parts) != 0 {
        t.Errorf("expecting an empty list, but getting %d", len(parts))
    }

    parts = util.FullPath("/readme.md").Split()
    if len(parts) != 1 {
        t.Errorf("expecting an empty list, but getting %d", len(parts))
    }
}

func TestFsCache(t *testing.T) {
    cache := newFsCache(nil)

    x := cache.GetFsNode(util.FullPath("/y/x"))
    if x != nil {
        t.Errorf("wrong node!")
    }

    p := util.FullPath("/a/b/c")
    cache.SetFsNode(p, &File{Name: "cc"})
    tNode := cache.GetFsNode(p)
    tFile := tNode.(*File)
    if tFile.Name != "cc" {
        t.Errorf("expecting a FsNode")
    }

    cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
    cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
    cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
    cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
    cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})

    b := cache.GetFsNode(util.FullPath("/a/b"))
    if b != nil {
        t.Errorf("unexpected node!")
    }

    a := cache.GetFsNode(util.FullPath("/a"))
    if a == nil {
        t.Errorf("missing node!")
    }

    cache.DeleteFsNode(util.FullPath("/a"))
    if b != nil {
        t.Errorf("unexpected node!")
    }

    a = cache.GetFsNode(util.FullPath("/a"))
    if a != nil {
        t.Errorf("wrong DeleteFsNode!")
    }

    z := cache.GetFsNode(util.FullPath("/z"))
    if z == nil {
        t.Errorf("missing node!")
    }

    y := cache.GetFsNode(util.FullPath("/x/y"))
    if y != nil {
        t.Errorf("wrong node!")
    }
}

func TestFsCacheMove(t *testing.T) {
    cache := newFsCache(nil)

    cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
    cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
    cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
    cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})

    cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))

    d := cache.GetFsNode(util.FullPath("/z/x/d"))
    if d == nil {
        t.Errorf("unexpected nil node!")
    }
    if d.(*File).Name != "dd" {
        t.Errorf("unexpected non dd node!")
    }
}

weed/filesys/wfs.go (2 changes)

@@ -64,7 +64,6 @@ type WFS struct {
    stats statsCache
    root fs.Node
    fsNodeCache *FsCache
    chunkCache *chunk_cache.ChunkCache
    metaCache *meta_cache.MetaCache
@@ -102,7 +101,6 @@ func NewSeaweedFileSystem(option *Option) *WFS {
    })
    wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
    wfs.fsNodeCache = newFsCache(wfs.root)
    return wfs
}
}

weed/s3api/auth_credentials.go (6 changes)

@@ -7,7 +7,6 @@ import (
    "net/http"
    "github.com/golang/protobuf/jsonpb"
    "github.com/gorilla/mux"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
@@ -110,7 +109,7 @@ func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identi
func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc {
    if iam.isEnabled() {
    if !iam.isEnabled() {
        return f
    }
@@ -159,8 +158,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
    glog.V(3).Infof("user name: %v actions: %v", identity.Name, identity.Actions)
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    if !identity.canDo(action, bucket) {
        return ErrAccessDenied
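
The Auth wrapper now reads as a guard clause: with IAM disabled it hands back the handler untouched, otherwise it wraps it with the permission check, and authRequest resolves the bucket through the shared getBucketAndObject helper instead of touching mux.Vars directly. A minimal net/http sketch of that middleware shape (names and the check are illustrative):

package main

import (
    "fmt"
    "net/http"
)

// auth returns next unchanged when checking is disabled, else a wrapper
// that rejects the request before the real handler runs.
func auth(enabled bool, allow func(*http.Request) bool, next http.HandlerFunc) http.HandlerFunc {
    if !enabled {
        return next // fast path: no identities configured
    }
    return func(w http.ResponseWriter, r *http.Request) {
        if !allow(r) {
            http.Error(w, "access denied", http.StatusForbidden)
            return
        }
        next(w, r)
    }
}

func main() {
    h := auth(true,
        func(r *http.Request) bool { return r.Header.Get("X-Key") == "secret" }, // illustrative check
        func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "ok") })
    http.Handle("/", h)
    // http.ListenAndServe(":8080", nil) // left commented; sketch only
}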

weed/s3api/s3api_bucket_handlers.go (10 changes)

@@ -10,7 +10,6 @@ import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/gorilla/mux"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -56,8 +55,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    // create the folder for bucket, but lazily create actual collection
    if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil {
@@ -70,8 +68,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
@@ -100,8 +97,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

weed/s3api/s3api_object_copy_handlers.go (14 changes)

@@ -8,16 +8,12 @@ import (
    "strings"
    "time"
    "github.com/gorilla/mux"
    "github.com/chrislusf/seaweedfs/weed/util"
)

func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    dstBucket := vars["bucket"]
    dstObject := getObject(vars)
    dstBucket, dstObject := getBucketAndObject(r)
    // Copy source path.
    cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
@@ -61,7 +57,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
    response := CopyObjectResult{
        ETag:         etag,
        LastModified: time.Now(),
        LastModified: time.Now().UTC(),
    }
    writeSuccessResponseXML(w, encodeResponse(response))
@@ -85,9 +81,7 @@ type CopyPartResult struct {
func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
    // https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
    vars := mux.Vars(r)
    dstBucket := vars["bucket"]
    // dstObject := getObject(vars)
    dstBucket, _ := getBucketAndObject(r)
    // Copy source path.
    cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
@@ -143,7 +137,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
    response := CopyPartResult{
        ETag:         etag,
        LastModified: time.Now(),
        LastModified: time.Now().UTC(),
    }
    writeSuccessResponseXML(w, encodeResponse(response))

weed/s3api/s3api_object_handlers.go (35 changes)

@@ -32,9 +32,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
    // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    _, err := validateContentMd5(r.Header)
    if err != nil {
@@ -45,8 +43,13 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
    rAuthType := getRequestAuthType(r)
    dataReader := r.Body
    var s3ErrCode ErrorCode
    if rAuthType == authTypeStreamingSigned {
    switch rAuthType {
    case authTypeStreamingSigned:
        dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
    case authTypeSignedV2, authTypePresignedV2:
        _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
    case authTypePresigned, authTypeSigned:
        _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
    }
    if s3ErrCode != ErrNone {
        writeErrorResponse(w, s3ErrCode, r.URL)
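
PutObject previously special-cased only streaming-signed uploads; the switch now also verifies classic V2 and V4 signatures before the body is consumed. Only the streaming case replaces the body reader, because aws-chunked payloads interleave a signature with every chunk; the other cases authenticate and leave r.Body untouched. Schematically, the wire format the chunked reader strips away (per the AWS SigV4 streaming upload spec; signatures shown as placeholders):

// Each chunk: <hex size>;chunk-signature=<sig>\r\n<payload>\r\n,
// terminated by a zero-length chunk.
const awsChunkedExample = "400;chunk-signature=<sig1>\r\n" + // 0x400 = 1024 payload bytes follow
    "<1024 bytes of data>\r\n" +
    "0;chunk-signature=<sigFinal>\r\n\r\n"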
@@ -70,9 +73,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    if strings.HasSuffix(r.URL.Path, "/") {
        writeErrorResponse(w, ErrNotImplemented, r.URL)
@@ -88,9 +89,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    destUrl := fmt.Sprintf("http://%s%s/%s%s",
        s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
@@ -101,9 +100,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
        s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
@@ -151,8 +148,7 @@ type DeleteObjectsResponse struct {
// DeleteMultipleObjectsHandler - Delete multiple objects
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    deleteXMLBytes, err := ioutil.ReadAll(r.Body)
    if err != nil {
@@ -305,10 +301,13 @@ func setEtag(w http.ResponseWriter, etag string) {
    }
}

func getObject(vars map[string]string) string {
    object := vars["object"]
func getBucketAndObject(r *http.Request) (bucket, object string) {
    vars := mux.Vars(r)
    bucket = vars["bucket"]
    object = vars["object"]
    if !strings.HasPrefix(object, "/") {
        object = "/" + object
    }
    return object
    return
}
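
getBucketAndObject centralizes the mux.Vars extraction that every handler above repeated, and normalizes the object key to carry a leading slash. The bucket and object variables come from gorilla/mux route patterns registered elsewhere in the s3api package; a sketch of the kind of registration that produces them (the actual routes are assumptions here):

package main

import (
    "fmt"
    "net/http"

    "github.com/gorilla/mux"
)

func main() {
    router := mux.NewRouter()
    // One subrouter per bucket; "{object:.+}" captures the remainder of the path.
    bucket := router.PathPrefix("/{bucket}").Subrouter()
    bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        fmt.Fprintf(w, "bucket=%s object=%s\n", vars["bucket"], vars["object"])
    })
    _ = router // server wiring omitted; sketch only
}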

weed/s3api/s3api_object_multipart_handlers.go (36 changes)

@@ -9,7 +9,6 @@ import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/gorilla/mux"
)
const (
@@ -21,10 +20,7 @@ const (
// NewMultipartUploadHandler - New multipart upload.
func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
    var object, bucket string
    vars := mux.Vars(r)
    bucket = vars["bucket"]
    object = vars["object"]
    bucket, object := getBucketAndObject(r)
    response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
        Bucket: aws.String(bucket),
@@ -44,9 +40,7 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
// CompleteMultipartUploadHandler - Completes multipart upload.
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    // Get upload id.
    uploadID, _, _, _ := getObjectResources(r.URL.Query())
@@ -70,9 +64,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
// AbortMultipartUploadHandler - Aborts multipart upload.
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    // Get upload id.
    uploadID, _, _, _ := getObjectResources(r.URL.Query())
@@ -96,8 +88,7 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
// ListMultipartUploadsHandler - Lists multipart uploads.
func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
    if maxUploads < 0 {
@@ -135,9 +126,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
// ListObjectPartsHandler - Lists object parts in a multipart upload.
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    object := getObject(vars)
    bucket, object := getBucketAndObject(r)
    uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
    if partNumberMarker < 0 {
@@ -170,10 +159,7 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
// PutObjectPartHandler - Put an object part in a multipart upload.
func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    rAuthType := getRequestAuthType(r)
    bucket, _ := getBucketAndObject(r)
    uploadID := r.URL.Query().Get("uploadId")
    exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
@@ -193,10 +179,16 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
        return
    }

    var s3ErrCode ErrorCode
    rAuthType := getRequestAuthType(r)
    dataReader := r.Body
    if rAuthType == authTypeStreamingSigned {
    var s3ErrCode ErrorCode
    switch rAuthType {
    case authTypeStreamingSigned:
        dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
    case authTypeSignedV2, authTypePresignedV2:
        _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
    case authTypePresigned, authTypeSigned:
        _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
    }
    if s3ErrCode != ErrNone {
        writeErrorResponse(w, s3ErrCode, r.URL)
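
This auth switch is now duplicated verbatim between PutObjectHandler and PutObjectPartHandler. A hypothetical consolidation, not part of this diff and with an invented name, could hoist it into a shared in-package helper:

// getDataReader is a hypothetical helper: it authenticates the request and
// returns the (possibly unwrapped) body reader plus any error code.
func (s3a *S3ApiServer) getDataReader(r *http.Request) (io.ReadCloser, ErrorCode) {
    dataReader := r.Body
    s3ErrCode := ErrNone
    switch getRequestAuthType(r) {
    case authTypeStreamingSigned:
        dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) // unwraps aws-chunked bodies
    case authTypeSignedV2, authTypePresignedV2:
        _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
    case authTypePresigned, authTypeSigned:
        _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
    }
    return dataReader, s3ErrCode
}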

weed/s3api/s3api_objects_list_handlers.go (10 changes)

@@ -11,8 +11,6 @@ import (
    "strings"
    "time"
    "github.com/gorilla/mux"
    "github.com/chrislusf/seaweedfs/weed/filer2"
    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,10 +21,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
    // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
    // collect parameters
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    glog.V(4).Infof("read v2: %v", vars)
    bucket, _ := getBucketAndObject(r)
    originalPrefix, marker, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
@@ -58,8 +53,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
    // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
    // collect parameters
    vars := mux.Vars(r)
    bucket := vars["bucket"]
    bucket, _ := getBucketAndObject(r)
    originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())

weed/server/common.go (2 changes)

@@ -218,7 +218,7 @@ func handleStaticResources2(r *mux.Router) {
    r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
}

func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) {
func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
    if filename != "" {
        contentDisposition := "inline"
        if r.FormValue("dl") != "" {

weed/server/filer_server_handlers_read.go (6 changes)

@@ -101,14 +101,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
    }
    setEtag(w, etag)
    filename := entry.Name()
    adjustHeaderContentDisposition(w, r, filename)
    if r.Method == "HEAD" {
        w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
        return
    }
    filename := entry.Name()
    adjustHeadersAfterHEAD(w, r, filename)
    totalSize := int64(filer2.TotalSize(entry.Chunks))
    if rangeReq := r.Header.Get("Range"); rangeReq == "" {

weed/server/volume_server_handlers_read.go (4 changes)

@@ -244,13 +244,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
    }
    w.Header().Set("Accept-Ranges", "bytes")
    adjustHeaderContentDisposition(w, r, filename)
    if r.Method == "HEAD" {
        w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
        return nil
    }
    adjustHeadersAfterHEAD(w, r, filename)
    processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
        if _, e = rs.Seek(offset, 0); e != nil {
            return e
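
Both read paths rename adjustHeadersAfterHEAD to adjustHeaderContentDisposition and call it before the HEAD early-return, so HEAD responses now carry the same Content-Disposition a GET would. A minimal sketch of the pattern (handler shape is illustrative):

package main

import (
    "fmt"
    "net/http"
    "strconv"
)

func serveContent(w http.ResponseWriter, r *http.Request, filename string, body []byte) {
    // Headers shared by GET and HEAD must be set before the HEAD early-return.
    w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", filename))
    w.Header().Set("Accept-Ranges", "bytes")
    if r.Method == http.MethodHead {
        w.Header().Set("Content-Length", strconv.Itoa(len(body)))
        return // HEAD: headers only, no body
    }
    w.Write(body)
}

func main() {
    http.HandleFunc("/f", func(w http.ResponseWriter, r *http.Request) {
        serveContent(w, r, "hello.txt", []byte("hello"))
    })
    // http.ListenAndServe(":8080", nil) // left commented; sketch only
}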
