You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

106 lines
2.6 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
  10. if entry.IsDirectory() {
  11. return nil
  12. }
  13. if len(entry.HardLinkId) > 0 {
  14. // handle hard links
  15. if err := fsw.setHardLink(ctx, entry); err != nil {
  16. return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
  17. }
  18. }
  19. // check what is existing entry
  20. glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
  21. actualStore := fsw.getActualStore(entry.FullPath)
  22. existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
  23. if err != nil && err != filer_pb.ErrNotFound {
  24. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  25. }
  26. // remove old hard link
  27. if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
  28. glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
  29. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  30. return err
  31. }
  32. }
  33. return nil
  34. }
  35. func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
  36. if len(entry.HardLinkId) == 0 {
  37. return nil
  38. }
  39. key := entry.HardLinkId
  40. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  41. if encodeErr != nil {
  42. return encodeErr
  43. }
  44. return fsw.KvPut(ctx, key, newBlob)
  45. }
  46. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  47. if len(entry.HardLinkId) == 0 {
  48. return nil
  49. }
  50. key := entry.HardLinkId
  51. glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
  52. value, err := fsw.KvGet(ctx, key)
  53. if err != nil {
  54. glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  55. return err
  56. }
  57. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  58. glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  59. return err
  60. }
  61. return nil
  62. }
  63. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  64. key := hardLinkId
  65. value, err := fsw.KvGet(ctx, key)
  66. if err == ErrKvNotFound {
  67. return nil
  68. }
  69. if err != nil {
  70. return err
  71. }
  72. entry := &Entry{}
  73. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  74. return err
  75. }
  76. entry.HardLinkCounter--
  77. if entry.HardLinkCounter <= 0 {
  78. glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
  79. return fsw.KvDelete(ctx, key)
  80. }
  81. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  82. if encodeErr != nil {
  83. return encodeErr
  84. }
  85. glog.V(4).Infof("DeleteHardLink KvPut %v", key)
  86. return fsw.KvPut(ctx, key, newBlob)
  87. }