You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

147 lines
3.8 KiB

4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/viant/ptrie"
  10. )
  11. func ToFileIdObject(fileIdStr string) (*FileId, error) {
  12. t, err := needle.ParseFileIdFromString(fileIdStr)
  13. if err != nil {
  14. return nil, err
  15. }
  16. return &FileId{
  17. VolumeId: uint32(t.VolumeId),
  18. Cookie: uint32(t.Cookie),
  19. FileKey: uint64(t.Key),
  20. }, nil
  21. }
  22. func (fid *FileId) toFileIdString() string {
  23. return needle.NewFileId(needle.VolumeId(fid.VolumeId), fid.FileKey, fid.Cookie).String()
  24. }
  25. func (c *FileChunk) GetFileIdString() string {
  26. if c.FileId != "" {
  27. return c.FileId
  28. }
  29. if c.Fid != nil {
  30. c.FileId = c.Fid.toFileIdString()
  31. return c.FileId
  32. }
  33. return ""
  34. }
  35. func BeforeEntrySerialization(chunks []*FileChunk) {
  36. for _, chunk := range chunks {
  37. if chunk.FileId != "" {
  38. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  39. chunk.Fid = fid
  40. chunk.FileId = ""
  41. }
  42. }
  43. if chunk.SourceFileId != "" {
  44. if fid, err := ToFileIdObject(chunk.SourceFileId); err == nil {
  45. chunk.SourceFid = fid
  46. chunk.SourceFileId = ""
  47. }
  48. }
  49. }
  50. }
  51. func EnsureFid(chunk *FileChunk) {
  52. if chunk.Fid != nil {
  53. return
  54. }
  55. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  56. chunk.Fid = fid
  57. }
  58. }
  59. func AfterEntryDeserialization(chunks []*FileChunk) {
  60. for _, chunk := range chunks {
  61. if chunk.Fid != nil && chunk.FileId == "" {
  62. chunk.FileId = chunk.Fid.toFileIdString()
  63. }
  64. if chunk.SourceFid != nil && chunk.SourceFileId == "" {
  65. chunk.SourceFileId = chunk.SourceFid.toFileIdString()
  66. }
  67. }
  68. }
  69. func CreateEntry(client SeaweedFilerClient, request *CreateEntryRequest) error {
  70. resp, err := client.CreateEntry(context.Background(), request)
  71. if err != nil {
  72. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
  73. return fmt.Errorf("CreateEntry: %v", err)
  74. }
  75. if resp.Error != "" {
  76. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, resp.Error)
  77. return fmt.Errorf("CreateEntry : %v", resp.Error)
  78. }
  79. return nil
  80. }
  81. func UpdateEntry(client SeaweedFilerClient, request *UpdateEntryRequest) error {
  82. _, err := client.UpdateEntry(context.Background(), request)
  83. if err != nil {
  84. glog.V(1).Infof("update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
  85. return fmt.Errorf("UpdateEntry: %v", err)
  86. }
  87. return nil
  88. }
  89. func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error) {
  90. resp, err := client.LookupDirectoryEntry(context.Background(), request)
  91. if err != nil {
  92. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  93. return nil, ErrNotFound
  94. }
  95. glog.V(3).Infof("read %s/%v: %v", request.Directory, request.Name, err)
  96. return nil, fmt.Errorf("LookupEntry1: %v", err)
  97. }
  98. if resp.Entry == nil {
  99. return nil, ErrNotFound
  100. }
  101. return resp, nil
  102. }
  103. var ErrNotFound = errors.New("filer: no entry is found in filer store")
  104. func IsCreate(event *SubscribeMetadataResponse) bool {
  105. return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
  106. }
  107. func IsUpdate(event *SubscribeMetadataResponse) bool {
  108. return event.EventNotification.NewEntry != nil &&
  109. event.EventNotification.OldEntry != nil &&
  110. event.Directory == event.EventNotification.NewParentPath
  111. }
  112. func IsDelete(event *SubscribeMetadataResponse) bool {
  113. return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry != nil
  114. }
  115. func IsRename(event *SubscribeMetadataResponse) bool {
  116. return event.EventNotification.NewEntry != nil &&
  117. event.EventNotification.OldEntry != nil &&
  118. event.Directory != event.EventNotification.NewParentPath
  119. }
  120. var _ = ptrie.KeyProvider(&FilerConf_PathConf{})
  121. func (fp *FilerConf_PathConf) Key() interface{} {
  122. return fp.LocationPrefix
  123. }