You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
4.1 KiB

7 years ago
7 years ago
7 years ago
6 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "errors"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/stats"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type FilerStore interface {
  11. // GetName gets the name to locate the configuration in filer.toml file
  12. GetName() string
  13. // Initialize initializes the file store
  14. Initialize(configuration util.Configuration) error
  15. InsertEntry(context.Context, *Entry) error
  16. UpdateEntry(context.Context, *Entry) (err error)
  17. // err == filer2.ErrNotFound if not found
  18. FindEntry(context.Context, FullPath) (entry *Entry, err error)
  19. DeleteEntry(context.Context, FullPath) (err error)
  20. ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  21. BeginTransaction(ctx context.Context) (context.Context, error)
  22. CommitTransaction(ctx context.Context) error
  23. RollbackTransaction(ctx context.Context) error
  24. }
  25. var ErrNotFound = errors.New("filer: no entry is found in filer store")
  26. type FilerStoreWrapper struct {
  27. actualStore FilerStore
  28. }
  29. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  30. return &FilerStoreWrapper{
  31. actualStore: store,
  32. }
  33. }
  34. func (fsw *FilerStoreWrapper) GetName() string {
  35. return fsw.actualStore.GetName()
  36. }
  37. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
  38. return fsw.actualStore.Initialize(configuration)
  39. }
  40. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  41. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
  42. start := time.Now()
  43. defer func() { stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) }()
  44. filer_pb.BeforeEntrySerialization(entry.Chunks)
  45. return fsw.actualStore.InsertEntry(ctx, entry)
  46. }
  47. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  48. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
  49. start := time.Now()
  50. defer func() { stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) }()
  51. filer_pb.BeforeEntrySerialization(entry.Chunks)
  52. return fsw.actualStore.UpdateEntry(ctx, entry)
  53. }
  54. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
  55. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
  56. start := time.Now()
  57. defer func() { stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) }()
  58. entry, err = fsw.actualStore.FindEntry(ctx, fp)
  59. if err != nil {
  60. return nil, err
  61. }
  62. filer_pb.AfterEntryDeserialization(entry.Chunks)
  63. return
  64. }
  65. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
  66. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
  67. start := time.Now()
  68. defer func() { stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) }()
  69. return fsw.actualStore.DeleteEntry(ctx, fp)
  70. }
  71. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  72. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
  73. start := time.Now()
  74. defer func() { stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) }()
  75. entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  76. if err != nil {
  77. return nil, err
  78. }
  79. for _, entry := range entries {
  80. filer_pb.AfterEntryDeserialization(entry.Chunks)
  81. }
  82. return entries, err
  83. }
  84. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  85. return fsw.actualStore.BeginTransaction(ctx)
  86. }
  87. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  88. return fsw.actualStore.CommitTransaction(ctx)
  89. }
  90. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  91. return fsw.actualStore.RollbackTransaction(ctx)
  92. }