diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index c5a8b3c84..1f239a981 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -52,7 +52,13 @@ func runFilerReplicate(cmd *Command, args []string) bool { glog.Errorf("receive %s: %+v", key, err) continue } - glog.V(1).Infof("processing file: %s", key) + if m.OldEntry!=nil&&m.NewEntry==nil{ + glog.V(1).Infof("delete: %s", key) + }else if m.OldEntry==nil&&m.NewEntry!=nil{ + glog.V(1).Infof(" add: %s", key) + }else{ + glog.V(1).Infof("modify: %s", key) + } if err = replicator.Replicate(key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) } diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index a652e6480..2586c664c 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -121,8 +121,14 @@ func (f *Filer) CreateEntry(entry *Entry) error { oldEntry, _ := f.FindEntry(entry.FullPath) - if err := f.store.InsertEntry(entry); err != nil { - return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) + if oldEntry == nil { + if err := f.store.InsertEntry(entry); err != nil { + return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) + } + } else { + if err := f.store.UpdateEntry(entry); err != nil { + return fmt.Errorf("update entry %s: %v", entry.FullPath, err) + } } f.NotifyUpdateEvent(oldEntry, entry, true) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 06c5f2569..c0771159a 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -91,55 +91,57 @@ func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error { return nil } -func (dir *Dir) newFile(name string, chunks []*filer_pb.FileChunk, attr *filer_pb.FuseAttributes) *File { +func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File { return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - attributes: attr, - Chunks: chunks, + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, } } func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { - err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, - Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: req.Mode&os.ModeDir > 0, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode), - Uid: req.Uid, - Gid: req.Gid, - Collection: dir.wfs.option.Collection, - Replication: dir.wfs.option.Replication, - TtlSec: dir.wfs.option.TtlSec, - }, + request := &filer_pb.CreateEntryRequest{ + Directory: dir.Path, + Entry: &filer_pb.Entry{ + Name: req.Name, + IsDirectory: req.Mode&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode), + Uid: req.Uid, + Gid: req.Gid, + Collection: dir.wfs.option.Collection, + Replication: dir.wfs.option.Replication, + TtlSec: dir.wfs.option.TtlSec, }, - } + }, + } + glog.V(1).Infof("create: %v", request) - glog.V(1).Infof("create: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { - glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) - return fuse.EIO + if request.Entry.IsDirectory { + if err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) + return fuse.EIO + } + return nil + }); err != nil { + return nil, nil, err } + } - return nil - }) - - if err == nil { - file := dir.newFile(req.Name, nil, &filer_pb.FuseAttributes{}) + file := dir.newFile(req.Name, request.Entry) + if !request.Entry.IsDirectory { file.isOpen = true - return file, dir.wfs.AcquireHandle(file, req.Uid, req.Gid), nil } + fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) + fh.dirtyMetadata = true + return file, fh, nil - return nil, nil, err } func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { @@ -204,7 +206,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. if entry.IsDirectory { node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, attributes: entry.Attributes} } else { - node = dir.newFile(req.Name, entry.Chunks, entry.Attributes) + node = dir.newFile(req.Name, entry) } resp.EntryValid = time.Duration(0) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 145d89138..eb810d383 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -66,7 +66,8 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da if offset != pages.Offset+pages.Size { // when this happens, debug shows the data overlapping with existing data is empty // the data is not just append - if offset == pages.Offset { + if offset == pages.Offset && int(pages.Size) < len(data) { + // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size) copy(pages.Data[pages.Size:], data[pages.Size:]) } else { if pages.Size != 0 { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 1bf544494..911d678df 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -18,12 +18,11 @@ var _ = fs.NodeFsyncer(&File{}) var _ = fs.NodeSetattrer(&File{}) type File struct { - Chunks []*filer_pb.FileChunk - Name string - dir *Dir - wfs *WFS - attributes *filer_pb.FuseAttributes - isOpen bool + Name string + dir *Dir + wfs *WFS + entry *filer_pb.Entry + isOpen bool } func (file *File) fullpath() string { @@ -36,11 +35,11 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { return err } - attr.Mode = os.FileMode(file.attributes.FileMode) - attr.Size = filer2.TotalSize(file.Chunks) - attr.Mtime = time.Unix(file.attributes.Mtime, 0) - attr.Gid = file.attributes.Gid - attr.Uid = file.attributes.Uid + attr.Mode = os.FileMode(file.entry.Attributes.FileMode) + attr.Size = filer2.TotalSize(file.entry.Chunks) + attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0) + attr.Gid = file.entry.Attributes.Gid + attr.Uid = file.entry.Attributes.Uid return nil @@ -72,41 +71,37 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.attributes) + glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) if req.Valid.Size() { glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size) if req.Size == 0 { // fmt.Printf("truncate %v \n", fullPath) - file.Chunks = nil + file.entry.Chunks = nil } - file.attributes.FileSize = req.Size + file.entry.Attributes.FileSize = req.Size } if req.Valid.Mode() { - file.attributes.FileMode = uint32(req.Mode) + file.entry.Attributes.FileMode = uint32(req.Mode) } if req.Valid.Uid() { - file.attributes.Uid = req.Uid + file.entry.Attributes.Uid = req.Uid } if req.Valid.Gid() { - file.attributes.Gid = req.Gid + file.entry.Attributes.Gid = req.Gid } if req.Valid.Mtime() { - file.attributes.Mtime = req.Mtime.Unix() + file.entry.Attributes.Mtime = req.Mtime.Unix() } return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.Path, - Entry: &filer_pb.Entry{ - Name: file.Name, - Attributes: file.attributes, - Chunks: file.Chunks, - }, + Entry: file.entry, } glog.V(1).Infof("set attr file entry: %v", request) @@ -130,31 +125,29 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { } func (file *File) maybeLoadAttributes(ctx context.Context) error { - if file.attributes == nil || !file.isOpen { + if file.entry == nil || !file.isOpen { item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath()) if item != nil && !item.Expired() { entry := item.Value().(*filer_pb.Entry) - file.Chunks = entry.Chunks - file.attributes = entry.Attributes + file.entry = entry // glog.V(1).Infof("file attr read cached %v attributes", file.Name) } else { err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.GetEntryAttributesRequest{ + request := &filer_pb.LookupDirectoryEntryRequest{ Name: file.Name, - ParentDir: file.dir.Path, + Directory: file.dir.Path, } - resp, err := client.GetEntryAttributes(ctx, request) + resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil { glog.V(0).Infof("file attr read file %v: %v", request, err) return err } - file.attributes = resp.Attributes - file.Chunks = resp.Chunks + file.entry = resp.Entry - glog.V(1).Infof("file attr %v %+v: %d", file.fullpath(), file.attributes, filer2.TotalSize(file.Chunks)) + glog.V(1).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks)) return nil }) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 786abbef2..e86238793 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -17,10 +17,9 @@ import ( type FileHandle struct { // cache file has been written to dirtyPages *ContinuousDirtyPages - dirtyMetadata bool contentType string - - handle uint64 + dirtyMetadata bool + handle uint64 f *File RequestId fuse.RequestID // unique ID for request @@ -51,14 +50,14 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size)) // this value should come from the filer instead of the old f - if len(fh.f.Chunks) == 0 { + if len(fh.f.entry.Chunks) == 0 { glog.V(0).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name) return fmt.Errorf("empty file %v/%v", fh.f.dir.Path, fh.f.Name) } buff := make([]byte, req.Size) - chunkViews := filer2.ViewFromChunks(fh.f.Chunks, req.Offset, req.Size) + chunkViews := filer2.ViewFromChunks(fh.f.entry.Chunks, req.Offset, req.Size) var vids []string for _, chunkView := range chunkViews { @@ -152,7 +151,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f } for _, chunk := range chunks { - fh.f.Chunks = append(fh.f.Chunks, chunk) + fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) fh.dirtyMetadata = true } @@ -184,52 +183,36 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) } if chunk != nil { - fh.f.Chunks = append(fh.f.Chunks, chunk) - fh.dirtyMetadata = true + fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) } if !fh.dirtyMetadata { return nil } - if len(fh.f.Chunks) == 0 { - glog.V(2).Infof("fh %s/%s flush skipping empty: %v", fh.f.dir.Path, fh.f.Name, req) - return nil - } + return fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - err = fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - if fh.f.attributes != nil { - fh.f.attributes.Mime = fh.contentType - fh.f.attributes.Uid = req.Uid - fh.f.attributes.Gid = req.Gid + if fh.f.entry.Attributes != nil { + fh.f.entry.Attributes.Mime = fh.contentType + fh.f.entry.Attributes.Uid = req.Uid + fh.f.entry.Attributes.Gid = req.Gid } - request := &filer_pb.UpdateEntryRequest{ + request := &filer_pb.CreateEntryRequest{ Directory: fh.f.dir.Path, - Entry: &filer_pb.Entry{ - Name: fh.f.Name, - Attributes: fh.f.attributes, - Chunks: fh.f.Chunks, - }, + Entry: fh.f.entry, } - glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.Chunks)) - for i, chunk := range fh.f.Chunks { + glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) + for i, chunk := range fh.f.entry.Chunks { glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - if _, err := client.UpdateEntry(ctx, request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }) - - if err == nil { - fh.dirtyMetadata = false - } - - return err } func volumeId(fileId string) string {