You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

329 lines
11 KiB

7 years ago
4 years ago
7 years ago
7 years ago
5 years ago
4 years ago
6 years ago
6 years ago
4 years ago
6 years ago
6 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/viant/ptrie"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  15. ErrKvNotImplemented = errors.New("kv not implemented yet")
  16. ErrKvNotFound = errors.New("kv: not found")
  17. _ = VirtualFilerStore(&FilerStoreWrapper{})
  18. )
  19. type FilerStore interface {
  20. // GetName gets the name to locate the configuration in filer.toml file
  21. GetName() string
  22. // Initialize initializes the file store
  23. Initialize(configuration util.Configuration, prefix string) error
  24. InsertEntry(context.Context, *Entry) error
  25. UpdateEntry(context.Context, *Entry) (err error)
  26. // err == filer_pb.ErrNotFound if not found
  27. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  28. DeleteEntry(context.Context, util.FullPath) (err error)
  29. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  30. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  31. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  32. BeginTransaction(ctx context.Context) (context.Context, error)
  33. CommitTransaction(ctx context.Context) error
  34. RollbackTransaction(ctx context.Context) error
  35. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  36. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  37. KvDelete(ctx context.Context, key []byte) (err error)
  38. Shutdown()
  39. }
  40. type VirtualFilerStore interface {
  41. FilerStore
  42. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  43. DeleteOneEntry(ctx context.Context, entry *Entry) error
  44. AddPathSpecificStore(path string, storeId string, store FilerStore)
  45. }
  46. type FilerStoreWrapper struct {
  47. defaultStore FilerStore
  48. pathToStore ptrie.Trie
  49. storeIdToStore map[string]FilerStore
  50. }
  51. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  52. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  53. return innerStore
  54. }
  55. return &FilerStoreWrapper{
  56. defaultStore: store,
  57. pathToStore: ptrie.New(),
  58. storeIdToStore: make(map[string]FilerStore),
  59. }
  60. }
  61. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  62. fsw.storeIdToStore[storeId] = store
  63. err := fsw.pathToStore.Put([]byte(path), storeId)
  64. if err != nil {
  65. glog.Fatalf("put path specific store: %v", err)
  66. }
  67. }
  68. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  69. store = fsw.defaultStore
  70. if path == "/" {
  71. return
  72. }
  73. var storeId string
  74. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  75. storeId = value.(string)
  76. return false
  77. })
  78. if storeId != "" {
  79. store = fsw.storeIdToStore[storeId]
  80. }
  81. return
  82. }
  83. func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
  84. return fsw.defaultStore
  85. }
  86. func (fsw *FilerStoreWrapper) GetName() string {
  87. return fsw.getDefaultStore().GetName()
  88. }
  89. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  90. return fsw.getDefaultStore().Initialize(configuration, prefix)
  91. }
  92. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  93. actualStore := fsw.getActualStore(entry.FullPath)
  94. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  95. start := time.Now()
  96. defer func() {
  97. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  98. }()
  99. filer_pb.BeforeEntrySerialization(entry.Chunks)
  100. if entry.Mime == "application/octet-stream" {
  101. entry.Mime = ""
  102. }
  103. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  104. return err
  105. }
  106. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  107. return actualStore.InsertEntry(ctx, entry)
  108. }
  109. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  110. actualStore := fsw.getActualStore(entry.FullPath)
  111. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  112. start := time.Now()
  113. defer func() {
  114. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  115. }()
  116. filer_pb.BeforeEntrySerialization(entry.Chunks)
  117. if entry.Mime == "application/octet-stream" {
  118. entry.Mime = ""
  119. }
  120. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  121. return err
  122. }
  123. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  124. return actualStore.UpdateEntry(ctx, entry)
  125. }
  126. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  127. actualStore := fsw.getActualStore(fp)
  128. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  129. start := time.Now()
  130. defer func() {
  131. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  132. }()
  133. glog.V(4).Infof("FindEntry %s", fp)
  134. entry, err = actualStore.FindEntry(ctx, fp)
  135. if err != nil {
  136. return nil, err
  137. }
  138. fsw.maybeReadHardLink(ctx, entry)
  139. filer_pb.AfterEntryDeserialization(entry.Chunks)
  140. return
  141. }
  142. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  143. actualStore := fsw.getActualStore(fp)
  144. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  145. start := time.Now()
  146. defer func() {
  147. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  148. }()
  149. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  150. if findErr == filer_pb.ErrNotFound {
  151. return nil
  152. }
  153. if len(existingEntry.HardLinkId) != 0 {
  154. // remove hard link
  155. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  156. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  157. return err
  158. }
  159. }
  160. glog.V(4).Infof("DeleteEntry %s", fp)
  161. return actualStore.DeleteEntry(ctx, fp)
  162. }
  163. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  164. actualStore := fsw.getActualStore(existingEntry.FullPath)
  165. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  166. start := time.Now()
  167. defer func() {
  168. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  169. }()
  170. if len(existingEntry.HardLinkId) != 0 {
  171. // remove hard link
  172. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  173. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  174. return err
  175. }
  176. }
  177. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  178. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  179. }
  180. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  181. actualStore := fsw.getActualStore(fp)
  182. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  183. start := time.Now()
  184. defer func() {
  185. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  186. }()
  187. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  188. return actualStore.DeleteFolderChildren(ctx, fp)
  189. }
  190. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  191. actualStore := fsw.getActualStore(dirPath)
  192. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  193. start := time.Now()
  194. defer func() {
  195. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  196. }()
  197. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  198. entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  199. if err != nil {
  200. return nil, err
  201. }
  202. for _, entry := range entries {
  203. fsw.maybeReadHardLink(ctx, entry)
  204. filer_pb.AfterEntryDeserialization(entry.Chunks)
  205. }
  206. return entries, err
  207. }
  208. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  209. actualStore := fsw.getActualStore(dirPath)
  210. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  211. start := time.Now()
  212. defer func() {
  213. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  214. }()
  215. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  216. entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  217. if err == ErrUnsupportedListDirectoryPrefixed {
  218. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  219. }
  220. if err != nil {
  221. return nil, err
  222. }
  223. for _, entry := range entries {
  224. fsw.maybeReadHardLink(ctx, entry)
  225. filer_pb.AfterEntryDeserialization(entry.Chunks)
  226. }
  227. return entries, nil
  228. }
  229. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  230. actualStore := fsw.getActualStore(dirPath)
  231. entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  232. if err != nil {
  233. return nil, err
  234. }
  235. if prefix == "" {
  236. return
  237. }
  238. count := 0
  239. var lastFileName string
  240. notPrefixed := entries
  241. entries = nil
  242. for count < limit && len(notPrefixed) > 0 {
  243. for _, entry := range notPrefixed {
  244. lastFileName = entry.Name()
  245. if strings.HasPrefix(entry.Name(), prefix) {
  246. count++
  247. entries = append(entries, entry)
  248. if count >= limit {
  249. break
  250. }
  251. }
  252. }
  253. if count < limit {
  254. notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  255. if err != nil {
  256. return
  257. }
  258. }
  259. }
  260. return
  261. }
  262. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  263. return fsw.getDefaultStore().BeginTransaction(ctx)
  264. }
  265. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  266. return fsw.getDefaultStore().CommitTransaction(ctx)
  267. }
  268. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  269. return fsw.getDefaultStore().RollbackTransaction(ctx)
  270. }
  271. func (fsw *FilerStoreWrapper) Shutdown() {
  272. fsw.getDefaultStore().Shutdown()
  273. }
  274. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  275. return fsw.getDefaultStore().KvPut(ctx, key, value)
  276. }
  277. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  278. return fsw.getDefaultStore().KvGet(ctx, key)
  279. }
  280. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  281. return fsw.getDefaultStore().KvDelete(ctx, key)
  282. }