You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

172 lines
4.5 KiB

4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package filer_pb
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/wdclient"
  9. "math"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. "github.com/golang/protobuf/proto"
  14. "github.com/viant/ptrie"
  15. )
  16. func ToFileIdObject(fileIdStr string) (*FileId, error) {
  17. t, err := needle.ParseFileIdFromString(fileIdStr)
  18. if err != nil {
  19. return nil, err
  20. }
  21. return &FileId{
  22. VolumeId: uint32(t.VolumeId),
  23. Cookie: uint32(t.Cookie),
  24. FileKey: uint64(t.Key),
  25. }, nil
  26. }
  27. func (fid *FileId) toFileIdString() string {
  28. return needle.NewFileId(needle.VolumeId(fid.VolumeId), fid.FileKey, fid.Cookie).String()
  29. }
  30. func (c *FileChunk) GetFileIdString() string {
  31. if c.FileId != "" {
  32. return c.FileId
  33. }
  34. if c.Fid != nil {
  35. c.FileId = c.Fid.toFileIdString()
  36. return c.FileId
  37. }
  38. return ""
  39. }
  40. func BeforeEntrySerialization(chunks []*FileChunk) {
  41. for _, chunk := range chunks {
  42. if chunk.FileId != "" {
  43. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  44. chunk.Fid = fid
  45. chunk.FileId = ""
  46. }
  47. }
  48. if chunk.SourceFileId != "" {
  49. if fid, err := ToFileIdObject(chunk.SourceFileId); err == nil {
  50. chunk.SourceFid = fid
  51. chunk.SourceFileId = ""
  52. }
  53. }
  54. }
  55. }
  56. func EnsureFid(chunk *FileChunk) {
  57. if chunk.Fid != nil {
  58. return
  59. }
  60. if fid, err := ToFileIdObject(chunk.FileId); err == nil {
  61. chunk.Fid = fid
  62. }
  63. }
  64. func AfterEntryDeserialization(chunks []*FileChunk) {
  65. for _, chunk := range chunks {
  66. if chunk.Fid != nil && chunk.FileId == "" {
  67. chunk.FileId = chunk.Fid.toFileIdString()
  68. }
  69. if chunk.SourceFid != nil && chunk.SourceFileId == "" {
  70. chunk.SourceFileId = chunk.SourceFid.toFileIdString()
  71. }
  72. }
  73. }
  74. func CreateEntry(client SeaweedFilerClient, request *CreateEntryRequest) error {
  75. resp, err := client.CreateEntry(context.Background(), request)
  76. if err != nil {
  77. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
  78. return fmt.Errorf("CreateEntry: %v", err)
  79. }
  80. if resp.Error != "" {
  81. glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, resp.Error)
  82. return fmt.Errorf("CreateEntry : %v", resp.Error)
  83. }
  84. return nil
  85. }
  86. func UpdateEntry(client SeaweedFilerClient, request *UpdateEntryRequest) error {
  87. _, err := client.UpdateEntry(context.Background(), request)
  88. if err != nil {
  89. glog.V(1).Infof("update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
  90. return fmt.Errorf("UpdateEntry: %v", err)
  91. }
  92. return nil
  93. }
  94. func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error) {
  95. resp, err := client.LookupDirectoryEntry(context.Background(), request)
  96. if err != nil {
  97. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  98. return nil, ErrNotFound
  99. }
  100. glog.V(3).Infof("read %s/%v: %v", request.Directory, request.Name, err)
  101. return nil, fmt.Errorf("LookupEntry1: %v", err)
  102. }
  103. if resp.Entry == nil {
  104. return nil, ErrNotFound
  105. }
  106. return resp, nil
  107. }
  108. func ReadEntry(masterClient *wdclient.MasterClient, filerClient SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error {
  109. request := &LookupDirectoryEntryRequest{
  110. Directory: filer.DirectoryEtc,
  111. Name: filer.FilerConfName,
  112. }
  113. respLookupEntry, err := LookupEntry(filerClient, request)
  114. if err != nil {
  115. return err
  116. }
  117. if len(respLookupEntry.Entry.Content) > 0 {
  118. _, err = byteBuffer.Write(respLookupEntry.Entry.Content)
  119. return err
  120. }
  121. return filer.StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
  122. }
  123. var ErrNotFound = errors.New("filer: no entry is found in filer store")
  124. func IsCreate(event *SubscribeMetadataResponse) bool {
  125. return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
  126. }
  127. func IsUpdate(event *SubscribeMetadataResponse) bool {
  128. return event.EventNotification.NewEntry != nil &&
  129. event.EventNotification.OldEntry != nil &&
  130. event.Directory == event.EventNotification.NewParentPath
  131. }
  132. func IsDelete(event *SubscribeMetadataResponse) bool {
  133. return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry != nil
  134. }
  135. func IsRename(event *SubscribeMetadataResponse) bool {
  136. return event.EventNotification.NewEntry != nil &&
  137. event.EventNotification.OldEntry != nil &&
  138. event.Directory != event.EventNotification.NewParentPath
  139. }
  140. var _ = ptrie.KeyProvider(&FilerConf_PathConf{})
  141. func (fp *FilerConf_PathConf) Key() interface{} {
  142. key, _ := proto.Marshal(fp)
  143. return string(key)
  144. }