Chris Lu
6 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 907 additions and 167 deletions
-
1weed/command/command.go
-
109weed/command/webdav.go
-
163weed/filer2/filer_client_util.go
-
92weed/filesys/dir.go
-
2weed/filesys/dir_link.go
-
2weed/filesys/dir_rename.go
-
2weed/filesys/dirty_page.go
-
4weed/filesys/file.go
-
89weed/filesys/filehandle.go
-
4weed/filesys/wfs.go
-
2weed/filesys/wfs_deletion.go
-
604weed/server/webdav_server.go
@ -0,0 +1,109 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/http" |
|||
"os/user" |
|||
"strconv" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/server" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/spf13/viper" |
|||
) |
|||
|
|||
var ( |
|||
webDavStandaloneOptions WebDavOption |
|||
) |
|||
|
|||
type WebDavOption struct { |
|||
filer *string |
|||
port *int |
|||
collection *string |
|||
tlsPrivateKey *string |
|||
tlsCertificate *string |
|||
} |
|||
|
|||
func init() { |
|||
cmdWebDav.Run = runWebDav // break init cycle
|
|||
webDavStandaloneOptions.filer = cmdWebDav.Flag.String("filer", "localhost:8888", "filer server address") |
|||
webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port") |
|||
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") |
|||
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") |
|||
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") |
|||
} |
|||
|
|||
var cmdWebDav = &Command{ |
|||
UsageLine: "webdav -port=7333 -filer=<ip:port>", |
|||
Short: "start a webdav server that is backed by a filer", |
|||
Long: `start a webdav server that is backed by a filer. |
|||
|
|||
`, |
|||
} |
|||
|
|||
func runWebDav(cmd *Command, args []string) bool { |
|||
|
|||
weed_server.LoadConfiguration("security", false) |
|||
|
|||
glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port) |
|||
|
|||
return webDavStandaloneOptions.startWebDav() |
|||
|
|||
} |
|||
|
|||
func (wo *WebDavOption) startWebDav() bool { |
|||
|
|||
filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer) |
|||
if err != nil { |
|||
glog.Fatal(err) |
|||
return false |
|||
} |
|||
|
|||
// detect current user
|
|||
uid, gid := uint32(0), uint32(0) |
|||
if u, err := user.Current(); err == nil { |
|||
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { |
|||
uid = uint32(parsedId) |
|||
} |
|||
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { |
|||
gid = uint32(parsedId) |
|||
} |
|||
} |
|||
|
|||
ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ |
|||
Filer: *wo.filer, |
|||
FilerGrpcAddress: filerGrpcAddress, |
|||
GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), |
|||
Collection: *wo.collection, |
|||
Uid: uid, |
|||
Gid: gid, |
|||
}) |
|||
if webdavServer_err != nil { |
|||
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) |
|||
} |
|||
|
|||
httpS := &http.Server{Handler: ws.Handler} |
|||
|
|||
listenAddress := fmt.Sprintf(":%d", *wo.port) |
|||
webDavListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) |
|||
if err != nil { |
|||
glog.Fatalf("WebDav Server listener on %s error: %v", listenAddress, err) |
|||
} |
|||
|
|||
if *wo.tlsPrivateKey != "" { |
|||
glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.VERSION, *wo.port) |
|||
if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil { |
|||
glog.Fatalf("WebDav Server Fail to serve: %v", err) |
|||
} |
|||
} else { |
|||
glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.VERSION, *wo.port) |
|||
if err = httpS.Serve(webDavListener); err != nil { |
|||
glog.Fatalf("WebDav Server Fail to serve: %v", err) |
|||
} |
|||
} |
|||
|
|||
return true |
|||
|
|||
} |
@ -0,0 +1,163 @@ |
|||
package filer2 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"strings" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
func VolumeId(fileId string) string { |
|||
lastCommaIndex := strings.LastIndex(fileId, ",") |
|||
if lastCommaIndex > 0 { |
|||
return fileId[:lastCommaIndex] |
|||
} |
|||
return fileId |
|||
} |
|||
|
|||
type FilerClient interface { |
|||
WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error |
|||
} |
|||
|
|||
func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { |
|||
var vids []string |
|||
for _, chunkView := range chunkViews { |
|||
vids = append(vids, VolumeId(chunkView.FileId)) |
|||
} |
|||
|
|||
vid2Locations := make(map[string]*filer_pb.Locations) |
|||
|
|||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
glog.V(4).Infof("read fh lookup volume id locations: %v", vids) |
|||
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ |
|||
VolumeIds: vids, |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
vid2Locations = resp.LocationsMap |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if err != nil { |
|||
return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) |
|||
} |
|||
|
|||
var wg sync.WaitGroup |
|||
for _, chunkView := range chunkViews { |
|||
wg.Add(1) |
|||
go func(chunkView *ChunkView) { |
|||
defer wg.Done() |
|||
|
|||
glog.V(4).Infof("read fh reading chunk: %+v", chunkView) |
|||
|
|||
locations := vid2Locations[VolumeId(chunkView.FileId)] |
|||
if locations == nil || len(locations.Locations) == 0 { |
|||
glog.V(0).Infof("failed to locate %s", chunkView.FileId) |
|||
err = fmt.Errorf("failed to locate %s", chunkView.FileId) |
|||
return |
|||
} |
|||
|
|||
var n int64 |
|||
n, err = util.ReadUrl( |
|||
fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), |
|||
chunkView.Offset, |
|||
int(chunkView.Size), |
|||
buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], |
|||
!chunkView.IsFullChunk) |
|||
|
|||
if err != nil { |
|||
|
|||
glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) |
|||
|
|||
err = fmt.Errorf("failed to read http://%s/%s: %v", |
|||
locations.Locations[0].Url, chunkView.FileId, err) |
|||
return |
|||
} |
|||
|
|||
glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) |
|||
totalRead += n |
|||
|
|||
}(chunkView) |
|||
} |
|||
wg.Wait() |
|||
return |
|||
} |
|||
|
|||
func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { |
|||
|
|||
dir, name := FullPath(fullFilePath).DirAndName() |
|||
|
|||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.LookupDirectoryEntryRequest{ |
|||
Directory: dir, |
|||
Name: name, |
|||
} |
|||
|
|||
glog.V(3).Infof("read %s request: %v", fullFilePath, request) |
|||
resp, err := client.LookupDirectoryEntry(ctx, request) |
|||
if err != nil { |
|||
if err == ErrNotFound { |
|||
return nil |
|||
} |
|||
glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err) |
|||
return err |
|||
} |
|||
|
|||
if resp.Entry != nil { |
|||
entry = resp.Entry |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
return |
|||
} |
|||
|
|||
func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { |
|||
|
|||
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
paginationLimit := 1024 |
|||
|
|||
lastEntryName := "" |
|||
|
|||
for { |
|||
|
|||
request := &filer_pb.ListEntriesRequest{ |
|||
Directory: fullDirPath, |
|||
StartFromFileName: lastEntryName, |
|||
Limit: uint32(paginationLimit), |
|||
} |
|||
|
|||
glog.V(3).Infof("read directory: %v", request) |
|||
resp, err := client.ListEntries(ctx, request) |
|||
if err != nil { |
|||
return fmt.Errorf("list %s: %v", fullDirPath, err) |
|||
} |
|||
|
|||
for _, entry := range resp.Entries { |
|||
fn(entry) |
|||
lastEntryName = entry.Name |
|||
} |
|||
|
|||
if len(resp.Entries) < paginationLimit { |
|||
break |
|||
} |
|||
|
|||
} |
|||
|
|||
return nil |
|||
|
|||
}) |
|||
|
|||
return |
|||
} |
@ -0,0 +1,604 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"net/url" |
|||
"os" |
|||
"path" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"golang.org/x/net/webdav" |
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer2" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/spf13/viper" |
|||
) |
|||
|
|||
type WebDavOption struct { |
|||
Filer string |
|||
FilerGrpcAddress string |
|||
DomainName string |
|||
BucketsPath string |
|||
GrpcDialOption grpc.DialOption |
|||
Collection string |
|||
Uid uint32 |
|||
Gid uint32 |
|||
} |
|||
|
|||
type WebDavServer struct { |
|||
option *WebDavOption |
|||
secret security.SigningKey |
|||
filer *filer2.Filer |
|||
grpcDialOption grpc.DialOption |
|||
Handler *webdav.Handler |
|||
} |
|||
|
|||
func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { |
|||
|
|||
fs, _ := NewWebDavFileSystem(option) |
|||
|
|||
ws = &WebDavServer{ |
|||
option: option, |
|||
grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), |
|||
Handler: &webdav.Handler{ |
|||
FileSystem: fs, |
|||
LockSystem: webdav.NewMemLS(), |
|||
Logger: func(r *http.Request, err error) { |
|||
litmus := r.Header.Get("X-Litmus") |
|||
if len(litmus) > 19 { |
|||
litmus = litmus[:16] + "..." |
|||
} |
|||
|
|||
switch r.Method { |
|||
case "COPY", "MOVE": |
|||
dst := "" |
|||
if u, err := url.Parse(r.Header.Get("Destination")); err == nil { |
|||
dst = u.Path |
|||
} |
|||
glog.V(3).Infof("%-18s %s %s %v", |
|||
r.Method, |
|||
r.URL.Path, |
|||
dst, |
|||
err) |
|||
default: |
|||
glog.V(3).Infof("%-18s %s %v", |
|||
r.Method, |
|||
r.URL.Path, |
|||
err) |
|||
} |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
return ws, nil |
|||
} |
|||
|
|||
// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
|
|||
|
|||
type WebDavFileSystem struct { |
|||
option *WebDavOption |
|||
secret security.SigningKey |
|||
filer *filer2.Filer |
|||
grpcDialOption grpc.DialOption |
|||
} |
|||
|
|||
type FileInfo struct { |
|||
name string |
|||
size int64 |
|||
mode os.FileMode |
|||
modifiledTime time.Time |
|||
isDirectory bool |
|||
} |
|||
|
|||
func (fi *FileInfo) Name() string { return fi.name } |
|||
func (fi *FileInfo) Size() int64 { return fi.size } |
|||
func (fi *FileInfo) Mode() os.FileMode { return fi.mode } |
|||
func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime } |
|||
func (fi *FileInfo) IsDir() bool { return fi.isDirectory } |
|||
func (fi *FileInfo) Sys() interface{} { return nil } |
|||
|
|||
type WebDavFile struct { |
|||
fs *WebDavFileSystem |
|||
name string |
|||
isDirectory bool |
|||
off int64 |
|||
entry *filer_pb.Entry |
|||
entryViewCache []filer2.VisibleInterval |
|||
} |
|||
|
|||
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { |
|||
return &WebDavFileSystem{ |
|||
option: option, |
|||
}, nil |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
|
|||
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { |
|||
client := filer_pb.NewSeaweedFilerClient(grpcConnection) |
|||
return fn(client) |
|||
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) |
|||
|
|||
} |
|||
|
|||
func clearName(name string) (string, error) { |
|||
slashed := strings.HasSuffix(name, "/") |
|||
name = path.Clean(name) |
|||
if !strings.HasSuffix(name, "/") && slashed { |
|||
name += "/" |
|||
} |
|||
if !strings.HasPrefix(name, "/") { |
|||
return "", os.ErrInvalid |
|||
} |
|||
return name, nil |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath) |
|||
|
|||
if !strings.HasSuffix(fullDirPath, "/") { |
|||
fullDirPath += "/" |
|||
} |
|||
|
|||
var err error |
|||
if fullDirPath, err = clearName(fullDirPath); err != nil { |
|||
return err |
|||
} |
|||
|
|||
_, err = fs.stat(ctx, fullDirPath) |
|||
if err == nil { |
|||
return os.ErrExist |
|||
} |
|||
|
|||
base := "/" |
|||
for _, elem := range strings.Split(strings.Trim(fullDirPath, "/"), "/") { |
|||
base += elem + "/" |
|||
_, err = fs.stat(ctx, base) |
|||
if err != os.ErrNotExist { |
|||
return err |
|||
} |
|||
err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
dir, name := filer2.FullPath(base).DirAndName() |
|||
request := &filer_pb.CreateEntryRequest{ |
|||
Directory: dir, |
|||
Entry: &filer_pb.Entry{ |
|||
Name: name, |
|||
IsDirectory: true, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: time.Now().Unix(), |
|||
Crtime: time.Now().Unix(), |
|||
FileMode: uint32(perm), |
|||
Uid: fs.option.Uid, |
|||
Gid: fs.option.Gid, |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
glog.V(1).Infof("mkdir: %v", request) |
|||
if _, err := client.CreateEntry(ctx, request); err != nil { |
|||
return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath) |
|||
|
|||
var err error |
|||
if fullFilePath, err = clearName(fullFilePath); err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
if flag&os.O_CREATE != 0 { |
|||
// file should not have / suffix.
|
|||
if strings.HasSuffix(fullFilePath, "/") { |
|||
return nil, os.ErrInvalid |
|||
} |
|||
// based directory should be exists.
|
|||
dir, _ := path.Split(fullFilePath) |
|||
_, err := fs.stat(ctx, dir) |
|||
if err != nil { |
|||
return nil, os.ErrInvalid |
|||
} |
|||
_, err = fs.stat(ctx, fullFilePath) |
|||
if err == nil { |
|||
if flag&os.O_EXCL != 0 { |
|||
return nil, os.ErrExist |
|||
} |
|||
fs.removeAll(ctx, fullFilePath) |
|||
} |
|||
|
|||
dir, name := filer2.FullPath(fullFilePath).DirAndName() |
|||
err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ |
|||
Directory: dir, |
|||
Entry: &filer_pb.Entry{ |
|||
Name: name, |
|||
IsDirectory: perm&os.ModeDir > 0, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: time.Now().Unix(), |
|||
Crtime: time.Now().Unix(), |
|||
FileMode: uint32(perm), |
|||
Uid: fs.option.Uid, |
|||
Gid: fs.option.Gid, |
|||
Collection: fs.option.Collection, |
|||
Replication: "000", |
|||
TtlSec: 0, |
|||
}, |
|||
}, |
|||
}); err != nil { |
|||
return fmt.Errorf("create %s: %v", fullFilePath, err) |
|||
} |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return &WebDavFile{ |
|||
fs: fs, |
|||
name: fullFilePath, |
|||
isDirectory: false, |
|||
}, nil |
|||
} |
|||
|
|||
fi, err := fs.stat(ctx, fullFilePath) |
|||
if err != nil { |
|||
return nil, os.ErrNotExist |
|||
} |
|||
if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() { |
|||
fullFilePath += "/" |
|||
} |
|||
|
|||
return &WebDavFile{ |
|||
fs: fs, |
|||
name: fullFilePath, |
|||
isDirectory: false, |
|||
}, nil |
|||
|
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error { |
|||
var err error |
|||
if fullFilePath, err = clearName(fullFilePath); err != nil { |
|||
return err |
|||
} |
|||
|
|||
fi, err := fs.stat(ctx, fullFilePath) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
if fi.IsDir() { |
|||
//_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
|
|||
} else { |
|||
//_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
|
|||
} |
|||
dir, name := filer2.FullPath(fullFilePath).DirAndName() |
|||
err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.DeleteEntryRequest{ |
|||
Directory: dir, |
|||
Name: name, |
|||
IsDeleteData: true, |
|||
} |
|||
|
|||
glog.V(3).Infof("removing entry: %v", request) |
|||
_, err := client.DeleteEntry(ctx, request) |
|||
if err != nil { |
|||
return fmt.Errorf("remove %s: %v", fullFilePath, err) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
return err |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name) |
|||
|
|||
return fs.removeAll(ctx, name) |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName) |
|||
|
|||
var err error |
|||
if oldName, err = clearName(oldName); err != nil { |
|||
return err |
|||
} |
|||
if newName, err = clearName(newName); err != nil { |
|||
return err |
|||
} |
|||
|
|||
of, err := fs.stat(ctx, oldName) |
|||
if err != nil { |
|||
return os.ErrExist |
|||
} |
|||
if of.IsDir() && !strings.HasSuffix(oldName, "/") { |
|||
oldName += "/" |
|||
newName += "/" |
|||
} |
|||
|
|||
_, err = fs.stat(ctx, newName) |
|||
if err == nil { |
|||
return os.ErrExist |
|||
} |
|||
|
|||
oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() |
|||
newDir, newBaseName := filer2.FullPath(newName).DirAndName() |
|||
|
|||
return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.AtomicRenameEntryRequest{ |
|||
OldDirectory: oldDir, |
|||
OldName: oldBaseName, |
|||
NewDirectory: newDir, |
|||
NewName: newBaseName, |
|||
} |
|||
|
|||
_, err := client.AtomicRenameEntry(ctx, request) |
|||
if err != nil { |
|||
return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err) |
|||
} |
|||
|
|||
return nil |
|||
|
|||
}) |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) { |
|||
var err error |
|||
if fullFilePath, err = clearName(fullFilePath); err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
var fi FileInfo |
|||
entry, err := filer2.GetEntry(ctx, fs, fullFilePath) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
fi.size = int64(filer2.TotalSize(entry.GetChunks())) |
|||
fi.name = fullFilePath |
|||
fi.mode = os.FileMode(entry.Attributes.FileMode) |
|||
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0) |
|||
fi.isDirectory = entry.IsDirectory |
|||
|
|||
_, fi.name = path.Split(path.Clean(fi.name)) |
|||
if fi.name == "" { |
|||
fi.name = "/" |
|||
fi.modifiledTime = time.Now() |
|||
fi.isDirectory = true |
|||
} |
|||
return &fi, nil |
|||
} |
|||
|
|||
func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Stat %v", name) |
|||
|
|||
return fs.stat(ctx, name) |
|||
} |
|||
|
|||
func (f *WebDavFile) Write(buf []byte) (int, error) { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) |
|||
|
|||
var err error |
|||
ctx := context.Background() |
|||
if f.entry == nil { |
|||
f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) |
|||
} |
|||
|
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
|
|||
var fileId, host string |
|||
var auth security.EncodedJwt |
|||
|
|||
if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.AssignVolumeRequest{ |
|||
Count: 1, |
|||
Replication: "000", |
|||
Collection: f.fs.option.Collection, |
|||
} |
|||
|
|||
resp, err := client.AssignVolume(ctx, request) |
|||
if err != nil { |
|||
glog.V(0).Infof("assign volume failure %v: %v", request, err) |
|||
return err |
|||
} |
|||
|
|||
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) |
|||
|
|||
return nil |
|||
}); err != nil { |
|||
return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err) |
|||
} |
|||
|
|||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) |
|||
bufReader := bytes.NewReader(buf) |
|||
uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth) |
|||
if err != nil { |
|||
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) |
|||
return 0, fmt.Errorf("upload data: %v", err) |
|||
} |
|||
if uploadResult.Error != "" { |
|||
glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err) |
|||
return 0, fmt.Errorf("upload result: %v", uploadResult.Error) |
|||
} |
|||
|
|||
chunk := &filer_pb.FileChunk{ |
|||
FileId: fileId, |
|||
Offset: f.off, |
|||
Size: uint64(len(buf)), |
|||
Mtime: time.Now().UnixNano(), |
|||
ETag: uploadResult.ETag, |
|||
} |
|||
|
|||
f.entry.Chunks = append(f.entry.Chunks, chunk) |
|||
dir, _ := filer2.FullPath(f.name).DirAndName() |
|||
|
|||
err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { |
|||
f.entry.Attributes.Mtime = time.Now().Unix() |
|||
|
|||
request := &filer_pb.UpdateEntryRequest{ |
|||
Directory: dir, |
|||
Entry: f.entry, |
|||
} |
|||
|
|||
if _, err := client.UpdateEntry(ctx, request); err != nil { |
|||
return fmt.Errorf("update %s: %v", f.name, err) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if err !=nil { |
|||
f.off += int64(len(buf)) |
|||
} |
|||
return len(buf), err |
|||
} |
|||
|
|||
func (f *WebDavFile) Close() error { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) |
|||
|
|||
if f.entry != nil { |
|||
f.entry = nil |
|||
f.entryViewCache = nil |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (f *WebDavFile) Read(p []byte) (readSize int, err error) { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Read %v", f.name) |
|||
ctx := context.Background() |
|||
|
|||
if f.entry == nil { |
|||
f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) |
|||
} |
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
if len(f.entry.Chunks) == 0 { |
|||
return 0, io.EOF |
|||
} |
|||
if f.entryViewCache == nil { |
|||
f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) |
|||
} |
|||
chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) |
|||
|
|||
totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off) |
|||
if err != nil { |
|||
return 0, err |
|||
} |
|||
readSize = int(totalRead) |
|||
|
|||
f.off += totalRead |
|||
if readSize == 0 { |
|||
return 0, io.EOF |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { |
|||
|
|||
glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) |
|||
ctx := context.Background() |
|||
|
|||
dir := f.name |
|||
if dir != "/" && strings.HasSuffix(dir, "/") { |
|||
dir = dir[:len(dir)-1] |
|||
} |
|||
|
|||
err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) { |
|||
fi := FileInfo{ |
|||
size: int64(filer2.TotalSize(entry.GetChunks())), |
|||
name: entry.Name, |
|||
mode: os.FileMode(entry.Attributes.FileMode), |
|||
modifiledTime: time.Unix(entry.Attributes.Mtime, 0), |
|||
isDirectory: entry.IsDirectory, |
|||
} |
|||
|
|||
if !strings.HasSuffix(fi.name, "/") && fi.IsDir() { |
|||
fi.name += "/" |
|||
} |
|||
glog.V(4).Infof("entry: %v", fi.name) |
|||
ret = append(ret, &fi) |
|||
}) |
|||
|
|||
|
|||
old := f.off |
|||
if old >= int64(len(ret)) { |
|||
if count > 0 { |
|||
return nil, io.EOF |
|||
} |
|||
return nil, nil |
|||
} |
|||
if count > 0 { |
|||
f.off += int64(count) |
|||
if f.off > int64(len(ret)) { |
|||
f.off = int64(len(ret)) |
|||
} |
|||
} else { |
|||
f.off = int64(len(ret)) |
|||
old = 0 |
|||
} |
|||
|
|||
return ret[old:f.off], nil |
|||
} |
|||
|
|||
func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { |
|||
|
|||
glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence) |
|||
|
|||
ctx := context.Background() |
|||
|
|||
var err error |
|||
switch whence { |
|||
case 0: |
|||
f.off = 0 |
|||
case 2: |
|||
if fi, err := f.fs.stat(ctx, f.name); err != nil { |
|||
return 0, err |
|||
} else { |
|||
f.off = fi.Size() |
|||
} |
|||
} |
|||
f.off += offset |
|||
return f.off, err |
|||
} |
|||
|
|||
func (f *WebDavFile) Stat() (os.FileInfo, error) { |
|||
|
|||
glog.V(2).Infof("WebDavFile.Stat %v", f.name) |
|||
|
|||
ctx := context.Background() |
|||
|
|||
return f.fs.stat(ctx, f.name) |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue