You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

101 lines
2.5 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
  10. if len(entry.HardLinkId) == 0 {
  11. return nil
  12. }
  13. // handle hard links
  14. if err := fsw.setHardLink(ctx, entry); err != nil {
  15. return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
  16. }
  17. // check what is existing entry
  18. glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
  19. existingEntry, err := fsw.getActualStore(entry.FullPath).FindEntry(ctx, entry.FullPath)
  20. if err != nil && err != filer_pb.ErrNotFound {
  21. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  22. }
  23. // remove old hard link
  24. if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
  25. glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
  26. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  27. return err
  28. }
  29. }
  30. return nil
  31. }
  32. func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
  33. if len(entry.HardLinkId) == 0 {
  34. return nil
  35. }
  36. key := entry.HardLinkId
  37. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  38. if encodeErr != nil {
  39. return encodeErr
  40. }
  41. return fsw.KvPut(ctx, key, newBlob)
  42. }
  43. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  44. if len(entry.HardLinkId) == 0 {
  45. return nil
  46. }
  47. key := entry.HardLinkId
  48. glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
  49. value, err := fsw.KvGet(ctx, key)
  50. if err != nil {
  51. glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  52. return err
  53. }
  54. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  55. glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  56. return err
  57. }
  58. return nil
  59. }
  60. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  61. key := hardLinkId
  62. value, err := fsw.KvGet(ctx, key)
  63. if err == ErrKvNotFound {
  64. return nil
  65. }
  66. if err != nil {
  67. return err
  68. }
  69. entry := &Entry{}
  70. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  71. return err
  72. }
  73. entry.HardLinkCounter--
  74. if entry.HardLinkCounter <= 0 {
  75. glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
  76. return fsw.KvDelete(ctx, key)
  77. }
  78. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  79. if encodeErr != nil {
  80. return encodeErr
  81. }
  82. glog.V(4).Infof("DeleteHardLink KvPut %v", key)
  83. return fsw.KvPut(ctx, key, newBlob)
  84. }