diff --git a/weed/command/imports.go b/weed/command/imports.go index a2f59189f..48cda5f90 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -29,6 +29,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" _ "github.com/chrislusf/seaweedfs/weed/filer/tikv" ) diff --git a/weed/filer.toml b/weed/filer.toml new file mode 100644 index 000000000..a0af38d95 --- /dev/null +++ b/weed/filer.toml @@ -0,0 +1,5 @@ +[redis3] +enabled = true +address = "localhost:6379" +password = "" +database = 0 diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go index f3152c970..5465a833d 100644 --- a/weed/filer/redis3/kv_directory_children.go +++ b/weed/filer/redis3/kv_directory_children.go @@ -3,11 +3,13 @@ package redis3 import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/util/bptree" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/skiplist" "github.com/go-redis/redis/v8" - "github.com/golang/protobuf/proto" ) +const maxNameBatchSizeLimit = 5 + func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { data, err := client.Get(ctx, key).Result() if err != nil { @@ -15,12 +17,22 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string, return fmt.Errorf("read %s: %v", key, err) } } - rootNode := &bptree.ProtoNode{} - if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil { - return fmt.Errorf("decoding root for %s: %v", key, err) + store := newSkipListElementStore(key, client) + nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit) + + // println("add", key, name) + if err := nameList.WriteName(name); err != nil { + glog.Errorf("add %s %s: %v", key, name, err) + return err + } + if !nameList.HasChanges() { + return nil + } + + if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil { + return err } - tree := rootNode.ToBpTree() - tree.Add(bptree.String(name), nil) + return nil } @@ -31,19 +43,69 @@ func removeChild(ctx context.Context, client redis.UniversalClient, key string, return fmt.Errorf("read %s: %v", key, err) } } - rootNode := &bptree.ProtoNode{} - if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil { - return fmt.Errorf("decoding root for %s: %v", key, err) + store := newSkipListElementStore(key, client) + nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit) + + if err := nameList.DeleteName(name); err != nil { + return err + } + if !nameList.HasChanges() { + return nil + } + + if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil { + return err } - tree := rootNode.ToBpTree() - tree.Add(bptree.String(name), nil) + return nil } func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error { + + data, err := client.Get(ctx, key).Result() + if err != nil { + if err != redis.Nil { + return fmt.Errorf("read %s: %v", key, err) + } + } + store := newSkipListElementStore(key, client) + nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit) + + if err = nameList.ListNames("", func(name string) bool { + if err := onDeleteFn(name); err != nil { + glog.Errorf("delete %s child %s: %v", key, name, err) + return false + } + return true + }); err != nil { + return err + } + + if err = nameList.RemoteAllListElement(); err != nil { + return err + } + return nil + } -func iterateChildren(ctx context.Context, client redis.UniversalClient, key string, eachFn func(name string) error) error { +func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error { + + data, err := client.Get(ctx, key).Result() + if err != nil { + if err != redis.Nil { + return fmt.Errorf("read %s: %v", key, err) + } + } + store := newSkipListElementStore(key, client) + nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit) + + if err = nameList.ListNames(startFileName, func(name string) bool { + return eachFn(name) + }); err != nil { + return err + } + return nil + } diff --git a/weed/filer/redis3/skiplist_element_store.go b/weed/filer/redis3/skiplist_element_store.go new file mode 100644 index 000000000..fa13d35e9 --- /dev/null +++ b/weed/filer/redis3/skiplist_element_store.go @@ -0,0 +1,52 @@ +package redis3 + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/skiplist" + "github.com/go-redis/redis/v8" + "github.com/golang/protobuf/proto" +) + +type SkipListElementStore struct { + prefix string + client redis.UniversalClient +} + +var _ = skiplist.ListStore(&SkipListElementStore{}) + +func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore { + return &SkipListElementStore{ + prefix: prefix, + client: client, + } +} + +func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error { + key := fmt.Sprintf("%s%d", m.prefix, id) + data, err := proto.Marshal(element) + if err != nil { + glog.Errorf("marshal %s: %v", key, err) + } + return m.client.Set(context.Background(), key, data, 0).Err() +} + +func (m *SkipListElementStore) DeleteElement(id int64) error { + key := fmt.Sprintf("%s%d", m.prefix, id) + return m.client.Del(context.Background(), key).Err() +} + +func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) { + key := fmt.Sprintf("%s%d", m.prefix, id) + data, err := m.client.Get(context.Background(), key).Result() + if err != nil { + if err == redis.Nil { + return nil, nil + } + return nil, err + } + t := &skiplist.SkipListElement{} + err = proto.Unmarshal([]byte(data), t) + return t, err +} diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go index 958338afe..8a89e7c48 100644 --- a/weed/filer/redis3/universal_redis_store.go +++ b/weed/filer/redis3/universal_redis_store.go @@ -115,6 +115,8 @@ func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, ful if err != nil { return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) } + // not efficient, but need to remove if it is a directory + store.Client.Del(ctx, genDirectoryListKey(string(path))) return nil }) @@ -127,41 +129,41 @@ func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Cont func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { dirListKey := genDirectoryListKey(string(dirPath)) - start := int64(0) - if startFileName != "" { - start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result() - if !includeStartFile { - start++ + counter := int64(0) + + err = listChildren(ctx, store.Client, dirListKey, startFileName, func(fileName string) bool { + if startFileName != "" { + if !includeStartFile && startFileName == fileName { + return true + } } - } - members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result() - if err != nil { - return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) - } - // fetch entry meta - for _, fileName := range members { path := util.NewFullPath(string(dirPath), fileName) entry, err := store.FindEntry(ctx, path) lastFileName = fileName if err != nil { glog.V(0).Infof("list %s : %v", path, err) if err == filer_pb.ErrNotFound { - continue + return true } } else { if entry.TtlSec > 0 { if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { store.Client.Del(ctx, string(path)).Result() store.Client.ZRem(ctx, dirListKey, fileName).Result() - continue + return true } } + counter++ if !eachEntryFunc(entry) { - break + return false + } + if counter >= limit { + return false } } - } + return true + }) return lastFileName, err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b886bf641..aa66b4187 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -34,6 +34,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go index 18427d341..71e5aeeba 100644 --- a/weed/util/skiplist/name_batch.go +++ b/weed/util/skiplist/name_batch.go @@ -35,7 +35,7 @@ func (nb *NameBatch) DeleteName(name string) { } func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool { var names []string - needFilter := startFrom == "" + needFilter := startFrom != "" for n := range nb.names { if !needFilter || strings.Compare(n, startFrom) >= 0 { names = append(names, n) diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go index db328afba..4ba26665a 100644 --- a/weed/util/skiplist/name_list.go +++ b/weed/util/skiplist/name_list.go @@ -9,7 +9,7 @@ type NameList struct { batchSize int } -func NewNameList(store ListStore, batchSize int) *NameList { +func newNameList(store ListStore, batchSize int) *NameList { return &NameList{ skipList: New(store), batchSize: batchSize, @@ -59,6 +59,7 @@ There are multiple cases after finding the name for greater or equal node */ func (nl *NameList) WriteName(name string) error { + lookupKey := []byte(name) prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) if err != nil { @@ -301,3 +302,25 @@ func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) b return nil } + +func (nl *NameList) RemoteAllListElement() error { + + t := nl.skipList + + nodeRef := t.startLevels[0] + for nodeRef != nil { + node, err := t.loadElement(nodeRef) + if err != nil { + return err + } + if node == nil { + return nil + } + if err := t.deleteElement(node); err != nil { + return err + } + nodeRef = node.Next[0] + } + return nil + +} \ No newline at end of file diff --git a/weed/util/skiplist/name_list_serde.go b/weed/util/skiplist/name_list_serde.go new file mode 100644 index 000000000..be9f06698 --- /dev/null +++ b/weed/util/skiplist/name_list_serde.go @@ -0,0 +1,71 @@ +package skiplist + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/golang/protobuf/proto" +) + +func LoadNameList(data []byte, store ListStore, batchSize int) *NameList { + + nl := &NameList{ + skipList: New(store), + batchSize: batchSize, + } + + if len(data) == 0 { + return nl + } + + message := &SkipListProto{} + if err := proto.Unmarshal(data, message); err != nil { + glog.Errorf("loading skiplist: %v", err) + } + nl.skipList.maxNewLevel = int(message.MaxNewLevel) + nl.skipList.maxLevel = int(message.MaxLevel) + for i, ref := range message.StartLevels { + nl.skipList.startLevels[i] = &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + for i, ref := range message.EndLevels { + nl.skipList.endLevels[i] = &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + return nl +} + +func (nl *NameList) HasChanges() bool { + return nl.skipList.hasChanges +} + +func (nl *NameList) ToBytes() []byte { + message := &SkipListProto{} + message.MaxNewLevel = int32(nl.skipList.maxNewLevel) + message.MaxLevel = int32(nl.skipList.maxLevel) + for _, ref := range nl.skipList.startLevels { + if ref == nil { + break + } + message.StartLevels = append(message.StartLevels, &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + }) + } + for _, ref := range nl.skipList.endLevels { + if ref == nil { + break + } + message.EndLevels = append(message.EndLevels, &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + }) + } + data, err := proto.Marshal(message) + if err != nil { + glog.Errorf("marshal skiplist: %v", err) + } + return data +} \ No newline at end of file diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go index 811a101f2..b3a686553 100644 --- a/weed/util/skiplist/name_list_test.go +++ b/weed/util/skiplist/name_list_test.go @@ -15,7 +15,7 @@ func String(x int) string { } func TestNameList(t *testing.T) { - list := NewNameList(memStore, 7) + list := newNameList(memStore, 7) for i := 0; i < maxNameCount; i++ { list.WriteName(String(i)) @@ -51,7 +51,7 @@ func TestNameList(t *testing.T) { } // randomized deletion - list = NewNameList(memStore, 7) + list = newNameList(memStore, 7) // Delete elements at random positions in the list. rList := rand.Perm(maxN) for _, i := range rList { diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go index b48a05b4a..52e6c606a 100644 --- a/weed/util/skiplist/skiplist.go +++ b/weed/util/skiplist/skiplist.go @@ -22,6 +22,7 @@ type SkipList struct { maxNewLevel int maxLevel int listStore ListStore + hasChanges bool // elementCount int } @@ -93,6 +94,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen if err != nil { return } + if currentNode == nil { + return + } // In case, that our first element is already greater-or-equal! if findGreaterOrEqual && compareElement(currentNode, key) > 0 { @@ -115,6 +119,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen if err != nil { return } + if currentNode == nil { + return + } } else { if index > 0 { @@ -126,6 +133,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen if err != nil { return } + if currentNodeNext == nil { + return + } foundElem = currentNodeNext ok = true return @@ -216,9 +226,11 @@ func (t *SkipList) Delete(key []byte) (err error) { if err != nil { return err } - nextNextNode.Prev = currentNode.Reference() - if err = t.saveElement(nextNextNode); err != nil { - return err + if nextNextNode != nil { + nextNextNode.Prev = currentNode.Reference() + if err = t.saveElement(nextNextNode); err != nil { + return err + } } } // t.elementCount-- @@ -230,6 +242,7 @@ func (t *SkipList) Delete(key []byte) (err error) { // Link from start needs readjustments. startNextKey := t.startLevels[index].Key if compareElement(nextNode, startNextKey) == 0 { + t.hasChanges = true t.startLevels[index] = nextNode.Next[index] // This was our currently highest node! if t.startLevels[index] == nil { @@ -240,6 +253,7 @@ func (t *SkipList) Delete(key []byte) (err error) { // Link from end needs readjustments. if nextNode.Next[index] == nil { t.endLevels[index] = currentNode.Reference() + t.hasChanges = true } nextNode.Next[index] = nil } @@ -260,7 +274,7 @@ func (t *SkipList) Delete(key []byte) (err error) { // Insert inserts the given ListElement into the skiplist. // Insert runs in approx. O(log(n)) -func (t *SkipList) Insert(key, value []byte) (err error){ +func (t *SkipList) Insert(key, value []byte) (err error) { if t == nil || key == nil { return @@ -272,6 +286,7 @@ func (t *SkipList) Insert(key, value []byte) (err error){ if level > t.maxLevel { level = t.maxLevel + 1 t.maxLevel = level + t.hasChanges = true } elem := &SkipListElement{ @@ -326,9 +341,11 @@ func (t *SkipList) Insert(key, value []byte) (err error){ if nextNode, err = t.loadElement(nextNodeRef); err != nil { return } - nextNode.Prev = elem.Reference() - if err = t.saveElement(nextNode); err != nil { - return + if nextNode != nil { + nextNode.Prev = elem.Reference() + if err = t.saveElement(nextNode); err != nil { + return + } } } } @@ -343,6 +360,9 @@ func (t *SkipList) Insert(key, value []byte) (err error){ } } currentNode = nextNode + if currentNode == nil { + return + } } else { // Go down index-- @@ -366,18 +386,22 @@ func (t *SkipList) Insert(key, value []byte) (err error){ if err != nil { return err } - startLevelElement.Prev = elem.Reference() - if err = t.saveElement(startLevelElement); err != nil { - return err + if startLevelElement != nil { + startLevelElement.Prev = elem.Reference() + if err = t.saveElement(startLevelElement); err != nil { + return err + } } } elem.Next[i] = t.startLevels[i] t.startLevels[i] = elem.Reference() + t.hasChanges = true } // link the endLevels to this element! if elem.Next[i] == nil { t.endLevels[i] = elem.Reference() + t.hasChanges = true } didSomething = true @@ -392,20 +416,24 @@ func (t *SkipList) Insert(key, value []byte) (err error){ if err != nil { return err } - endLevelElement.Next[i] = elem.Reference() - if err = t.saveElement(endLevelElement); err != nil { - return err + if endLevelElement != nil { + endLevelElement.Next[i] = elem.Reference() + if err = t.saveElement(endLevelElement); err != nil { + return err + } } } if i == 0 { elem.Prev = t.endLevels[i] } t.endLevels[i] = elem.Reference() + t.hasChanges = true } // Link the startLevels to this element! if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 { t.startLevels[i] = elem.Reference() + t.hasChanges = true } didSomething = true @@ -486,6 +514,9 @@ func (t *SkipList) println() { for nodeRef != nil { print(fmt.Sprintf("%v: ", string(nodeRef.Key))) node, _ := t.loadElement(nodeRef) + if node == nil { + break + } for i := 0; i <= int(node.Level); i++ { l := node.Next[i] @@ -510,8 +541,8 @@ func (t *SkipList) println() { } } - println() nodeRef = node.Next[0] + println() } print("end --> ")