You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
11 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/viant/ptrie"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. var (
  13. _ = VirtualFilerStore(&FilerStoreWrapper{})
  14. )
  15. type VirtualFilerStore interface {
  16. FilerStore
  17. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  18. DeleteOneEntry(ctx context.Context, entry *Entry) error
  19. AddPathSpecificStore(path string, storeId string, store FilerStore)
  20. OnBucketCreation(bucket string)
  21. OnBucketDeletion(bucket string)
  22. CanDropWholeBucket() bool
  23. }
  24. type FilerStoreWrapper struct {
  25. defaultStore FilerStore
  26. pathToStore ptrie.Trie
  27. storeIdToStore map[string]FilerStore
  28. }
  29. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  30. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  31. return innerStore
  32. }
  33. return &FilerStoreWrapper{
  34. defaultStore: store,
  35. pathToStore: ptrie.New(),
  36. storeIdToStore: make(map[string]FilerStore),
  37. }
  38. }
  39. func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool {
  40. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  41. return ba.CanDropWholeBucket()
  42. }
  43. return false
  44. }
  45. func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
  46. for _, store := range fsw.storeIdToStore {
  47. if ba, ok := store.(BucketAware); ok {
  48. ba.OnBucketCreation(bucket)
  49. }
  50. }
  51. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  52. ba.OnBucketCreation(bucket)
  53. }
  54. }
  55. func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
  56. for _, store := range fsw.storeIdToStore {
  57. if ba, ok := store.(BucketAware); ok {
  58. ba.OnBucketDeletion(bucket)
  59. }
  60. }
  61. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  62. ba.OnBucketDeletion(bucket)
  63. }
  64. }
  65. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  66. fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
  67. err := fsw.pathToStore.Put([]byte(path), storeId)
  68. if err != nil {
  69. glog.Fatalf("put path specific store: %v", err)
  70. }
  71. }
  72. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  73. store = fsw.defaultStore
  74. if path == "/" {
  75. return
  76. }
  77. var storeId string
  78. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  79. storeId = value.(string)
  80. return false
  81. })
  82. if storeId != "" {
  83. store = fsw.storeIdToStore[storeId]
  84. }
  85. return
  86. }
  87. func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
  88. return fsw.defaultStore
  89. }
  90. func (fsw *FilerStoreWrapper) GetName() string {
  91. return fsw.getDefaultStore().GetName()
  92. }
  93. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  94. return fsw.getDefaultStore().Initialize(configuration, prefix)
  95. }
  96. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  97. actualStore := fsw.getActualStore(entry.FullPath)
  98. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  99. start := time.Now()
  100. defer func() {
  101. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  102. }()
  103. filer_pb.BeforeEntrySerialization(entry.Chunks)
  104. if entry.Mime == "application/octet-stream" {
  105. entry.Mime = ""
  106. }
  107. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  108. return err
  109. }
  110. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  111. return actualStore.InsertEntry(ctx, entry)
  112. }
  113. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  114. actualStore := fsw.getActualStore(entry.FullPath)
  115. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  116. start := time.Now()
  117. defer func() {
  118. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  119. }()
  120. filer_pb.BeforeEntrySerialization(entry.Chunks)
  121. if entry.Mime == "application/octet-stream" {
  122. entry.Mime = ""
  123. }
  124. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  125. return err
  126. }
  127. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  128. return actualStore.UpdateEntry(ctx, entry)
  129. }
  130. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  131. actualStore := fsw.getActualStore(fp)
  132. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  133. start := time.Now()
  134. defer func() {
  135. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  136. }()
  137. entry, err = actualStore.FindEntry(ctx, fp)
  138. // glog.V(4).Infof("FindEntry %s: %v", fp, err)
  139. if err != nil {
  140. return nil, err
  141. }
  142. fsw.maybeReadHardLink(ctx, entry)
  143. filer_pb.AfterEntryDeserialization(entry.Chunks)
  144. return
  145. }
  146. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  147. actualStore := fsw.getActualStore(fp)
  148. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  149. start := time.Now()
  150. defer func() {
  151. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  152. }()
  153. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  154. if findErr == filer_pb.ErrNotFound {
  155. return nil
  156. }
  157. if len(existingEntry.HardLinkId) != 0 {
  158. // remove hard link
  159. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  160. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  161. return err
  162. }
  163. }
  164. glog.V(4).Infof("DeleteEntry %s", fp)
  165. return actualStore.DeleteEntry(ctx, fp)
  166. }
  167. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  168. actualStore := fsw.getActualStore(existingEntry.FullPath)
  169. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  170. start := time.Now()
  171. defer func() {
  172. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  173. }()
  174. if len(existingEntry.HardLinkId) != 0 {
  175. // remove hard link
  176. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  177. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  178. return err
  179. }
  180. }
  181. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  182. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  183. }
  184. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath, limit int64) (err error) {
  185. actualStore := fsw.getActualStore(fp + "/")
  186. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  187. start := time.Now()
  188. defer func() {
  189. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  190. }()
  191. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  192. return actualStore.DeleteFolderChildren(ctx, fp, limit)
  193. }
  194. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
  195. actualStore := fsw.getActualStore(dirPath + "/")
  196. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  197. start := time.Now()
  198. defer func() {
  199. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  200. }()
  201. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  202. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  203. fsw.maybeReadHardLink(ctx, entry)
  204. filer_pb.AfterEntryDeserialization(entry.Chunks)
  205. return eachEntryFunc(entry)
  206. })
  207. }
  208. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  209. actualStore := fsw.getActualStore(dirPath + "/")
  210. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  211. start := time.Now()
  212. defer func() {
  213. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  214. }()
  215. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  216. lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
  217. if err == ErrUnsupportedListDirectoryPrefixed {
  218. lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
  219. fsw.maybeReadHardLink(ctx, entry)
  220. filer_pb.AfterEntryDeserialization(entry.Chunks)
  221. return eachEntryFunc(entry)
  222. })
  223. }
  224. return lastFileName, err
  225. }
  226. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  227. actualStore := fsw.getActualStore(dirPath + "/")
  228. if prefix == "" {
  229. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  230. }
  231. var notPrefixed []*Entry
  232. lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  233. notPrefixed = append(notPrefixed, entry)
  234. return true
  235. })
  236. if err != nil {
  237. return
  238. }
  239. count := int64(0)
  240. for count < limit && len(notPrefixed) > 0 {
  241. for _, entry := range notPrefixed {
  242. if strings.HasPrefix(entry.Name(), prefix) {
  243. count++
  244. if !eachEntryFunc(entry) {
  245. return
  246. }
  247. if count >= limit {
  248. break
  249. }
  250. }
  251. }
  252. if count < limit {
  253. notPrefixed = notPrefixed[:0]
  254. _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
  255. notPrefixed = append(notPrefixed, entry)
  256. return true
  257. })
  258. if err != nil {
  259. return
  260. }
  261. }
  262. }
  263. return
  264. }
  265. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  266. return fsw.getDefaultStore().BeginTransaction(ctx)
  267. }
  268. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  269. return fsw.getDefaultStore().CommitTransaction(ctx)
  270. }
  271. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  272. return fsw.getDefaultStore().RollbackTransaction(ctx)
  273. }
  274. func (fsw *FilerStoreWrapper) Shutdown() {
  275. fsw.getDefaultStore().Shutdown()
  276. }
  277. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  278. return fsw.getDefaultStore().KvPut(ctx, key, value)
  279. }
  280. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  281. return fsw.getDefaultStore().KvGet(ctx, key)
  282. }
  283. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  284. return fsw.getDefaultStore().KvDelete(ctx, key)
  285. }