You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

124 lines
4.1 KiB

7 years ago
7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "errors"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/stats"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type FilerStore interface {
  11. // GetName gets the name to locate the configuration in filer.toml file
  12. GetName() string
  13. // Initialize initializes the file store
  14. Initialize(configuration util.Configuration) error
  15. InsertEntry(context.Context, *Entry) error
  16. UpdateEntry(context.Context, *Entry) (err error)
  17. // err == filer2.ErrNotFound if not found
  18. FindEntry(context.Context, FullPath) (entry *Entry, err error)
  19. DeleteEntry(context.Context, FullPath) (err error)
  20. ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  21. BeginTransaction(ctx context.Context) (context.Context, error)
  22. CommitTransaction(ctx context.Context) error
  23. RollbackTransaction(ctx context.Context) error
  24. }
  25. var ErrNotFound = errors.New("filer: no entry is found in filer store")
  26. type FilerStoreWrapper struct {
  27. actualStore FilerStore
  28. }
  29. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  30. return &FilerStoreWrapper{
  31. actualStore: store,
  32. }
  33. }
  34. func (fsw *FilerStoreWrapper) GetName() string {
  35. return fsw.actualStore.GetName()
  36. }
  37. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
  38. return fsw.actualStore.Initialize(configuration)
  39. }
  40. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  41. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
  42. start := time.Now()
  43. defer func() {
  44. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  45. }()
  46. filer_pb.BeforeEntrySerialization(entry.Chunks)
  47. return fsw.actualStore.InsertEntry(ctx, entry)
  48. }
  49. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  50. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
  51. start := time.Now()
  52. defer func() {
  53. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  54. }()
  55. filer_pb.BeforeEntrySerialization(entry.Chunks)
  56. return fsw.actualStore.UpdateEntry(ctx, entry)
  57. }
  58. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
  59. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
  60. start := time.Now()
  61. defer func() {
  62. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  63. }()
  64. entry, err = fsw.actualStore.FindEntry(ctx, fp)
  65. if err != nil {
  66. return nil, err
  67. }
  68. filer_pb.AfterEntryDeserialization(entry.Chunks)
  69. return
  70. }
  71. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
  72. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
  73. start := time.Now()
  74. defer func() {
  75. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  76. }()
  77. return fsw.actualStore.DeleteEntry(ctx, fp)
  78. }
  79. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  80. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
  81. start := time.Now()
  82. defer func() {
  83. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  84. }()
  85. entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  86. if err != nil {
  87. return nil, err
  88. }
  89. for _, entry := range entries {
  90. filer_pb.AfterEntryDeserialization(entry.Chunks)
  91. }
  92. return entries, err
  93. }
  94. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  95. return fsw.actualStore.BeginTransaction(ctx)
  96. }
  97. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  98. return fsw.actualStore.CommitTransaction(ctx)
  99. }
  100. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  101. return fsw.actualStore.RollbackTransaction(ctx)
  102. }