You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

290 lines
9.9 KiB

7 years ago
4 years ago
7 years ago
7 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
4 years ago
4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
4 years ago
5 years ago
4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
4 years ago
6 years ago
5 years ago
4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. var (
  13. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  14. ErrKvNotImplemented = errors.New("kv not implemented yet")
  15. ErrKvNotFound = errors.New("kv: not found")
  16. _ = VirtualFilerStore(&FilerStoreWrapper{})
  17. )
  18. type FilerStore interface {
  19. // GetName gets the name to locate the configuration in filer.toml file
  20. GetName() string
  21. // Initialize initializes the file store
  22. Initialize(configuration util.Configuration, prefix string) error
  23. InsertEntry(context.Context, *Entry) error
  24. UpdateEntry(context.Context, *Entry) (err error)
  25. // err == filer_pb.ErrNotFound if not found
  26. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  27. DeleteEntry(context.Context, util.FullPath) (err error)
  28. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  29. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  30. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  31. BeginTransaction(ctx context.Context) (context.Context, error)
  32. CommitTransaction(ctx context.Context) error
  33. RollbackTransaction(ctx context.Context) error
  34. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  35. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  36. KvDelete(ctx context.Context, key []byte) (err error)
  37. Shutdown()
  38. }
  39. type VirtualFilerStore interface {
  40. FilerStore
  41. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  42. DeleteOneEntry(ctx context.Context, entry *Entry) error
  43. }
  44. type FilerStoreWrapper struct {
  45. ActualStore FilerStore
  46. }
  47. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  48. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  49. return innerStore
  50. }
  51. return &FilerStoreWrapper{
  52. ActualStore: store,
  53. }
  54. }
  55. func (fsw *FilerStoreWrapper) getActualStore(path string) FilerStore {
  56. return fsw.ActualStore
  57. }
  58. func (fsw *FilerStoreWrapper) GetName() string {
  59. return fsw.getActualStore("").GetName()
  60. }
  61. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  62. return fsw.getActualStore("").Initialize(configuration, prefix)
  63. }
  64. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  65. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "insert").Inc()
  66. start := time.Now()
  67. defer func() {
  68. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "insert").Observe(time.Since(start).Seconds())
  69. }()
  70. filer_pb.BeforeEntrySerialization(entry.Chunks)
  71. if entry.Mime == "application/octet-stream" {
  72. entry.Mime = ""
  73. }
  74. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  75. return err
  76. }
  77. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  78. return fsw.getActualStore("").InsertEntry(ctx, entry)
  79. }
  80. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  81. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "update").Inc()
  82. start := time.Now()
  83. defer func() {
  84. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "update").Observe(time.Since(start).Seconds())
  85. }()
  86. filer_pb.BeforeEntrySerialization(entry.Chunks)
  87. if entry.Mime == "application/octet-stream" {
  88. entry.Mime = ""
  89. }
  90. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  91. return err
  92. }
  93. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  94. return fsw.getActualStore("").UpdateEntry(ctx, entry)
  95. }
  96. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  97. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "find").Inc()
  98. start := time.Now()
  99. defer func() {
  100. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "find").Observe(time.Since(start).Seconds())
  101. }()
  102. glog.V(4).Infof("FindEntry %s", fp)
  103. entry, err = fsw.getActualStore("").FindEntry(ctx, fp)
  104. if err != nil {
  105. return nil, err
  106. }
  107. fsw.maybeReadHardLink(ctx, entry)
  108. filer_pb.AfterEntryDeserialization(entry.Chunks)
  109. return
  110. }
  111. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  112. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "delete").Inc()
  113. start := time.Now()
  114. defer func() {
  115. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "delete").Observe(time.Since(start).Seconds())
  116. }()
  117. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  118. if findErr == filer_pb.ErrNotFound {
  119. return nil
  120. }
  121. if len(existingEntry.HardLinkId) != 0 {
  122. // remove hard link
  123. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  124. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  125. return err
  126. }
  127. }
  128. glog.V(4).Infof("DeleteEntry %s", fp)
  129. return fsw.getActualStore("").DeleteEntry(ctx, fp)
  130. }
  131. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  132. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "delete").Inc()
  133. start := time.Now()
  134. defer func() {
  135. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "delete").Observe(time.Since(start).Seconds())
  136. }()
  137. if len(existingEntry.HardLinkId) != 0 {
  138. // remove hard link
  139. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  140. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  141. return err
  142. }
  143. }
  144. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  145. return fsw.getActualStore("").DeleteEntry(ctx, existingEntry.FullPath)
  146. }
  147. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  148. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "deleteFolderChildren").Inc()
  149. start := time.Now()
  150. defer func() {
  151. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  152. }()
  153. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  154. return fsw.getActualStore("").DeleteFolderChildren(ctx, fp)
  155. }
  156. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  157. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "list").Inc()
  158. start := time.Now()
  159. defer func() {
  160. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "list").Observe(time.Since(start).Seconds())
  161. }()
  162. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  163. entries, err := fsw.getActualStore("").ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  164. if err != nil {
  165. return nil, err
  166. }
  167. for _, entry := range entries {
  168. fsw.maybeReadHardLink(ctx, entry)
  169. filer_pb.AfterEntryDeserialization(entry.Chunks)
  170. }
  171. return entries, err
  172. }
  173. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  174. stats.FilerStoreCounter.WithLabelValues(fsw.getActualStore("").GetName(), "prefixList").Inc()
  175. start := time.Now()
  176. defer func() {
  177. stats.FilerStoreHistogram.WithLabelValues(fsw.getActualStore("").GetName(), "prefixList").Observe(time.Since(start).Seconds())
  178. }()
  179. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  180. entries, err := fsw.getActualStore("").ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  181. if err == ErrUnsupportedListDirectoryPrefixed {
  182. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  183. }
  184. if err != nil {
  185. return nil, err
  186. }
  187. for _, entry := range entries {
  188. fsw.maybeReadHardLink(ctx, entry)
  189. filer_pb.AfterEntryDeserialization(entry.Chunks)
  190. }
  191. return entries, nil
  192. }
  193. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  194. entries, err = fsw.getActualStore("").ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  195. if err != nil {
  196. return nil, err
  197. }
  198. if prefix == "" {
  199. return
  200. }
  201. count := 0
  202. var lastFileName string
  203. notPrefixed := entries
  204. entries = nil
  205. for count < limit && len(notPrefixed) > 0 {
  206. for _, entry := range notPrefixed {
  207. lastFileName = entry.Name()
  208. if strings.HasPrefix(entry.Name(), prefix) {
  209. count++
  210. entries = append(entries, entry)
  211. if count >= limit {
  212. break
  213. }
  214. }
  215. }
  216. if count < limit {
  217. notPrefixed, err = fsw.getActualStore("").ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  218. if err != nil {
  219. return
  220. }
  221. }
  222. }
  223. return
  224. }
  225. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  226. return fsw.getActualStore("").BeginTransaction(ctx)
  227. }
  228. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  229. return fsw.getActualStore("").CommitTransaction(ctx)
  230. }
  231. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  232. return fsw.getActualStore("").RollbackTransaction(ctx)
  233. }
  234. func (fsw *FilerStoreWrapper) Shutdown() {
  235. fsw.getActualStore("").Shutdown()
  236. }
  237. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  238. return fsw.getActualStore("").KvPut(ctx, key, value)
  239. }
  240. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  241. return fsw.getActualStore("").KvGet(ctx, key)
  242. }
  243. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  244. return fsw.getActualStore("").KvDelete(ctx, key)
  245. }