You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
11 KiB

7 years ago
4 years ago
7 years ago
7 years ago
5 years ago
4 years ago
4 years ago
4 years ago
6 years ago
6 years ago
4 years ago
6 years ago
6 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
6 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/viant/ptrie"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  15. ErrKvNotImplemented = errors.New("kv not implemented yet")
  16. ErrKvNotFound = errors.New("kv: not found")
  17. _ = VirtualFilerStore(&FilerStoreWrapper{})
  18. )
  19. type FilerStore interface {
  20. // GetName gets the name to locate the configuration in filer.toml file
  21. GetName() string
  22. // Initialize initializes the file store
  23. Initialize(configuration util.Configuration, prefix string) error
  24. InsertEntry(context.Context, *Entry) error
  25. UpdateEntry(context.Context, *Entry) (err error)
  26. // err == filer_pb.ErrNotFound if not found
  27. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  28. DeleteEntry(context.Context, util.FullPath) (err error)
  29. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  30. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  31. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  32. BeginTransaction(ctx context.Context) (context.Context, error)
  33. CommitTransaction(ctx context.Context) error
  34. RollbackTransaction(ctx context.Context) error
  35. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  36. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  37. KvDelete(ctx context.Context, key []byte) (err error)
  38. Shutdown()
  39. }
  40. type VirtualFilerStore interface {
  41. FilerStore
  42. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  43. DeleteOneEntry(ctx context.Context, entry *Entry) error
  44. AddPathSpecificStore(path string, storeId string, store FilerStore)
  45. }
  46. type FilerStoreWrapper struct {
  47. defaultStore FilerStore
  48. pathToStore ptrie.Trie
  49. storeIdToStore map[string]FilerStore
  50. }
  51. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  52. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  53. return innerStore
  54. }
  55. return &FilerStoreWrapper{
  56. defaultStore: store,
  57. pathToStore: ptrie.New(),
  58. storeIdToStore: make(map[string]FilerStore),
  59. }
  60. }
  61. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  62. fsw.storeIdToStore[storeId] = store
  63. err := fsw.pathToStore.Put([]byte(path), storeId)
  64. if err != nil {
  65. glog.Fatalf("put path specific store: %v", err)
  66. }
  67. }
  68. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  69. store = fsw.defaultStore
  70. if path == "" {
  71. return
  72. }
  73. var storeId string
  74. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  75. storeId = value.(string)
  76. return false
  77. })
  78. if storeId != "" {
  79. store = fsw.storeIdToStore[storeId]
  80. }
  81. return
  82. }
  83. func (fsw *FilerStoreWrapper) GetName() string {
  84. return fsw.getActualStore("").GetName()
  85. }
  86. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  87. return fsw.getActualStore("").Initialize(configuration, prefix)
  88. }
  89. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  90. actualStore := fsw.getActualStore(entry.FullPath)
  91. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  92. start := time.Now()
  93. defer func() {
  94. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  95. }()
  96. filer_pb.BeforeEntrySerialization(entry.Chunks)
  97. if entry.Mime == "application/octet-stream" {
  98. entry.Mime = ""
  99. }
  100. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  101. return err
  102. }
  103. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  104. return actualStore.InsertEntry(ctx, entry)
  105. }
  106. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  107. actualStore := fsw.getActualStore(entry.FullPath)
  108. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  109. start := time.Now()
  110. defer func() {
  111. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  112. }()
  113. filer_pb.BeforeEntrySerialization(entry.Chunks)
  114. if entry.Mime == "application/octet-stream" {
  115. entry.Mime = ""
  116. }
  117. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  118. return err
  119. }
  120. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  121. return actualStore.UpdateEntry(ctx, entry)
  122. }
  123. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  124. actualStore := fsw.getActualStore(fp)
  125. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  126. start := time.Now()
  127. defer func() {
  128. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  129. }()
  130. glog.V(4).Infof("FindEntry %s", fp)
  131. entry, err = actualStore.FindEntry(ctx, fp)
  132. if err != nil {
  133. return nil, err
  134. }
  135. fsw.maybeReadHardLink(ctx, entry)
  136. filer_pb.AfterEntryDeserialization(entry.Chunks)
  137. return
  138. }
  139. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  140. actualStore := fsw.getActualStore(fp)
  141. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  142. start := time.Now()
  143. defer func() {
  144. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  145. }()
  146. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  147. if findErr == filer_pb.ErrNotFound {
  148. return nil
  149. }
  150. if len(existingEntry.HardLinkId) != 0 {
  151. // remove hard link
  152. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  153. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  154. return err
  155. }
  156. }
  157. glog.V(4).Infof("DeleteEntry %s", fp)
  158. return actualStore.DeleteEntry(ctx, fp)
  159. }
  160. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  161. actualStore := fsw.getActualStore(existingEntry.FullPath)
  162. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  163. start := time.Now()
  164. defer func() {
  165. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  166. }()
  167. if len(existingEntry.HardLinkId) != 0 {
  168. // remove hard link
  169. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  170. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  171. return err
  172. }
  173. }
  174. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  175. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  176. }
  177. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  178. actualStore := fsw.getActualStore(fp)
  179. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  180. start := time.Now()
  181. defer func() {
  182. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  183. }()
  184. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  185. return actualStore.DeleteFolderChildren(ctx, fp)
  186. }
  187. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  188. actualStore := fsw.getActualStore(dirPath)
  189. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  190. start := time.Now()
  191. defer func() {
  192. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  193. }()
  194. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  195. entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  196. if err != nil {
  197. return nil, err
  198. }
  199. for _, entry := range entries {
  200. fsw.maybeReadHardLink(ctx, entry)
  201. filer_pb.AfterEntryDeserialization(entry.Chunks)
  202. }
  203. return entries, err
  204. }
  205. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  206. actualStore := fsw.getActualStore(dirPath)
  207. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  208. start := time.Now()
  209. defer func() {
  210. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  211. }()
  212. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  213. entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  214. if err == ErrUnsupportedListDirectoryPrefixed {
  215. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  216. }
  217. if err != nil {
  218. return nil, err
  219. }
  220. for _, entry := range entries {
  221. fsw.maybeReadHardLink(ctx, entry)
  222. filer_pb.AfterEntryDeserialization(entry.Chunks)
  223. }
  224. return entries, nil
  225. }
  226. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  227. actualStore := fsw.getActualStore(dirPath)
  228. entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  229. if err != nil {
  230. return nil, err
  231. }
  232. if prefix == "" {
  233. return
  234. }
  235. count := 0
  236. var lastFileName string
  237. notPrefixed := entries
  238. entries = nil
  239. for count < limit && len(notPrefixed) > 0 {
  240. for _, entry := range notPrefixed {
  241. lastFileName = entry.Name()
  242. if strings.HasPrefix(entry.Name(), prefix) {
  243. count++
  244. entries = append(entries, entry)
  245. if count >= limit {
  246. break
  247. }
  248. }
  249. }
  250. if count < limit {
  251. notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  252. if err != nil {
  253. return
  254. }
  255. }
  256. }
  257. return
  258. }
  259. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  260. return fsw.getActualStore("").BeginTransaction(ctx)
  261. }
  262. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  263. return fsw.getActualStore("").CommitTransaction(ctx)
  264. }
  265. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  266. return fsw.getActualStore("").RollbackTransaction(ctx)
  267. }
  268. func (fsw *FilerStoreWrapper) Shutdown() {
  269. fsw.getActualStore("").Shutdown()
  270. }
  271. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  272. return fsw.getActualStore("").KvPut(ctx, key, value)
  273. }
  274. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  275. return fsw.getActualStore("").KvGet(ctx, key)
  276. }
  277. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  278. return fsw.getActualStore("").KvDelete(ctx, key)
  279. }