You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

334 lines
11 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/viant/ptrie"
  6. "math"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. _ = VirtualFilerStore(&FilerStoreWrapper{})
  15. )
  16. type VirtualFilerStore interface {
  17. FilerStore
  18. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  19. DeleteOneEntry(ctx context.Context, entry *Entry) error
  20. AddPathSpecificStore(path string, storeId string, store FilerStore)
  21. OnBucketCreation(bucket string)
  22. OnBucketDeletion(bucket string)
  23. CanDropWholeBucket() bool
  24. }
  25. type FilerStoreWrapper struct {
  26. defaultStore FilerStore
  27. pathToStore ptrie.Trie
  28. storeIdToStore map[string]FilerStore
  29. }
  30. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  31. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  32. return innerStore
  33. }
  34. return &FilerStoreWrapper{
  35. defaultStore: store,
  36. pathToStore: ptrie.New(),
  37. storeIdToStore: make(map[string]FilerStore),
  38. }
  39. }
  40. func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool {
  41. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  42. return ba.CanDropWholeBucket()
  43. }
  44. return false
  45. }
  46. func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
  47. for _, store := range fsw.storeIdToStore {
  48. if ba, ok := store.(BucketAware); ok {
  49. ba.OnBucketCreation(bucket)
  50. }
  51. }
  52. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  53. ba.OnBucketCreation(bucket)
  54. }
  55. }
  56. func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
  57. for _, store := range fsw.storeIdToStore {
  58. if ba, ok := store.(BucketAware); ok {
  59. ba.OnBucketDeletion(bucket)
  60. }
  61. }
  62. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  63. ba.OnBucketDeletion(bucket)
  64. }
  65. }
  66. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  67. fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
  68. err := fsw.pathToStore.Put([]byte(path), storeId)
  69. if err != nil {
  70. glog.Fatalf("put path specific store: %v", err)
  71. }
  72. }
  73. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  74. store = fsw.defaultStore
  75. if path == "/" {
  76. return
  77. }
  78. var storeId string
  79. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  80. storeId = value.(string)
  81. return false
  82. })
  83. if storeId != "" {
  84. store = fsw.storeIdToStore[storeId]
  85. }
  86. return
  87. }
  88. func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
  89. return fsw.defaultStore
  90. }
  91. func (fsw *FilerStoreWrapper) GetName() string {
  92. return fsw.getDefaultStore().GetName()
  93. }
  94. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  95. return fsw.getDefaultStore().Initialize(configuration, prefix)
  96. }
  97. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  98. actualStore := fsw.getActualStore(entry.FullPath)
  99. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  100. start := time.Now()
  101. defer func() {
  102. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  103. }()
  104. filer_pb.BeforeEntrySerialization(entry.Chunks)
  105. if entry.Mime == "application/octet-stream" {
  106. entry.Mime = ""
  107. }
  108. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  109. return err
  110. }
  111. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  112. return actualStore.InsertEntry(ctx, entry)
  113. }
  114. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  115. actualStore := fsw.getActualStore(entry.FullPath)
  116. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  117. start := time.Now()
  118. defer func() {
  119. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  120. }()
  121. filer_pb.BeforeEntrySerialization(entry.Chunks)
  122. if entry.Mime == "application/octet-stream" {
  123. entry.Mime = ""
  124. }
  125. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  126. return err
  127. }
  128. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  129. return actualStore.UpdateEntry(ctx, entry)
  130. }
  131. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  132. actualStore := fsw.getActualStore(fp)
  133. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  134. start := time.Now()
  135. defer func() {
  136. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  137. }()
  138. entry, err = actualStore.FindEntry(ctx, fp)
  139. // glog.V(4).Infof("FindEntry %s: %v", fp, err)
  140. if err != nil {
  141. return nil, err
  142. }
  143. fsw.maybeReadHardLink(ctx, entry)
  144. filer_pb.AfterEntryDeserialization(entry.Chunks)
  145. return
  146. }
  147. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  148. actualStore := fsw.getActualStore(fp)
  149. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  150. start := time.Now()
  151. defer func() {
  152. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  153. }()
  154. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  155. if findErr == filer_pb.ErrNotFound {
  156. return nil
  157. }
  158. if len(existingEntry.HardLinkId) != 0 {
  159. // remove hard link
  160. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  161. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  162. return err
  163. }
  164. }
  165. glog.V(4).Infof("DeleteEntry %s", fp)
  166. return actualStore.DeleteEntry(ctx, fp)
  167. }
  168. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  169. actualStore := fsw.getActualStore(existingEntry.FullPath)
  170. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  171. start := time.Now()
  172. defer func() {
  173. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  174. }()
  175. if len(existingEntry.HardLinkId) != 0 {
  176. // remove hard link
  177. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  178. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  179. return err
  180. }
  181. }
  182. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  183. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  184. }
  185. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  186. actualStore := fsw.getActualStore(fp + "/")
  187. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  188. start := time.Now()
  189. defer func() {
  190. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  191. }()
  192. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  193. return actualStore.DeleteFolderChildren(ctx, fp)
  194. }
  195. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
  196. actualStore := fsw.getActualStore(dirPath + "/")
  197. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  198. start := time.Now()
  199. defer func() {
  200. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  201. }()
  202. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  203. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  204. fsw.maybeReadHardLink(ctx, entry)
  205. filer_pb.AfterEntryDeserialization(entry.Chunks)
  206. return eachEntryFunc(entry)
  207. })
  208. }
  209. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  210. actualStore := fsw.getActualStore(dirPath + "/")
  211. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  212. start := time.Now()
  213. defer func() {
  214. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  215. }()
  216. if limit > math.MaxInt32-1 {
  217. limit = math.MaxInt32 - 1
  218. }
  219. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  220. lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
  221. if err == ErrUnsupportedListDirectoryPrefixed {
  222. lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
  223. fsw.maybeReadHardLink(ctx, entry)
  224. filer_pb.AfterEntryDeserialization(entry.Chunks)
  225. return eachEntryFunc(entry)
  226. })
  227. }
  228. return lastFileName, err
  229. }
  230. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  231. actualStore := fsw.getActualStore(dirPath + "/")
  232. if prefix == "" {
  233. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  234. }
  235. var notPrefixed []*Entry
  236. lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  237. notPrefixed = append(notPrefixed, entry)
  238. return true
  239. })
  240. if err != nil {
  241. return
  242. }
  243. count := int64(0)
  244. for count < limit && len(notPrefixed) > 0 {
  245. for _, entry := range notPrefixed {
  246. if strings.HasPrefix(entry.Name(), prefix) {
  247. count++
  248. if !eachEntryFunc(entry) {
  249. return
  250. }
  251. if count >= limit {
  252. break
  253. }
  254. }
  255. }
  256. if count < limit {
  257. notPrefixed = notPrefixed[:0]
  258. _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
  259. notPrefixed = append(notPrefixed, entry)
  260. return true
  261. })
  262. if err != nil {
  263. return
  264. }
  265. }
  266. }
  267. return
  268. }
  269. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  270. return fsw.getDefaultStore().BeginTransaction(ctx)
  271. }
  272. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  273. return fsw.getDefaultStore().CommitTransaction(ctx)
  274. }
  275. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  276. return fsw.getDefaultStore().RollbackTransaction(ctx)
  277. }
  278. func (fsw *FilerStoreWrapper) Shutdown() {
  279. fsw.getDefaultStore().Shutdown()
  280. }
  281. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  282. return fsw.getDefaultStore().KvPut(ctx, key, value)
  283. }
  284. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  285. return fsw.getDefaultStore().KvGet(ctx, key)
  286. }
  287. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  288. return fsw.getDefaultStore().KvDelete(ctx, key)
  289. }