You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
7.6 KiB

7 years ago
7 years ago
7 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. var (
  12. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  13. ErrKvNotImplemented = errors.New("kv not implemented yet")
  14. ErrKvNotFound = errors.New("kv: not found")
  15. )
  16. type FilerStore interface {
  17. // GetName gets the name to locate the configuration in filer.toml file
  18. GetName() string
  19. // Initialize initializes the file store
  20. Initialize(configuration util.Configuration, prefix string) error
  21. InsertEntry(context.Context, *Entry) error
  22. UpdateEntry(context.Context, *Entry) (err error)
  23. // err == filer2.ErrNotFound if not found
  24. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  25. DeleteEntry(context.Context, util.FullPath) (err error)
  26. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  27. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  28. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  29. BeginTransaction(ctx context.Context) (context.Context, error)
  30. CommitTransaction(ctx context.Context) error
  31. RollbackTransaction(ctx context.Context) error
  32. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  33. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  34. KvDelete(ctx context.Context, key []byte) (err error)
  35. Shutdown()
  36. }
  37. type FilerStoreWrapper struct {
  38. ActualStore FilerStore
  39. }
  40. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  41. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  42. return innerStore
  43. }
  44. return &FilerStoreWrapper{
  45. ActualStore: store,
  46. }
  47. }
  48. func (fsw *FilerStoreWrapper) GetName() string {
  49. return fsw.ActualStore.GetName()
  50. }
  51. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  52. return fsw.ActualStore.Initialize(configuration, prefix)
  53. }
  54. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  55. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  56. start := time.Now()
  57. defer func() {
  58. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  59. }()
  60. filer_pb.BeforeEntrySerialization(entry.Chunks)
  61. if entry.Mime == "application/octet-stream" {
  62. entry.Mime = ""
  63. }
  64. return fsw.ActualStore.InsertEntry(ctx, entry)
  65. }
  66. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  67. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  68. start := time.Now()
  69. defer func() {
  70. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  71. }()
  72. filer_pb.BeforeEntrySerialization(entry.Chunks)
  73. if entry.Mime == "application/octet-stream" {
  74. entry.Mime = ""
  75. }
  76. return fsw.ActualStore.UpdateEntry(ctx, entry)
  77. }
  78. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  79. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  80. start := time.Now()
  81. defer func() {
  82. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  83. }()
  84. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  85. if err != nil {
  86. return nil, err
  87. }
  88. filer_pb.AfterEntryDeserialization(entry.Chunks)
  89. return
  90. }
  91. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  92. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  93. start := time.Now()
  94. defer func() {
  95. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  96. }()
  97. return fsw.ActualStore.DeleteEntry(ctx, fp)
  98. }
  99. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  100. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  101. start := time.Now()
  102. defer func() {
  103. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  104. }()
  105. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  106. }
  107. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  108. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  109. start := time.Now()
  110. defer func() {
  111. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  112. }()
  113. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  114. if err != nil {
  115. return nil, err
  116. }
  117. for _, entry := range entries {
  118. filer_pb.AfterEntryDeserialization(entry.Chunks)
  119. }
  120. return entries, err
  121. }
  122. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  123. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  124. start := time.Now()
  125. defer func() {
  126. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  127. }()
  128. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  129. if err == ErrUnsupportedListDirectoryPrefixed {
  130. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  131. }
  132. if err != nil {
  133. return nil, err
  134. }
  135. for _, entry := range entries {
  136. filer_pb.AfterEntryDeserialization(entry.Chunks)
  137. }
  138. return entries, nil
  139. }
  140. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  141. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  142. if err != nil {
  143. return nil, err
  144. }
  145. if prefix == "" {
  146. return
  147. }
  148. count := 0
  149. var lastFileName string
  150. notPrefixed := entries
  151. entries = nil
  152. for count < limit && len(notPrefixed) > 0 {
  153. for _, entry := range notPrefixed {
  154. lastFileName = entry.Name()
  155. if strings.HasPrefix(entry.Name(), prefix) {
  156. count++
  157. entries = append(entries, entry)
  158. if count >= limit {
  159. break
  160. }
  161. }
  162. }
  163. if count < limit {
  164. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  165. if err != nil {
  166. return
  167. }
  168. }
  169. }
  170. return
  171. }
  172. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  173. return fsw.ActualStore.BeginTransaction(ctx)
  174. }
  175. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  176. return fsw.ActualStore.CommitTransaction(ctx)
  177. }
  178. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  179. return fsw.ActualStore.RollbackTransaction(ctx)
  180. }
  181. func (fsw *FilerStoreWrapper) Shutdown() {
  182. fsw.ActualStore.Shutdown()
  183. }
  184. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  185. return fsw.ActualStore.KvPut(ctx, key, value)
  186. }
  187. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  188. return fsw.ActualStore.KvGet(ctx, key)
  189. }
  190. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  191. return fsw.ActualStore.KvDelete(ctx, key)
  192. }