You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

273 lines
8.9 KiB

7 years ago
7 years ago
7 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. var (
  12. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  13. ErrKvNotImplemented = errors.New("kv not implemented yet")
  14. ErrKvNotFound = errors.New("kv: not found")
  15. )
  16. type FilerStore interface {
  17. // GetName gets the name to locate the configuration in filer.toml file
  18. GetName() string
  19. // Initialize initializes the file store
  20. Initialize(configuration util.Configuration, prefix string) error
  21. InsertEntry(context.Context, *Entry) error
  22. UpdateEntry(context.Context, *Entry) (err error)
  23. // err == filer_pb.ErrNotFound if not found
  24. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  25. DeleteEntry(context.Context, util.FullPath) (err error)
  26. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  27. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  28. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  29. BeginTransaction(ctx context.Context) (context.Context, error)
  30. CommitTransaction(ctx context.Context) error
  31. RollbackTransaction(ctx context.Context) error
  32. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  33. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  34. KvDelete(ctx context.Context, key []byte) (err error)
  35. Shutdown()
  36. }
  37. type VirtualFilerStore interface {
  38. FilerStore
  39. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  40. DeleteOneEntry(ctx context.Context, entry *Entry) error
  41. }
  42. type FilerStoreWrapper struct {
  43. ActualStore FilerStore
  44. }
  45. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  46. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  47. return innerStore
  48. }
  49. return &FilerStoreWrapper{
  50. ActualStore: store,
  51. }
  52. }
  53. func (fsw *FilerStoreWrapper) GetName() string {
  54. return fsw.ActualStore.GetName()
  55. }
  56. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  57. return fsw.ActualStore.Initialize(configuration, prefix)
  58. }
  59. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  60. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  61. start := time.Now()
  62. defer func() {
  63. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  64. }()
  65. filer_pb.BeforeEntrySerialization(entry.Chunks)
  66. if entry.Mime == "application/octet-stream" {
  67. entry.Mime = ""
  68. }
  69. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  70. return err
  71. }
  72. return fsw.ActualStore.InsertEntry(ctx, entry)
  73. }
  74. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  75. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  76. start := time.Now()
  77. defer func() {
  78. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  79. }()
  80. filer_pb.BeforeEntrySerialization(entry.Chunks)
  81. if entry.Mime == "application/octet-stream" {
  82. entry.Mime = ""
  83. }
  84. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  85. return err
  86. }
  87. return fsw.ActualStore.UpdateEntry(ctx, entry)
  88. }
  89. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  90. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  91. start := time.Now()
  92. defer func() {
  93. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  94. }()
  95. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  96. if err != nil {
  97. return nil, err
  98. }
  99. fsw.maybeReadHardLink(ctx, entry)
  100. filer_pb.AfterEntryDeserialization(entry.Chunks)
  101. return
  102. }
  103. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  104. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  105. start := time.Now()
  106. defer func() {
  107. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  108. }()
  109. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  110. if findErr == filer_pb.ErrNotFound {
  111. return nil
  112. }
  113. if len(existingEntry.HardLinkId) != 0 {
  114. // remove hard link
  115. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  116. return err
  117. }
  118. }
  119. return fsw.ActualStore.DeleteEntry(ctx, fp)
  120. }
  121. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  122. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  123. start := time.Now()
  124. defer func() {
  125. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  126. }()
  127. if len(existingEntry.HardLinkId) != 0 {
  128. // remove hard link
  129. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  130. return err
  131. }
  132. }
  133. return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
  134. }
  135. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  136. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  137. start := time.Now()
  138. defer func() {
  139. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  140. }()
  141. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  142. }
  143. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  144. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  145. start := time.Now()
  146. defer func() {
  147. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  148. }()
  149. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  150. if err != nil {
  151. return nil, err
  152. }
  153. for _, entry := range entries {
  154. fsw.maybeReadHardLink(ctx, entry)
  155. filer_pb.AfterEntryDeserialization(entry.Chunks)
  156. }
  157. return entries, err
  158. }
  159. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  160. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  161. start := time.Now()
  162. defer func() {
  163. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  164. }()
  165. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  166. if err == ErrUnsupportedListDirectoryPrefixed {
  167. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  168. }
  169. if err != nil {
  170. return nil, err
  171. }
  172. for _, entry := range entries {
  173. fsw.maybeReadHardLink(ctx, entry)
  174. filer_pb.AfterEntryDeserialization(entry.Chunks)
  175. }
  176. return entries, nil
  177. }
  178. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  179. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  180. if err != nil {
  181. return nil, err
  182. }
  183. if prefix == "" {
  184. return
  185. }
  186. count := 0
  187. var lastFileName string
  188. notPrefixed := entries
  189. entries = nil
  190. for count < limit && len(notPrefixed) > 0 {
  191. for _, entry := range notPrefixed {
  192. lastFileName = entry.Name()
  193. if strings.HasPrefix(entry.Name(), prefix) {
  194. count++
  195. entries = append(entries, entry)
  196. if count >= limit {
  197. break
  198. }
  199. }
  200. }
  201. if count < limit {
  202. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  203. if err != nil {
  204. return
  205. }
  206. }
  207. }
  208. return
  209. }
  210. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  211. return fsw.ActualStore.BeginTransaction(ctx)
  212. }
  213. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  214. return fsw.ActualStore.CommitTransaction(ctx)
  215. }
  216. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  217. return fsw.ActualStore.RollbackTransaction(ctx)
  218. }
  219. func (fsw *FilerStoreWrapper) Shutdown() {
  220. fsw.ActualStore.Shutdown()
  221. }
  222. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  223. return fsw.ActualStore.KvPut(ctx, key, value)
  224. }
  225. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  226. return fsw.ActualStore.KvGet(ctx, key)
  227. }
  228. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  229. return fsw.ActualStore.KvDelete(ctx, key)
  230. }