You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
4.1 KiB

4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/golang/protobuf/proto"
  10. "github.com/viant/ptrie"
  11. )
  12. func (entry *Entry) IsInRemoteOnly() bool {
  13. return len(entry.Chunks) == 0 && entry.RemoteEntry != nil && entry.RemoteEntry.RemoteSize > 0
  14. }
  15. func ToFileIdObject(fileIdStr string) (*FileId, error) {
  16. t, err := needle.ParseFileIdFromString(fileIdStr)
  17. if err != nil {
  18. return nil, err
  19. }
  20. return &FileId{
  21. VolumeId: uint32(t.VolumeId),
  22. Cookie: uint32(t.Cookie),
  23. FileKey: uint64(t.Key),
  24. }, nil
  25. }
  26. func (fid *FileId) toFileIdString() string {
  27. return needle.NewFileId(needle.VolumeId(fid.VolumeId), fid.FileKey, fid.Cookie).String()
  28. }
  29. func (c *FileChunk) GetFileIdString() string {
  30. if c.FileId != "" {
  31. return c.FileId
  32. }
  33. if c.Fid != nil {
  34. c.FileId = c.Fid.toFileIdString()
  35. return c.FileId
  36. }
  37. return ""
  38. }
  39. func BeforeEntrySerialization(chunks []*FileChunk) {
  40. for _, chunk := range chunks {
  41. if chunk.FileId != "" {
  42. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  43. chunk.Fid = fid
  44. chunk.FileId = ""
  45. }
  46. }
  47. if chunk.SourceFileId != "" {
  48. if fid, err := ToFileIdObject(chunk.SourceFileId); err == nil {
  49. chunk.SourceFid = fid
  50. chunk.SourceFileId = ""
  51. }
  52. }
  53. }
  54. }
  55. func EnsureFid(chunk *FileChunk) {
  56. if chunk.Fid != nil {
  57. return
  58. }
  59. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  60. chunk.Fid = fid
  61. }
  62. }
  63. func AfterEntryDeserialization(chunks []*FileChunk) {
  64. for _, chunk := range chunks {
  65. if chunk.Fid != nil && chunk.FileId == "" {
  66. chunk.FileId = chunk.Fid.toFileIdString()
  67. }
  68. if chunk.SourceFid != nil && chunk.SourceFileId == "" {
  69. chunk.SourceFileId = chunk.SourceFid.toFileIdString()
  70. }
  71. }
  72. }
  73. func CreateEntry(client SeaweedFilerClient, request *CreateEntryRequest) error {
  74. resp, err := client.CreateEntry(context.Background(), request)
  75. if err != nil {
  76. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
  77. return fmt.Errorf("CreateEntry: %v", err)
  78. }
  79. if resp.Error != "" {
  80. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, resp.Error)
  81. return fmt.Errorf("CreateEntry : %v", resp.Error)
  82. }
  83. return nil
  84. }
  85. func UpdateEntry(client SeaweedFilerClient, request *UpdateEntryRequest) error {
  86. _, err := client.UpdateEntry(context.Background(), request)
  87. if err != nil {
  88. glog.V(1).Infof("update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
  89. return fmt.Errorf("UpdateEntry: %v", err)
  90. }
  91. return nil
  92. }
  93. func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error) {
  94. resp, err := client.LookupDirectoryEntry(context.Background(), request)
  95. if err != nil {
  96. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  97. return nil, ErrNotFound
  98. }
  99. glog.V(3).Infof("read %s/%v: %v", request.Directory, request.Name, err)
  100. return nil, fmt.Errorf("LookupEntry1: %v", err)
  101. }
  102. if resp.Entry == nil {
  103. return nil, ErrNotFound
  104. }
  105. return resp, nil
  106. }
  107. var ErrNotFound = errors.New("filer: no entry is found in filer store")
  108. func IsCreate(event *SubscribeMetadataResponse) bool {
  109. return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
  110. }
  111. func IsUpdate(event *SubscribeMetadataResponse) bool {
  112. return event.EventNotification.NewEntry != nil &&
  113. event.EventNotification.OldEntry != nil &&
  114. event.Directory == event.EventNotification.NewParentPath
  115. }
  116. func IsDelete(event *SubscribeMetadataResponse) bool {
  117. return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry != nil
  118. }
  119. func IsRename(event *SubscribeMetadataResponse) bool {
  120. return event.EventNotification.NewEntry != nil &&
  121. event.EventNotification.OldEntry != nil &&
  122. event.Directory != event.EventNotification.NewParentPath
  123. }
  124. var _ = ptrie.KeyProvider(&FilerConf_PathConf{})
  125. func (fp *FilerConf_PathConf) Key() interface{} {
  126. key, _ := proto.Marshal(fp)
  127. return string(key)
  128. }
  129. func (fp *RemoteStorageLocation) Key() interface{} {
  130. key, _ := proto.Marshal(fp)
  131. return string(key)
  132. }