You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

166 lines
4.4 KiB

  1. package mongodb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "go.mongodb.org/mongo-driver/mongo"
  11. "go.mongodb.org/mongo-driver/mongo/options"
  12. "time"
  13. )
  14. func init() {
  15. filer2.Stores = append(filer2.Stores, &MongodbStore{})
  16. }
  17. type MongodbStore struct {
  18. connect *mongo.Client
  19. database string
  20. collectionName string
  21. }
  // Model is the document shape stored in the collection: one document per
  // filer entry, addressed by its parent directory and its name.
  type Model struct {
  	// Directory is the directory part of the entry's full path.
  	Directory string `bson:"directory"`
  	// Name is the name part of the entry's full path.
  	Name string `bson:"name"`
  	// Meta is the entry's encoded attributes and chunks.
  	Meta []byte `bson:"meta"`
  }
  27. func (store *MongodbStore) GetName() string {
  28. return "mongodb"
  29. }
  30. func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  31. store.database = configuration.GetString(prefix + "database")
  32. store.collectionName = "filemeta"
  33. return store.connection(configuration.GetString(prefix + "uri"))
  34. }
  35. func (store *MongodbStore) connection(uri string) (err error) {
  36. ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
  37. client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
  38. store.connect = client
  39. return err
  40. }
  41. func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  42. return ctx, nil
  43. }
  44. func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
  45. return nil
  46. }
  47. func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
  48. return nil
  49. }
  50. func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  51. dir, name := entry.FullPath.DirAndName()
  52. meta, err := entry.EncodeAttributesAndChunks()
  53. if err != nil {
  54. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  55. }
  56. c := store.connect.Database(store.database).Collection(store.collectionName)
  57. _, err = c.InsertOne(ctx, Model{
  58. Directory: dir,
  59. Name: name,
  60. Meta: meta,
  61. })
  62. return nil
  63. }
  64. func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  65. return store.UpdateEntry(ctx, entry)
  66. }
  67. func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
  68. dir, name := fullpath.DirAndName()
  69. var data Model
  70. var where = bson.M{"directory": dir, "name": name}
  71. err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
  72. if err != mongo.ErrNoDocuments && err != nil {
  73. return nil, filer_pb.ErrNotFound
  74. }
  75. if len(data.Meta) == 0 {
  76. return nil, filer_pb.ErrNotFound
  77. }
  78. entry = &filer2.Entry{
  79. FullPath: fullpath,
  80. }
  81. err = entry.DecodeAttributesAndChunks(data.Meta)
  82. if err != nil {
  83. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  84. }
  85. return entry, nil
  86. }
  87. func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  88. where := bson.M{"directory": fullpath}
  89. _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
  90. if err != nil {
  91. return fmt.Errorf("delete %s : %v", fullpath, err)
  92. }
  93. return nil
  94. }
  95. func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  96. return nil
  97. }
  98. func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
  99. var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName,}}
  100. if inclusive {
  101. where["name"] = bson.M{
  102. "$gte": startFileName,
  103. }
  104. }
  105. optLimit := int64(limit)
  106. cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, &options.FindOptions{Limit: &optLimit})
  107. for cur.Next(ctx) {
  108. var data Model
  109. err := cur.Decode(&data)
  110. if err != nil && err != mongo.ErrNoDocuments {
  111. return nil, err
  112. }
  113. entry := &filer2.Entry{
  114. FullPath: util.NewFullPath(string(fullpath), data.Name),
  115. }
  116. if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
  117. err = decodeErr
  118. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  119. break
  120. }
  121. entries = append(entries, entry)
  122. }
  123. if err := cur.Close(ctx); err != nil {
  124. glog.V(0).Infof("list iterator close: %v", err)
  125. }
  126. return entries, err
  127. }
  128. func (store *MongodbStore) Shutdown() {
  129. ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
  130. store.connect.Disconnect(ctx)
  131. }