|
|
@ -2,7 +2,12 @@ package mongodb |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"crypto/tls" |
|
|
|
"crypto/x509" |
|
|
|
"fmt" |
|
|
|
"os" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
@ -10,7 +15,6 @@ import ( |
|
|
|
"go.mongodb.org/mongo-driver/bson" |
|
|
|
"go.mongodb.org/mongo-driver/mongo" |
|
|
|
"go.mongodb.org/mongo-driver/mongo/options" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
func init() { |
|
|
@ -37,17 +41,43 @@ func (store *MongodbStore) Initialize(configuration util.Configuration, prefix s |
|
|
|
store.database = configuration.GetString(prefix + "database") |
|
|
|
store.collectionName = "filemeta" |
|
|
|
poolSize := configuration.GetInt(prefix + "option_pool_size") |
|
|
|
return store.connection(configuration.GetString(prefix+"uri"), uint64(poolSize)) |
|
|
|
uri := configuration.GetString(prefix + "uri") |
|
|
|
ssl := configuration.GetBool(prefix + "ssl") |
|
|
|
sslCAFile := configuration.GetString(prefix + "ssl_ca_file") |
|
|
|
sslCertFile := configuration.GetString(prefix + "ssl_cert_file") |
|
|
|
sslKeyFile := configuration.GetString(prefix + "ssl_key_file") |
|
|
|
username := configuration.GetString(prefix + "username") |
|
|
|
password := configuration.GetString(prefix + "password") |
|
|
|
|
|
|
|
return store.connection(uri, uint64(poolSize), ssl, sslCAFile, sslCertFile, sslKeyFile, username, password) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { |
|
|
|
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
|
func (store *MongodbStore) connection(uri string, poolSize uint64, ssl bool, sslCAFile, sslCertFile, sslKeyFile string, username, password string) (err error) { |
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
|
defer cancel() |
|
|
|
|
|
|
|
opts := options.Client().ApplyURI(uri) |
|
|
|
|
|
|
|
if poolSize > 0 { |
|
|
|
opts.SetMaxPoolSize(poolSize) |
|
|
|
} |
|
|
|
|
|
|
|
if ssl { |
|
|
|
tlsConfig, err := configureTLS(sslCAFile, sslCertFile, sslKeyFile) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
opts.SetTLSConfig(tlsConfig) |
|
|
|
} |
|
|
|
|
|
|
|
if username != "" && password != "" { |
|
|
|
creds := options.Credential{ |
|
|
|
Username: username, |
|
|
|
Password: password, |
|
|
|
} |
|
|
|
opts.SetAuth(creds) |
|
|
|
} |
|
|
|
|
|
|
|
client, err := mongo.Connect(ctx, opts) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
@ -55,10 +85,36 @@ func (store *MongodbStore) connection(uri string, poolSize uint64) (err error) { |
|
|
|
|
|
|
|
c := client.Database(store.database).Collection(store.collectionName) |
|
|
|
err = store.indexUnique(c) |
|
|
|
|
|
|
|
store.connect = client |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
func configureTLS(caFile, certFile, keyFile string) (*tls.Config, error) { |
|
|
|
cert, err := tls.LoadX509KeyPair(certFile, keyFile) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("could not load client key pair: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
caCert, err := os.ReadFile(caFile) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("could not read CA certificate: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
caCertPool := x509.NewCertPool() |
|
|
|
if !caCertPool.AppendCertsFromPEM(caCert) { |
|
|
|
return nil, fmt.Errorf("failed to append CA certificate") |
|
|
|
} |
|
|
|
|
|
|
|
tlsConfig := &tls.Config{ |
|
|
|
Certificates: []tls.Certificate{cert}, |
|
|
|
RootCAs: caCertPool, |
|
|
|
InsecureSkipVerify: true, |
|
|
|
} |
|
|
|
|
|
|
|
return tlsConfig, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error { |
|
|
|
_, err := c.Indexes().CreateOne(context.Background(), index, opts) |
|
|
|
return err |
|
|
@ -93,13 +149,10 @@ func (store *MongodbStore) RollbackTransaction(ctx context.Context) error { |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
|
|
|
|
return store.UpdateEntry(ctx, entry) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
|
|
|
|
dir, name := entry.FullPath.DirAndName() |
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
if err != nil { |
|
|
@ -126,7 +179,6 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { |
|
|
|
|
|
|
|
dir, name := fullpath.DirAndName() |
|
|
|
var data Model |
|
|
|
|
|
|
@ -154,7 +206,6 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { |
|
|
|
|
|
|
|
dir, name := fullpath.DirAndName() |
|
|
|
|
|
|
|
where := bson.M{"directory": dir, "name": name} |
|
|
@ -167,7 +218,6 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { |
|
|
|
|
|
|
|
where := bson.M{"directory": fullpath} |
|
|
|
_, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) |
|
|
|
if err != nil { |
|
|
@ -182,7 +232,6 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir |
|
|
|
} |
|
|
|
|
|
|
|
func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
|
|
|
|
|
var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}} |
|
|
|
if includeStartFile { |
|
|
|
where["name"] = bson.M{ |
|
|
|