From 1295347958e0954a5e1f053cd7ab52576fb8151e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Sep 2020 09:43:00 -0700 Subject: [PATCH] adjust hardlink update simplify logic, pass entity content directly to hard link. The "weed mount" handles the logic to calculate hard link counter. --- weed/filer/filerstore.go | 201 +----------------------------- weed/filer/filerstore_hardlink.go | 95 ++++++++++++++ weed/filesys/dir_link.go | 46 +++---- 3 files changed, 119 insertions(+), 223 deletions(-) create mode 100644 weed/filer/filerstore_hardlink.go diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 7dc778562..888168581 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,8 +3,6 @@ package filer import ( "context" "errors" - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" "strings" "time" @@ -82,29 +80,8 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err entry.Mime = "" } - if entry.HardLinkId != 0 { - // check what is existing entry - existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) - - if err == nil && entry.HardLinkId == existingEntry.HardLinkId { - // updating the same entry - if err := fsw.updateHardLink(ctx, entry); err != nil { - return err - } - return nil - } else { - if err == nil && existingEntry.HardLinkId != 0 { - // break away from the old hard link - if err := fsw.DeleteHardLink(ctx, entry.HardLinkId); err != nil { - return err - } - } - // CreateLink 1.2 : update new file to hardlink mode - // update one existing hard link, counter ++ - if err := fsw.increaseHardLink(ctx, entry.HardLinkId); err != nil { - return err - } - } + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err } return fsw.ActualStore.InsertEntry(ctx, entry) @@ -122,50 +99,8 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err entry.Mime = "" } - if entry.HardLinkId != 0 { - // handle hard link - - // check what is existing entry - existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) - if err != nil { - return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) - } - - err = fsw.maybeReadHardLink(ctx, &Entry{HardLinkId: entry.HardLinkId}) - if err == ErrKvNotFound { - - // CreateLink 1.1 : split source entry into hardlink+empty_entry - - // create hard link from existing entry, counter ++ - existingEntry.HardLinkId = entry.HardLinkId - if err = fsw.createHardLink(ctx, existingEntry); err != nil { - return fmt.Errorf("createHardLink %d: %v", existingEntry.HardLinkId, err) - } - - // create the empty entry - if err = fsw.ActualStore.UpdateEntry(ctx, &Entry{ - FullPath: entry.FullPath, - HardLinkId: entry.HardLinkId, - }); err != nil { - return fmt.Errorf("UpdateEntry to link %d: %v", entry.FullPath, err) - } - return nil - } - if err != nil { - return fmt.Errorf("update entry %s: %v", entry.FullPath, err) - } - - if entry.HardLinkId != existingEntry.HardLinkId { - // if different hard link id, moving to a new hard link - glog.Fatalf("unexpected. update entry to a new link. not implemented yet.") - } else { - // updating hardlink with new metadata - if err = fsw.updateHardLink(ctx, entry); err != nil { - return fmt.Errorf("updateHardLink %d from %s: %v", entry.HardLinkId, entry.FullPath, err) - } - } - - return nil + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err } return fsw.ActualStore.UpdateEntry(ctx, entry) @@ -318,131 +253,3 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { return fsw.ActualStore.KvDelete(ctx, key) } - -func (fsw *FilerStoreWrapper) createHardLink(ctx context.Context, entry *Entry) error { - if entry.HardLinkId == 0 { - return nil - } - key := entry.HardLinkId.Key() - - _, err := fsw.KvGet(ctx, key) - if err != ErrKvNotFound { - return fmt.Errorf("create hardlink %d: already exists: %v", entry.HardLinkId, err) - } - - entry.HardLinkCounter = 1 - - newBlob, encodeErr := entry.EncodeAttributesAndChunks() - if encodeErr != nil { - return encodeErr - } - - return fsw.KvPut(ctx, key, newBlob) -} - -func (fsw *FilerStoreWrapper) updateHardLink(ctx context.Context, entry *Entry) error { - if entry.HardLinkId == 0 { - return nil - } - key := entry.HardLinkId.Key() - - value, err := fsw.KvGet(ctx, key) - if err == ErrKvNotFound { - return fmt.Errorf("update hardlink %d: missing", entry.HardLinkId) - } - if err != nil { - return fmt.Errorf("update hardlink %d err: %v", entry.HardLinkId, err) - } - - existingEntry := &Entry{} - if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { - return err - } - - entry.HardLinkCounter = existingEntry.HardLinkCounter - - newBlob, encodeErr := entry.EncodeAttributesAndChunks() - if encodeErr != nil { - return encodeErr - } - - return fsw.KvPut(ctx, key, newBlob) -} - -func (fsw *FilerStoreWrapper) increaseHardLink(ctx context.Context, hardLinkId HardLinkId) error { - if hardLinkId == 0 { - return nil - } - key := hardLinkId.Key() - - value, err := fsw.KvGet(ctx, key) - if err == ErrKvNotFound { - return fmt.Errorf("increaseHardLink %d: missing", hardLinkId) - } - if err != nil { - return fmt.Errorf("increaseHardLink %d err: %v", hardLinkId, err) - } - - existingEntry := &Entry{} - if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { - return err - } - - existingEntry.HardLinkCounter++ - - newBlob, encodeErr := existingEntry.EncodeAttributesAndChunks() - if encodeErr != nil { - return encodeErr - } - - return fsw.KvPut(ctx, key, newBlob) -} - -func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error { - if entry.HardLinkId == 0 { - return nil - } - key := entry.HardLinkId.Key() - - value, err := fsw.KvGet(ctx, key) - if err != nil { - glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) - return err - } - - if err = entry.DecodeAttributesAndChunks(value); err != nil { - glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) - return err - } - - return nil -} - -func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error { - key := hardLinkId.Key() - value, err := fsw.KvGet(ctx, key) - if err == ErrKvNotFound { - return nil - } - if err != nil { - return err - } - - entry := &Entry{} - if err = entry.DecodeAttributesAndChunks(value); err != nil { - return err - } - - entry.HardLinkCounter-- - if entry.HardLinkCounter <= 0 { - return fsw.KvDelete(ctx, key) - } - - newBlob, encodeErr := entry.EncodeAttributesAndChunks() - if encodeErr != nil { - return encodeErr - } - - return fsw.KvPut(ctx, key, newBlob) - -} diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go new file mode 100644 index 000000000..ec768b9cb --- /dev/null +++ b/weed/filer/filerstore_hardlink.go @@ -0,0 +1,95 @@ +package filer + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + // handle hard links + if err := fsw.setHardLink(ctx, entry); err != nil { + return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + } + + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + if err != nil && err != filer_pb.ErrNotFound { + return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) + } + + // remove old hard link + if err == nil && existingEntry.HardLinkId != entry.HardLinkId { + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + return nil +} + +func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err != nil { + glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + if err = entry.DecodeAttributesAndChunks(value); err != nil { + glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + return nil +} + +func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error { + key := hardLinkId.Key() + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return nil + } + if err != nil { + return err + } + + entry := &Entry{} + if err = entry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter-- + if entry.HardLinkCounter <= 0 { + return fsw.KvDelete(ctx, key) + } + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) + +} diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index c15aed863..ddc3248bd 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -32,31 +32,28 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f } // update old file to hardlink mode - var updateOldEntryRequest *filer_pb.UpdateEntryRequest - var hardLinkId filer.HardLinkId - if oldFile.entry.HardLinkId != 0 { - hardLinkId = filer.HardLinkId(oldFile.entry.HardLinkId) - } else { - // CreateLink 1.1 : split source entry into hardlink+empty_entry - hardLinkId = filer.HardLinkId(util.RandomInt64()) - updateOldEntryRequest = &filer_pb.UpdateEntryRequest{ - Directory: oldFile.dir.FullPath(), - Entry: &filer_pb.Entry{ - Name: oldFile.entry.Name, - IsDirectory: oldFile.entry.IsDirectory, - HardLinkId: int64(hardLinkId), - }, - Signatures: []int32{dir.wfs.signature}, - } + if oldFile.entry.HardLinkId == 0 { + oldFile.entry.HardLinkId = util.RandomInt64() + oldFile.entry.HardLinkCounter = 1 + } + oldFile.entry.HardLinkCounter++ + updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ + Directory: oldFile.dir.FullPath(), + Entry: oldFile.entry, + Signatures: []int32{dir.wfs.signature}, } // CreateLink 1.2 : update new file to hardlink mode request := &filer_pb.CreateEntryRequest{ Directory: dir.FullPath(), Entry: &filer_pb.Entry{ - Name: req.NewName, - IsDirectory: false, - HardLinkId: int64(hardLinkId), + Name: req.NewName, + IsDirectory: false, + Attributes: oldFile.entry.Attributes, + Chunks: oldFile.entry.Chunks, + Extended: oldFile.entry.Extended, + HardLinkId: oldFile.entry.HardLinkId, + HardLinkCounter: oldFile.entry.HardLinkCounter, }, Signatures: []int32{dir.wfs.signature}, } @@ -67,14 +64,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) - if updateOldEntryRequest != nil { - if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil { - glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) - return fuse.EIO - } - dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry)) - oldFile.entry.HardLinkId = int64(hardLinkId) + if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil { + glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) + return fuse.EIO } + dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry)) if err := filer_pb.CreateEntry(client, request); err != nil { glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)