362 lines
12 KiB

3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
  1. package filer
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/viant/ptrie"
  6. "io"
  7. "math"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. var (
  15. _ = VirtualFilerStore(&FilerStoreWrapper{})
  16. _ = Debuggable(&FilerStoreWrapper{})
  17. )
  18. type VirtualFilerStore interface {
  19. FilerStore
  20. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  21. DeleteOneEntry(ctx context.Context, entry *Entry) error
  22. DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error
  23. AddPathSpecificStore(path string, storeId string, store FilerStore)
  24. OnBucketCreation(bucket string)
  25. OnBucketDeletion(bucket string)
  26. CanDropWholeBucket() bool
  27. }
  28. type FilerStoreWrapper struct {
  29. defaultStore FilerStore
  30. pathToStore ptrie.Trie
  31. storeIdToStore map[string]FilerStore
  32. }
  33. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  34. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  35. return innerStore
  36. }
  37. return &FilerStoreWrapper{
  38. defaultStore: store,
  39. pathToStore: ptrie.New(),
  40. storeIdToStore: make(map[string]FilerStore),
  41. }
  42. }
  43. func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool {
  44. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  45. return ba.CanDropWholeBucket()
  46. }
  47. return false
  48. }
  49. func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
  50. for _, store := range fsw.storeIdToStore {
  51. if ba, ok := store.(BucketAware); ok {
  52. ba.OnBucketCreation(bucket)
  53. }
  54. }
  55. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  56. ba.OnBucketCreation(bucket)
  57. }
  58. }
  59. func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
  60. for _, store := range fsw.storeIdToStore {
  61. if ba, ok := store.(BucketAware); ok {
  62. ba.OnBucketDeletion(bucket)
  63. }
  64. }
  65. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  66. ba.OnBucketDeletion(bucket)
  67. }
  68. }
  69. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  70. fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
  71. err := fsw.pathToStore.Put([]byte(path), storeId)
  72. if err != nil {
  73. glog.Fatalf("put path specific store: %v", err)
  74. }
  75. }
  76. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  77. store = fsw.defaultStore
  78. if path == "/" {
  79. return
  80. }
  81. var storeId string
  82. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  83. storeId = value.(string)
  84. return false
  85. })
  86. if storeId != "" {
  87. store = fsw.storeIdToStore[storeId]
  88. }
  89. return
  90. }
  91. func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
  92. return fsw.defaultStore
  93. }
  94. func (fsw *FilerStoreWrapper) GetName() string {
  95. return fsw.getDefaultStore().GetName()
  96. }
  97. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  98. return fsw.getDefaultStore().Initialize(configuration, prefix)
  99. }
  100. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  101. actualStore := fsw.getActualStore(entry.FullPath)
  102. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  103. start := time.Now()
  104. defer func() {
  105. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  106. }()
  107. filer_pb.BeforeEntrySerialization(entry.Chunks)
  108. if entry.Mime == "application/octet-stream" {
  109. entry.Mime = ""
  110. }
  111. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  112. return err
  113. }
  114. // glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  115. return actualStore.InsertEntry(ctx, entry)
  116. }
  117. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  118. actualStore := fsw.getActualStore(entry.FullPath)
  119. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  120. start := time.Now()
  121. defer func() {
  122. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  123. }()
  124. filer_pb.BeforeEntrySerialization(entry.Chunks)
  125. if entry.Mime == "application/octet-stream" {
  126. entry.Mime = ""
  127. }
  128. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  129. return err
  130. }
  131. // glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  132. return actualStore.UpdateEntry(ctx, entry)
  133. }
  134. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  135. actualStore := fsw.getActualStore(fp)
  136. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  137. start := time.Now()
  138. defer func() {
  139. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  140. }()
  141. entry, err = actualStore.FindEntry(ctx, fp)
  142. // glog.V(4).Infof("FindEntry %s: %v", fp, err)
  143. if err != nil {
  144. return nil, err
  145. }
  146. fsw.maybeReadHardLink(ctx, entry)
  147. filer_pb.AfterEntryDeserialization(entry.Chunks)
  148. return
  149. }
  150. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  151. actualStore := fsw.getActualStore(fp)
  152. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  153. start := time.Now()
  154. defer func() {
  155. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  156. }()
  157. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  158. if findErr == filer_pb.ErrNotFound {
  159. return nil
  160. }
  161. if len(existingEntry.HardLinkId) != 0 {
  162. // remove hard link
  163. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  164. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  165. return err
  166. }
  167. }
  168. // glog.V(4).Infof("DeleteEntry %s", fp)
  169. return actualStore.DeleteEntry(ctx, fp)
  170. }
  171. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  172. actualStore := fsw.getActualStore(existingEntry.FullPath)
  173. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  174. start := time.Now()
  175. defer func() {
  176. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  177. }()
  178. if len(existingEntry.HardLinkId) != 0 {
  179. // remove hard link
  180. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  181. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  182. return err
  183. }
  184. }
  185. // glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  186. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  187. }
  188. func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) {
  189. actualStore := fsw.getActualStore(fullpath)
  190. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  191. start := time.Now()
  192. defer func() {
  193. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  194. }()
  195. glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath)
  196. return actualStore.DeleteEntry(ctx, fullpath)
  197. }
  198. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  199. actualStore := fsw.getActualStore(fp + "/")
  200. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  201. start := time.Now()
  202. defer func() {
  203. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  204. }()
  205. // glog.V(4).Infof("DeleteFolderChildren %s", fp)
  206. return actualStore.DeleteFolderChildren(ctx, fp)
  207. }
  208. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
  209. actualStore := fsw.getActualStore(dirPath + "/")
  210. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  211. start := time.Now()
  212. defer func() {
  213. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  214. }()
  215. // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  216. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  217. fsw.maybeReadHardLink(ctx, entry)
  218. filer_pb.AfterEntryDeserialization(entry.Chunks)
  219. return eachEntryFunc(entry)
  220. })
  221. }
  222. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  223. actualStore := fsw.getActualStore(dirPath + "/")
  224. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  225. start := time.Now()
  226. defer func() {
  227. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  228. }()
  229. if limit > math.MaxInt32-1 {
  230. limit = math.MaxInt32 - 1
  231. }
  232. // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  233. adjustedEntryFunc := func(entry *Entry) bool {
  234. fsw.maybeReadHardLink(ctx, entry)
  235. filer_pb.AfterEntryDeserialization(entry.Chunks)
  236. return eachEntryFunc(entry)
  237. }
  238. lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
  239. if err == ErrUnsupportedListDirectoryPrefixed {
  240. lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
  241. }
  242. return lastFileName, err
  243. }
  244. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  245. actualStore := fsw.getActualStore(dirPath + "/")
  246. if prefix == "" {
  247. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  248. }
  249. var notPrefixed []*Entry
  250. lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  251. notPrefixed = append(notPrefixed, entry)
  252. return true
  253. })
  254. if err != nil {
  255. return
  256. }
  257. count := int64(0)
  258. for count < limit && len(notPrefixed) > 0 {
  259. var isLastItemHasPrefix bool
  260. for _, entry := range notPrefixed {
  261. if strings.HasPrefix(entry.Name(), prefix) {
  262. isLastItemHasPrefix = true
  263. count++
  264. if !eachEntryFunc(entry) {
  265. return
  266. }
  267. if count >= limit {
  268. break
  269. }
  270. } else {
  271. isLastItemHasPrefix = false
  272. }
  273. }
  274. if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) {
  275. notPrefixed = notPrefixed[:0]
  276. lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
  277. notPrefixed = append(notPrefixed, entry)
  278. return true
  279. })
  280. if err != nil {
  281. return
  282. }
  283. } else {
  284. break
  285. }
  286. }
  287. return
  288. }
  289. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  290. return fsw.getDefaultStore().BeginTransaction(ctx)
  291. }
  292. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  293. return fsw.getDefaultStore().CommitTransaction(ctx)
  294. }
  295. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  296. return fsw.getDefaultStore().RollbackTransaction(ctx)
  297. }
  298. func (fsw *FilerStoreWrapper) Shutdown() {
  299. fsw.getDefaultStore().Shutdown()
  300. }
  301. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  302. return fsw.getDefaultStore().KvPut(ctx, key, value)
  303. }
  304. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  305. return fsw.getDefaultStore().KvGet(ctx, key)
  306. }
  307. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  308. return fsw.getDefaultStore().KvDelete(ctx, key)
  309. }
  310. func (fsw *FilerStoreWrapper) Debug(writer io.Writer) {
  311. if debuggable, ok := fsw.getDefaultStore().(Debuggable); ok {
  312. debuggable.Debug(writer)
  313. }
  314. }