You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
9.6 KiB

7 years ago
4 years ago
7 years ago
7 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
6 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. var (
  13. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  14. ErrKvNotImplemented = errors.New("kv not implemented yet")
  15. ErrKvNotFound = errors.New("kv: not found")
  16. _ = VirtualFilerStore(&FilerStoreWrapper{})
  17. )
  18. type FilerStore interface {
  19. // GetName gets the name to locate the configuration in filer.toml file
  20. GetName() string
  21. // Initialize initializes the file store
  22. Initialize(configuration util.Configuration, prefix string) error
  23. InsertEntry(context.Context, *Entry) error
  24. UpdateEntry(context.Context, *Entry) (err error)
  25. // err == filer_pb.ErrNotFound if not found
  26. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  27. DeleteEntry(context.Context, util.FullPath) (err error)
  28. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  29. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  30. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  31. BeginTransaction(ctx context.Context) (context.Context, error)
  32. CommitTransaction(ctx context.Context) error
  33. RollbackTransaction(ctx context.Context) error
  34. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  35. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  36. KvDelete(ctx context.Context, key []byte) (err error)
  37. Shutdown()
  38. }
  39. type VirtualFilerStore interface {
  40. FilerStore
  41. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  42. DeleteOneEntry(ctx context.Context, entry *Entry) error
  43. }
  44. type FilerStoreWrapper struct {
  45. ActualStore FilerStore
  46. }
  47. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  48. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  49. return innerStore
  50. }
  51. return &FilerStoreWrapper{
  52. ActualStore: store,
  53. }
  54. }
  55. func (fsw *FilerStoreWrapper) GetName() string {
  56. return fsw.ActualStore.GetName()
  57. }
  58. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  59. return fsw.ActualStore.Initialize(configuration, prefix)
  60. }
  61. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  62. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  63. start := time.Now()
  64. defer func() {
  65. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  66. }()
  67. filer_pb.BeforeEntrySerialization(entry.Chunks)
  68. if entry.Mime == "application/octet-stream" {
  69. entry.Mime = ""
  70. }
  71. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  72. return err
  73. }
  74. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  75. return fsw.ActualStore.InsertEntry(ctx, entry)
  76. }
  77. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  78. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  79. start := time.Now()
  80. defer func() {
  81. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  82. }()
  83. filer_pb.BeforeEntrySerialization(entry.Chunks)
  84. if entry.Mime == "application/octet-stream" {
  85. entry.Mime = ""
  86. }
  87. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  88. return err
  89. }
  90. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  91. return fsw.ActualStore.UpdateEntry(ctx, entry)
  92. }
  93. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  94. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  95. start := time.Now()
  96. defer func() {
  97. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  98. }()
  99. glog.V(4).Infof("FindEntry %s", fp)
  100. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  101. if err != nil {
  102. return nil, err
  103. }
  104. fsw.maybeReadHardLink(ctx, entry)
  105. filer_pb.AfterEntryDeserialization(entry.Chunks)
  106. return
  107. }
  108. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  109. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  110. start := time.Now()
  111. defer func() {
  112. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  113. }()
  114. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  115. if findErr == filer_pb.ErrNotFound {
  116. return nil
  117. }
  118. if len(existingEntry.HardLinkId) != 0 {
  119. // remove hard link
  120. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  121. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  122. return err
  123. }
  124. }
  125. glog.V(4).Infof("DeleteEntry %s", fp)
  126. return fsw.ActualStore.DeleteEntry(ctx, fp)
  127. }
  128. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  129. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  130. start := time.Now()
  131. defer func() {
  132. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  133. }()
  134. if len(existingEntry.HardLinkId) != 0 {
  135. // remove hard link
  136. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  137. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  138. return err
  139. }
  140. }
  141. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  142. return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
  143. }
  144. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  145. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  146. start := time.Now()
  147. defer func() {
  148. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  149. }()
  150. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  151. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  152. }
  153. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  154. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  155. start := time.Now()
  156. defer func() {
  157. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  158. }()
  159. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  160. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  161. if err != nil {
  162. return nil, err
  163. }
  164. for _, entry := range entries {
  165. fsw.maybeReadHardLink(ctx, entry)
  166. filer_pb.AfterEntryDeserialization(entry.Chunks)
  167. }
  168. return entries, err
  169. }
  170. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  171. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  172. start := time.Now()
  173. defer func() {
  174. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  175. }()
  176. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  177. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  178. if err == ErrUnsupportedListDirectoryPrefixed {
  179. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  180. }
  181. if err != nil {
  182. return nil, err
  183. }
  184. for _, entry := range entries {
  185. fsw.maybeReadHardLink(ctx, entry)
  186. filer_pb.AfterEntryDeserialization(entry.Chunks)
  187. }
  188. return entries, nil
  189. }
  190. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  191. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  192. if err != nil {
  193. return nil, err
  194. }
  195. if prefix == "" {
  196. return
  197. }
  198. count := 0
  199. var lastFileName string
  200. notPrefixed := entries
  201. entries = nil
  202. for count < limit && len(notPrefixed) > 0 {
  203. for _, entry := range notPrefixed {
  204. lastFileName = entry.Name()
  205. if strings.HasPrefix(entry.Name(), prefix) {
  206. count++
  207. entries = append(entries, entry)
  208. if count >= limit {
  209. break
  210. }
  211. }
  212. }
  213. if count < limit {
  214. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  215. if err != nil {
  216. return
  217. }
  218. }
  219. }
  220. return
  221. }
  222. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  223. return fsw.ActualStore.BeginTransaction(ctx)
  224. }
  225. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  226. return fsw.ActualStore.CommitTransaction(ctx)
  227. }
  228. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  229. return fsw.ActualStore.RollbackTransaction(ctx)
  230. }
  231. func (fsw *FilerStoreWrapper) Shutdown() {
  232. fsw.ActualStore.Shutdown()
  233. }
  234. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  235. return fsw.ActualStore.KvPut(ctx, key, value)
  236. }
  237. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  238. return fsw.ActualStore.KvGet(ctx, key)
  239. }
  240. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  241. return fsw.ActualStore.KvDelete(ctx, key)
  242. }