You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
9.5 KiB

7 years ago
4 years ago
7 years ago
7 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
6 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. var (
  13. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  14. ErrKvNotImplemented = errors.New("kv not implemented yet")
  15. ErrKvNotFound = errors.New("kv: not found")
  16. )
  17. type FilerStore interface {
  18. // GetName gets the name to locate the configuration in filer.toml file
  19. GetName() string
  20. // Initialize initializes the file store
  21. Initialize(configuration util.Configuration, prefix string) error
  22. InsertEntry(context.Context, *Entry) error
  23. UpdateEntry(context.Context, *Entry) (err error)
  24. // err == filer_pb.ErrNotFound if not found
  25. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  26. DeleteEntry(context.Context, util.FullPath) (err error)
  27. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  28. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  29. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  30. BeginTransaction(ctx context.Context) (context.Context, error)
  31. CommitTransaction(ctx context.Context) error
  32. RollbackTransaction(ctx context.Context) error
  33. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  34. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  35. KvDelete(ctx context.Context, key []byte) (err error)
  36. Shutdown()
  37. }
  38. type VirtualFilerStore interface {
  39. FilerStore
  40. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  41. DeleteOneEntry(ctx context.Context, entry *Entry) error
  42. }
  43. type FilerStoreWrapper struct {
  44. ActualStore FilerStore
  45. }
  46. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  47. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  48. return innerStore
  49. }
  50. return &FilerStoreWrapper{
  51. ActualStore: store,
  52. }
  53. }
  54. func (fsw *FilerStoreWrapper) GetName() string {
  55. return fsw.ActualStore.GetName()
  56. }
  57. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  58. return fsw.ActualStore.Initialize(configuration, prefix)
  59. }
  60. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  61. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  62. start := time.Now()
  63. defer func() {
  64. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  65. }()
  66. filer_pb.BeforeEntrySerialization(entry.Chunks)
  67. if entry.Mime == "application/octet-stream" {
  68. entry.Mime = ""
  69. }
  70. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  71. return err
  72. }
  73. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  74. return fsw.ActualStore.InsertEntry(ctx, entry)
  75. }
  76. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  77. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  78. start := time.Now()
  79. defer func() {
  80. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  81. }()
  82. filer_pb.BeforeEntrySerialization(entry.Chunks)
  83. if entry.Mime == "application/octet-stream" {
  84. entry.Mime = ""
  85. }
  86. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  87. return err
  88. }
  89. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  90. return fsw.ActualStore.UpdateEntry(ctx, entry)
  91. }
  92. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  93. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  94. start := time.Now()
  95. defer func() {
  96. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  97. }()
  98. glog.V(4).Infof("FindEntry %s", fp)
  99. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  100. if err != nil {
  101. return nil, err
  102. }
  103. fsw.maybeReadHardLink(ctx, entry)
  104. filer_pb.AfterEntryDeserialization(entry.Chunks)
  105. return
  106. }
  107. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  108. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  109. start := time.Now()
  110. defer func() {
  111. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  112. }()
  113. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  114. if findErr == filer_pb.ErrNotFound {
  115. return nil
  116. }
  117. if len(existingEntry.HardLinkId) != 0 {
  118. // remove hard link
  119. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  120. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  121. return err
  122. }
  123. }
  124. glog.V(4).Infof("DeleteEntry %s", fp)
  125. return fsw.ActualStore.DeleteEntry(ctx, fp)
  126. }
  127. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  128. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  129. start := time.Now()
  130. defer func() {
  131. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  132. }()
  133. if len(existingEntry.HardLinkId) != 0 {
  134. // remove hard link
  135. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  136. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  137. return err
  138. }
  139. }
  140. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  141. return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
  142. }
  143. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  144. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  145. start := time.Now()
  146. defer func() {
  147. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  148. }()
  149. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  150. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  151. }
  152. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  153. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  154. start := time.Now()
  155. defer func() {
  156. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  157. }()
  158. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  159. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  160. if err != nil {
  161. return nil, err
  162. }
  163. for _, entry := range entries {
  164. fsw.maybeReadHardLink(ctx, entry)
  165. filer_pb.AfterEntryDeserialization(entry.Chunks)
  166. }
  167. return entries, err
  168. }
  169. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  170. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  171. start := time.Now()
  172. defer func() {
  173. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  174. }()
  175. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  176. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  177. if err == ErrUnsupportedListDirectoryPrefixed {
  178. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  179. }
  180. if err != nil {
  181. return nil, err
  182. }
  183. for _, entry := range entries {
  184. fsw.maybeReadHardLink(ctx, entry)
  185. filer_pb.AfterEntryDeserialization(entry.Chunks)
  186. }
  187. return entries, nil
  188. }
  189. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  190. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  191. if err != nil {
  192. return nil, err
  193. }
  194. if prefix == "" {
  195. return
  196. }
  197. count := 0
  198. var lastFileName string
  199. notPrefixed := entries
  200. entries = nil
  201. for count < limit && len(notPrefixed) > 0 {
  202. for _, entry := range notPrefixed {
  203. lastFileName = entry.Name()
  204. if strings.HasPrefix(entry.Name(), prefix) {
  205. count++
  206. entries = append(entries, entry)
  207. if count >= limit {
  208. break
  209. }
  210. }
  211. }
  212. if count < limit {
  213. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  214. if err != nil {
  215. return
  216. }
  217. }
  218. }
  219. return
  220. }
  221. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  222. return fsw.ActualStore.BeginTransaction(ctx)
  223. }
  224. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  225. return fsw.ActualStore.CommitTransaction(ctx)
  226. }
  227. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  228. return fsw.ActualStore.RollbackTransaction(ctx)
  229. }
  230. func (fsw *FilerStoreWrapper) Shutdown() {
  231. fsw.ActualStore.Shutdown()
  232. }
  233. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  234. return fsw.ActualStore.KvPut(ctx, key, value)
  235. }
  236. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  237. return fsw.ActualStore.KvGet(ctx, key)
  238. }
  239. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  240. return fsw.ActualStore.KvDelete(ctx, key)
  241. }