You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
6.9 KiB

7 years ago
7 years ago
5 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/stats"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type FilerStore interface {
  11. // GetName gets the name to locate the configuration in filer.toml file
  12. GetName() string
  13. // Initialize initializes the file store
  14. Initialize(configuration util.Configuration, prefix string) error
  15. InsertEntry(context.Context, *Entry) error
  16. UpdateEntry(context.Context, *Entry) (err error)
  17. // err == filer2.ErrNotFound if not found
  18. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  19. DeleteEntry(context.Context, util.FullPath) (err error)
  20. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  21. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  22. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  23. BeginTransaction(ctx context.Context) (context.Context, error)
  24. CommitTransaction(ctx context.Context) error
  25. RollbackTransaction(ctx context.Context) error
  26. Shutdown()
  27. }
  28. type FilerLocalStore interface {
  29. UpdateOffset(filer string, lastTsNs int64) error
  30. ReadOffset(filer string) (lastTsNs int64, err error)
  31. }
  32. type FilerStoreWrapper struct {
  33. ActualStore FilerStore
  34. }
  35. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  36. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  37. return innerStore
  38. }
  39. return &FilerStoreWrapper{
  40. ActualStore: store,
  41. }
  42. }
  43. func (fsw *FilerStoreWrapper) GetName() string {
  44. return fsw.ActualStore.GetName()
  45. }
  46. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  47. return fsw.ActualStore.Initialize(configuration, prefix)
  48. }
  49. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  50. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  51. start := time.Now()
  52. defer func() {
  53. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  54. }()
  55. filer_pb.BeforeEntrySerialization(entry.Chunks)
  56. if entry.Mime == "application/octet-stream" {
  57. entry.Mime = ""
  58. }
  59. return fsw.ActualStore.InsertEntry(ctx, entry)
  60. }
  61. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  62. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  63. start := time.Now()
  64. defer func() {
  65. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  66. }()
  67. filer_pb.BeforeEntrySerialization(entry.Chunks)
  68. if entry.Mime == "application/octet-stream" {
  69. entry.Mime = ""
  70. }
  71. return fsw.ActualStore.UpdateEntry(ctx, entry)
  72. }
  73. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  74. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  75. start := time.Now()
  76. defer func() {
  77. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  78. }()
  79. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  80. if err != nil {
  81. return nil, err
  82. }
  83. filer_pb.AfterEntryDeserialization(entry.Chunks)
  84. return
  85. }
  86. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  87. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  88. start := time.Now()
  89. defer func() {
  90. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  91. }()
  92. return fsw.ActualStore.DeleteEntry(ctx, fp)
  93. }
  94. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  95. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  96. start := time.Now()
  97. defer func() {
  98. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  99. }()
  100. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  101. }
  102. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  103. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  104. start := time.Now()
  105. defer func() {
  106. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  107. }()
  108. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  109. if err != nil {
  110. return nil, err
  111. }
  112. for _, entry := range entries {
  113. filer_pb.AfterEntryDeserialization(entry.Chunks)
  114. }
  115. return entries, err
  116. }
  117. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  118. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  119. start := time.Now()
  120. defer func() {
  121. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  122. }()
  123. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  124. if err == ErrUnsupportedListDirectoryPrefixed {
  125. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  126. }
  127. if err != nil {
  128. return nil, err
  129. }
  130. for _, entry := range entries {
  131. filer_pb.AfterEntryDeserialization(entry.Chunks)
  132. }
  133. return entries, nil
  134. }
  135. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  136. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  137. if err != nil {
  138. return nil, err
  139. }
  140. if prefix == "" {
  141. return
  142. }
  143. count := 0
  144. var lastFileName string
  145. notPrefixed := entries
  146. entries = nil
  147. for count < limit && len(notPrefixed) > 0 {
  148. for _, entry := range notPrefixed {
  149. lastFileName = entry.Name()
  150. if strings.HasPrefix(entry.Name(), prefix) {
  151. count++
  152. entries = append(entries, entry)
  153. if count >= limit {
  154. break
  155. }
  156. }
  157. }
  158. if count < limit {
  159. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  160. if err != nil {
  161. return
  162. }
  163. }
  164. }
  165. return
  166. }
  167. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  168. return fsw.ActualStore.BeginTransaction(ctx)
  169. }
  170. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  171. return fsw.ActualStore.CommitTransaction(ctx)
  172. }
  173. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  174. return fsw.ActualStore.RollbackTransaction(ctx)
  175. }
  176. func (fsw *FilerStoreWrapper) Shutdown() {
  177. fsw.ActualStore.Shutdown()
  178. }