You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

229 lines
7.8 KiB

7 years ago
7 years ago
7 years ago
5 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  // Sentinel errors exposed by the filer store layer.
  var (
  	// ErrUnsupportedListDirectoryPrefixed is the error a store returns from
  	// ListDirectoryPrefixedEntries when it cannot list by prefix itself;
  	// FilerStoreWrapper reacts to it by listing and filtering on its own.
  	ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")

  	// ErrKvNotImplemented signals that key-value operations are not implemented
  	// yet (presumably returned by stores lacking KvPut/KvGet/KvDelete support —
  	// TODO confirm against the store implementations).
  	ErrKvNotImplemented = errors.New("kv not implemented yet")

  	// ErrKvNotFound signals that a key-value lookup found nothing (presumably
  	// returned by KvGet — TODO confirm against the store implementations).
  	ErrKvNotFound = errors.New("kv: not found")
  )
  16. type FilerStore interface {
  17. // GetName gets the name to locate the configuration in filer.toml file
  18. GetName() string
  19. // Initialize initializes the file store
  20. Initialize(configuration util.Configuration, prefix string) error
  21. InsertEntry(context.Context, *Entry) error
  22. UpdateEntry(context.Context, *Entry) (err error)
  23. // err == filer2.ErrNotFound if not found
  24. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  25. DeleteEntry(context.Context, util.FullPath) (err error)
  26. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  27. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  28. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  29. BeginTransaction(ctx context.Context) (context.Context, error)
  30. CommitTransaction(ctx context.Context) error
  31. RollbackTransaction(ctx context.Context) error
  32. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  33. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  34. KvDelete(ctx context.Context, key []byte) (err error)
  35. Shutdown()
  36. }
  // FilerLocalStore keeps one int64 nanosecond timestamp per filer name.
  // NOTE(review): presumably the offset of the last event consumed from that
  // peer filer — confirm against the callers.
  type FilerLocalStore interface {
  	// UpdateOffset stores lastTsNs for the given filer.
  	UpdateOffset(filer string, lastTsNs int64) error
  	// ReadOffset returns the timestamp stored for the given filer.
  	ReadOffset(filer string) (lastTsNs int64, err error)
  }
  41. type FilerStoreWrapper struct {
  42. ActualStore FilerStore
  43. }
  44. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  45. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  46. return innerStore
  47. }
  48. return &FilerStoreWrapper{
  49. ActualStore: store,
  50. }
  51. }
  52. func (fsw *FilerStoreWrapper) GetName() string {
  53. return fsw.ActualStore.GetName()
  54. }
  55. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  56. return fsw.ActualStore.Initialize(configuration, prefix)
  57. }
  58. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  59. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  60. start := time.Now()
  61. defer func() {
  62. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  63. }()
  64. filer_pb.BeforeEntrySerialization(entry.Chunks)
  65. if entry.Mime == "application/octet-stream" {
  66. entry.Mime = ""
  67. }
  68. return fsw.ActualStore.InsertEntry(ctx, entry)
  69. }
  70. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  71. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  72. start := time.Now()
  73. defer func() {
  74. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  75. }()
  76. filer_pb.BeforeEntrySerialization(entry.Chunks)
  77. if entry.Mime == "application/octet-stream" {
  78. entry.Mime = ""
  79. }
  80. return fsw.ActualStore.UpdateEntry(ctx, entry)
  81. }
  82. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  83. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  84. start := time.Now()
  85. defer func() {
  86. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  87. }()
  88. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  89. if err != nil {
  90. return nil, err
  91. }
  92. filer_pb.AfterEntryDeserialization(entry.Chunks)
  93. return
  94. }
  95. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  96. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  97. start := time.Now()
  98. defer func() {
  99. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  100. }()
  101. return fsw.ActualStore.DeleteEntry(ctx, fp)
  102. }
  103. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  104. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  105. start := time.Now()
  106. defer func() {
  107. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  108. }()
  109. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  110. }
  111. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  112. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  113. start := time.Now()
  114. defer func() {
  115. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  116. }()
  117. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  118. if err != nil {
  119. return nil, err
  120. }
  121. for _, entry := range entries {
  122. filer_pb.AfterEntryDeserialization(entry.Chunks)
  123. }
  124. return entries, err
  125. }
  126. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  127. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  128. start := time.Now()
  129. defer func() {
  130. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  131. }()
  132. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  133. if err == ErrUnsupportedListDirectoryPrefixed {
  134. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  135. }
  136. if err != nil {
  137. return nil, err
  138. }
  139. for _, entry := range entries {
  140. filer_pb.AfterEntryDeserialization(entry.Chunks)
  141. }
  142. return entries, nil
  143. }
  144. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  145. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  146. if err != nil {
  147. return nil, err
  148. }
  149. if prefix == "" {
  150. return
  151. }
  152. count := 0
  153. var lastFileName string
  154. notPrefixed := entries
  155. entries = nil
  156. for count < limit && len(notPrefixed) > 0 {
  157. for _, entry := range notPrefixed {
  158. lastFileName = entry.Name()
  159. if strings.HasPrefix(entry.Name(), prefix) {
  160. count++
  161. entries = append(entries, entry)
  162. if count >= limit {
  163. break
  164. }
  165. }
  166. }
  167. if count < limit {
  168. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  169. if err != nil {
  170. return
  171. }
  172. }
  173. }
  174. return
  175. }
  176. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  177. return fsw.ActualStore.BeginTransaction(ctx)
  178. }
  179. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  180. return fsw.ActualStore.CommitTransaction(ctx)
  181. }
  182. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  183. return fsw.ActualStore.RollbackTransaction(ctx)
  184. }
  185. func (fsw *FilerStoreWrapper) Shutdown() {
  186. fsw.ActualStore.Shutdown()
  187. }
  188. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  189. return fsw.ActualStore.KvPut(ctx, key, value)
  190. }
  191. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  192. return fsw.ActualStore.KvGet(ctx, key)
  193. }
  194. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  195. return fsw.ActualStore.KvDelete(ctx, key)
  196. }