diff --git a/go.mod b/go.mod index 3b0582c52..a0907ac00 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-multierror v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.0 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect @@ -165,6 +165,7 @@ require ( require ( github.com/coreos/etcd v3.3.10+incompatible // indirect + github.com/go-redsync/redsync/v4 v4.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -176,6 +177,7 @@ require ( github.com/miekg/dns v1.1.25-0.20191211073109-8ebf2e419df7 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 // indirect go.etcd.io/etcd/api/v3 v3.5.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect go.etcd.io/etcd/client/v3 v3.5.0 // indirect diff --git a/go.sum b/go.sum index abd342f0c..232d788b3 100644 --- a/go.sum +++ b/go.sum @@ -342,8 +342,13 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+ github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.1.1/go.mod h1:ysgGY09J/QeDYbu3HikWEIPCwaeOkuNoTgKayTEaEOw= github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc= github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= +github.com/go-redsync/redsync/v4 v4.4.1 h1:Z0AaOpoLvzfZwLK+3uCDHcTxOXck2juzumu1EPJwCUI= +github.com/go-redsync/redsync/v4 v4.4.1/go.mod h1:QBOJAs1k8O6Eyrre4a++pxQgHe5eQ+HF56KuTVv+8Bs= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -441,6 +446,7 @@ github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -542,6 +548,8 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= @@ -751,12 +759,16 @@ github.com/olivere/elastic/v7 v7.0.19 h1:w4F6JpqOISadhYf/n0NR1cNj73xHqh4pzPwD1Gk github.com/olivere/elastic/v7 v7.0.19/go.mod h1:4Jqt5xvjqpjCqgnTcHwl3j8TLs8mvoOK8NYgo/qEOu4= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= @@ -959,6 +971,8 @@ github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14/go.mod h1:gxQT6pBGRuIGunNf/+tSOB5OHvguWi8Tbt82WOkf35E= github.com/swaggo/gin-swagger v1.2.0/go.mod h1:qlH2+W7zXGZkczuL+r2nEBR2JTT+/lX05Nn6vPhc7OI= github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba/go.mod h1:O1lAbCgAAX/KZ80LM/OXwtWFI/5TvZlwxSg8Cq08PV0= @@ -1073,6 +1087,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw= go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go index 16d921d03..797e7797c 100644 --- a/weed/filer/redis3/kv_directory_children.go +++ b/weed/filer/redis3/kv_directory_children.go @@ -10,7 +10,18 @@ import ( const maxNameBatchSizeLimit = 1000 -func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { +func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error { + + // lock and unlock + mutex := redisStore.redsync.NewMutex(key+"lock") + if err := mutex.Lock(); err != nil { + return fmt.Errorf("lock %s: %v", key, err) + } + defer func() { + mutex.Unlock() + }() + + client := redisStore.Client data, err := client.Get(ctx, key).Result() if err != nil { if err != redis.Nil { @@ -20,11 +31,11 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string, store := newSkipListElementStore(key, client) nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit) - // println("add", key, name) if err := nameList.WriteName(name); err != nil { glog.Errorf("add %s %s: %v", key, name, err) return err } + if !nameList.HasChanges() { return nil } @@ -36,7 +47,16 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string, return nil } -func removeChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { +func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error { + + // lock and unlock + mutex := redisStore.redsync.NewMutex(key+"lock") + if err := mutex.Lock(); err != nil { + return fmt.Errorf("lock %s: %v", key, err) + } + defer mutex.Unlock() + + client := redisStore.Client data, err := client.Get(ctx, key).Result() if err != nil { if err != redis.Nil { @@ -60,8 +80,16 @@ func removeChild(ctx context.Context, client redis.UniversalClient, key string, return nil } -func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error { +func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error { + + // lock and unlock + mutex := redisStore.redsync.NewMutex(key+"lock") + if err := mutex.Lock(); err != nil { + return fmt.Errorf("lock %s: %v", key, err) + } + defer mutex.Unlock() + client := redisStore.Client data, err := client.Get(ctx, key).Result() if err != nil { if err != redis.Nil { @@ -84,13 +112,13 @@ func removeChildren(ctx context.Context, client redis.UniversalClient, key strin if err = nameList.RemoteAllListElement(); err != nil { return err } - + return nil } -func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error { - +func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error { + client := redisStore.Client data, err := client.Get(ctx, key).Result() if err != nil { if err != redis.Nil { diff --git a/weed/filer/redis3/kv_directory_children_test.go b/weed/filer/redis3/kv_directory_children_test.go new file mode 100644 index 000000000..7c086bdfb --- /dev/null +++ b/weed/filer/redis3/kv_directory_children_test.go @@ -0,0 +1,75 @@ +package redis3 + +import ( + "github.com/chrislusf/seaweedfs/weed/util/skiplist" + goredislib "github.com/go-redis/redis/v8" + "github.com/stvp/tempredis" + "testing" +) + +var names = []string{ + "cassandra.in.sh", + "cassandra", + "debug-cql.bat", + "nodetool", + "nodetool.bat", + "source-conf.ps1", + "sstableloader", + "sstableloader.bat", + "sstablescrub", + "sstablescrub.bat", + "sstableupgrade", + "sstableupgrade.bat", + "sstableutil", + "sstableutil.bat", + "sstableverify", + "sstableverify.bat", + "stop-server", + "stop-server.bat", + "stop-server.ps1", + "cassandra.in.bat", + "cqlsh.py", + "cqlsh", + "cassandra.ps1", + "cqlsh.bat", + "debug-cql", + "cassandra.bat", +} + +func TestNameList(t *testing.T) { + server, err := tempredis.Start(tempredis.Config{}) + if err != nil { + panic(err) + } + defer server.Term() + + client := goredislib.NewClient(&goredislib.Options{ + Network: "unix", + Addr: server.Socket(), + }) + + store := newSkipListElementStore("/yyy/bin", client) + var data []byte + for _, name := range names { + nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit) + nameList.WriteName(name) + + nameList.ListNames("", func(name string) bool { + println(" * ", name) + return true + }) + + if nameList.HasChanges() { + println("has some changes") + data = nameList.ToBytes() + } + println() + } + + nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit) + nameList.ListNames("", func(name string) bool { + println(name) + return true + }) + +} \ No newline at end of file diff --git a/weed/filer/redis3/redis_cluster_store.go b/weed/filer/redis3/redis_cluster_store.go index e0c620450..73fc0dd20 100644 --- a/weed/filer/redis3/redis_cluster_store.go +++ b/weed/filer/redis3/redis_cluster_store.go @@ -4,6 +4,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/util" "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" ) func init() { @@ -38,5 +40,6 @@ func (store *RedisCluster3Store) initialize(addresses []string, password string, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) return } diff --git a/weed/filer/redis3/redis_store.go b/weed/filer/redis3/redis_store.go index fdbf994ec..2eec3d37a 100644 --- a/weed/filer/redis3/redis_store.go +++ b/weed/filer/redis3/redis_store.go @@ -4,6 +4,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/util" "github.com/go-redis/redis/v8" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v8" ) func init() { @@ -32,5 +34,6 @@ func (store *Redis3Store) initialize(hostPort string, password string, database Password: password, DB: database, }) + store.redsync = redsync.New(goredis.NewPool(store.Client)) return } diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index 8a89e7c48..f04ee493d 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -3,6 +3,7 @@ package redis3 import ( "context" "fmt" + "github.com/go-redsync/redsync/v4" "time" "github.com/go-redis/redis/v8" @@ -18,7 +19,8 @@ const ( ) type UniversalRedis3Store struct { - Client redis.UniversalClient + Client redis.UniversalClient + redsync *redsync.Redsync } func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -49,7 +51,7 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer dir, name := entry.FullPath.DirAndName() if name != "" { - if err = insertChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil { + if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) } } @@ -99,7 +101,7 @@ func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath uti dir, name := fullpath.DirAndName() if name != "" { - if err = removeChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil { + if err = removeChild(ctx, store, genDirectoryListKey(dir), name); err != nil { return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) } } @@ -109,7 +111,7 @@ func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath uti func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - return removeChildren(ctx, store.Client, genDirectoryListKey(string(fullpath)), func(name string) error { + return removeChildren(ctx, store, genDirectoryListKey(string(fullpath)), func(name string) error { path := util.NewFullPath(string(fullpath), name) _, err = store.Client.Del(ctx, string(path)).Result() if err != nil { @@ -131,7 +133,7 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir dirListKey := genDirectoryListKey(string(dirPath)) counter := int64(0) - err = listChildren(ctx, store.Client, dirListKey, startFileName, func(fileName string) bool { + err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool { if startFileName != "" { if !includeStartFile && startFileName == fileName { return true diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go index 4ba26665a..c5cbf3f87 100644 --- a/weed/util/skiplist/name_list.go +++ b/weed/util/skiplist/name_list.go @@ -300,7 +300,7 @@ func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) b } } - return nil + return nil } func (nl *NameList) RemoteAllListElement() error {