You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

322 lines
10 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/viant/ptrie"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. var (
  13. _ = VirtualFilerStore(&FilerStoreWrapper{})
  14. )
  15. type VirtualFilerStore interface {
  16. FilerStore
  17. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  18. DeleteOneEntry(ctx context.Context, entry *Entry) error
  19. AddPathSpecificStore(path string, storeId string, store FilerStore)
  20. OnBucketCreation(bucket string)
  21. OnBucketDeletion(bucket string)
  22. }
  23. type FilerStoreWrapper struct {
  24. defaultStore FilerStore
  25. pathToStore ptrie.Trie
  26. storeIdToStore map[string]FilerStore
  27. }
  28. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  29. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  30. return innerStore
  31. }
  32. return &FilerStoreWrapper{
  33. defaultStore: store,
  34. pathToStore: ptrie.New(),
  35. storeIdToStore: make(map[string]FilerStore),
  36. }
  37. }
  38. func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
  39. for _, store := range fsw.storeIdToStore {
  40. if ba, ok := store.(BucketAware); ok {
  41. ba.OnBucketCreation(bucket)
  42. }
  43. }
  44. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  45. ba.OnBucketCreation(bucket)
  46. }
  47. }
  48. func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
  49. for _, store := range fsw.storeIdToStore {
  50. if ba, ok := store.(BucketAware); ok {
  51. ba.OnBucketDeletion(bucket)
  52. }
  53. }
  54. if ba, ok := fsw.defaultStore.(BucketAware); ok {
  55. ba.OnBucketDeletion(bucket)
  56. }
  57. }
  58. func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
  59. fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
  60. err := fsw.pathToStore.Put([]byte(path), storeId)
  61. if err != nil {
  62. glog.Fatalf("put path specific store: %v", err)
  63. }
  64. }
  65. func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
  66. store = fsw.defaultStore
  67. if path == "/" {
  68. return
  69. }
  70. var storeId string
  71. fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
  72. storeId = value.(string)
  73. return false
  74. })
  75. if storeId != "" {
  76. store = fsw.storeIdToStore[storeId]
  77. }
  78. return
  79. }
  80. func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
  81. return fsw.defaultStore
  82. }
  83. func (fsw *FilerStoreWrapper) GetName() string {
  84. return fsw.getDefaultStore().GetName()
  85. }
  86. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  87. return fsw.getDefaultStore().Initialize(configuration, prefix)
  88. }
  89. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  90. actualStore := fsw.getActualStore(entry.FullPath)
  91. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
  92. start := time.Now()
  93. defer func() {
  94. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  95. }()
  96. filer_pb.BeforeEntrySerialization(entry.Chunks)
  97. if entry.Mime == "application/octet-stream" {
  98. entry.Mime = ""
  99. }
  100. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  101. return err
  102. }
  103. glog.V(4).Infof("InsertEntry %s", entry.FullPath)
  104. return actualStore.InsertEntry(ctx, entry)
  105. }
  106. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  107. actualStore := fsw.getActualStore(entry.FullPath)
  108. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
  109. start := time.Now()
  110. defer func() {
  111. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  112. }()
  113. filer_pb.BeforeEntrySerialization(entry.Chunks)
  114. if entry.Mime == "application/octet-stream" {
  115. entry.Mime = ""
  116. }
  117. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  118. return err
  119. }
  120. glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
  121. return actualStore.UpdateEntry(ctx, entry)
  122. }
  123. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  124. actualStore := fsw.getActualStore(fp)
  125. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
  126. start := time.Now()
  127. defer func() {
  128. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  129. }()
  130. entry, err = actualStore.FindEntry(ctx, fp)
  131. glog.V(4).Infof("FindEntry %s: %v", fp, err)
  132. if err != nil {
  133. return nil, err
  134. }
  135. fsw.maybeReadHardLink(ctx, entry)
  136. filer_pb.AfterEntryDeserialization(entry.Chunks)
  137. return
  138. }
  139. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  140. actualStore := fsw.getActualStore(fp)
  141. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  142. start := time.Now()
  143. defer func() {
  144. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  145. }()
  146. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  147. if findErr == filer_pb.ErrNotFound {
  148. return nil
  149. }
  150. if len(existingEntry.HardLinkId) != 0 {
  151. // remove hard link
  152. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  153. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  154. return err
  155. }
  156. }
  157. glog.V(4).Infof("DeleteEntry %s", fp)
  158. return actualStore.DeleteEntry(ctx, fp)
  159. }
  160. func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
  161. actualStore := fsw.getActualStore(existingEntry.FullPath)
  162. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
  163. start := time.Now()
  164. defer func() {
  165. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  166. }()
  167. if len(existingEntry.HardLinkId) != 0 {
  168. // remove hard link
  169. glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
  170. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  171. return err
  172. }
  173. }
  174. glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
  175. return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
  176. }
  177. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  178. actualStore := fsw.getActualStore(fp + "/")
  179. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
  180. start := time.Now()
  181. defer func() {
  182. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  183. }()
  184. glog.V(4).Infof("DeleteFolderChildren %s", fp)
  185. return actualStore.DeleteFolderChildren(ctx, fp)
  186. }
  187. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
  188. actualStore := fsw.getActualStore(dirPath + "/")
  189. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
  190. start := time.Now()
  191. defer func() {
  192. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  193. }()
  194. glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
  195. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  196. fsw.maybeReadHardLink(ctx, entry)
  197. filer_pb.AfterEntryDeserialization(entry.Chunks)
  198. return eachEntryFunc(entry)
  199. })
  200. }
  201. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  202. actualStore := fsw.getActualStore(dirPath + "/")
  203. stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
  204. start := time.Now()
  205. defer func() {
  206. stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  207. }()
  208. glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
  209. lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
  210. if err == ErrUnsupportedListDirectoryPrefixed {
  211. lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
  212. fsw.maybeReadHardLink(ctx, entry)
  213. filer_pb.AfterEntryDeserialization(entry.Chunks)
  214. return eachEntryFunc(entry)
  215. })
  216. }
  217. return lastFileName, err
  218. }
  219. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
  220. actualStore := fsw.getActualStore(dirPath + "/")
  221. if prefix == "" {
  222. return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  223. }
  224. var notPrefixed []*Entry
  225. lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
  226. notPrefixed = append(notPrefixed, entry)
  227. return true
  228. })
  229. if err != nil {
  230. return
  231. }
  232. count := int64(0)
  233. for count < limit && len(notPrefixed) > 0 {
  234. for _, entry := range notPrefixed {
  235. if strings.HasPrefix(entry.Name(), prefix) {
  236. count++
  237. if !eachEntryFunc(entry) {
  238. return
  239. }
  240. if count >= limit {
  241. break
  242. }
  243. }
  244. }
  245. if count < limit {
  246. notPrefixed = notPrefixed[:0]
  247. _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
  248. notPrefixed = append(notPrefixed, entry)
  249. return true
  250. })
  251. if err != nil {
  252. return
  253. }
  254. }
  255. }
  256. return
  257. }
  258. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  259. return fsw.getDefaultStore().BeginTransaction(ctx)
  260. }
  261. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  262. return fsw.getDefaultStore().CommitTransaction(ctx)
  263. }
  264. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  265. return fsw.getDefaultStore().RollbackTransaction(ctx)
  266. }
  267. func (fsw *FilerStoreWrapper) Shutdown() {
  268. fsw.getDefaultStore().Shutdown()
  269. }
  270. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  271. return fsw.getDefaultStore().KvPut(ctx, key, value)
  272. }
  273. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  274. return fsw.getDefaultStore().KvGet(ctx, key)
  275. }
  276. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  277. return fsw.getDefaultStore().KvDelete(ctx, key)
  278. }