Browse Source

add redis3

pull/2354/head
Chris Lu 3 years ago
parent
commit
366f522a2d
  1. 1
      weed/command/imports.go
  2. 5
      weed/filer.toml
  3. 88
      weed/filer/redis3/kv_directory_children.go
  4. 52
      weed/filer/redis3/skiplist_element_store.go
  5. 34
      weed/filer/redis3/universal_redis_store.go
  6. 1
      weed/server/filer_server.go
  7. 2
      weed/util/skiplist/name_batch.go
  8. 25
      weed/util/skiplist/name_list.go
  9. 71
      weed/util/skiplist/name_list_serde.go
  10. 4
      weed/util/skiplist/name_list_test.go
  11. 59
      weed/util/skiplist/skiplist.go

1
weed/command/imports.go

@ -29,6 +29,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/tikv" _ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
) )

5
weed/filer.toml

@ -0,0 +1,5 @@
[redis3]
enabled = true
address = "localhost:6379"
password = ""
database = 0

88
weed/filer/redis3/kv_directory_children.go

@ -3,11 +3,13 @@ package redis3
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/util/bptree"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util/skiplist"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
) )
const maxNameBatchSizeLimit = 5
func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error { func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
data, err := client.Get(ctx, key).Result() data, err := client.Get(ctx, key).Result()
if err != nil { if err != nil {
@ -15,12 +17,22 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string,
return fmt.Errorf("read %s: %v", key, err) return fmt.Errorf("read %s: %v", key, err)
} }
} }
rootNode := &bptree.ProtoNode{}
if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil {
return fmt.Errorf("decoding root for %s: %v", key, err)
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
// println("add", key, name)
if err := nameList.WriteName(name); err != nil {
glog.Errorf("add %s %s: %v", key, name, err)
return err
}
if !nameList.HasChanges() {
return nil
}
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
return err
} }
tree := rootNode.ToBpTree()
tree.Add(bptree.String(name), nil)
return nil return nil
} }
@ -31,19 +43,69 @@ func removeChild(ctx context.Context, client redis.UniversalClient, key string,
return fmt.Errorf("read %s: %v", key, err) return fmt.Errorf("read %s: %v", key, err)
} }
} }
rootNode := &bptree.ProtoNode{}
if err := proto.UnmarshalMerge([]byte(data), rootNode); err != nil {
return fmt.Errorf("decoding root for %s: %v", key, err)
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err := nameList.DeleteName(name); err != nil {
return err
}
if !nameList.HasChanges() {
return nil
}
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
return err
} }
tree := rootNode.ToBpTree()
tree.Add(bptree.String(name), nil)
return nil return nil
} }
func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error { func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err = nameList.ListNames("", func(name string) bool {
if err := onDeleteFn(name); err != nil {
glog.Errorf("delete %s child %s: %v", key, name, err)
return false
}
return true
}); err != nil {
return err
}
if err = nameList.RemoteAllListElement(); err != nil {
return err
}
return nil return nil
} }
func iterateChildren(ctx context.Context, client redis.UniversalClient, key string, eachFn func(name string) error) error {
func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err = nameList.ListNames(startFileName, func(name string) bool {
return eachFn(name)
}); err != nil {
return err
}
return nil return nil
} }

52
weed/filer/redis3/skiplist_element_store.go

@ -0,0 +1,52 @@
package redis3
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util/skiplist"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
type SkipListElementStore struct {
prefix string
client redis.UniversalClient
}
var _ = skiplist.ListStore(&SkipListElementStore{})
func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
return &SkipListElementStore{
prefix: prefix,
client: client,
}
}
func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error {
key := fmt.Sprintf("%s%d", m.prefix, id)
data, err := proto.Marshal(element)
if err != nil {
glog.Errorf("marshal %s: %v", key, err)
}
return m.client.Set(context.Background(), key, data, 0).Err()
}
func (m *SkipListElementStore) DeleteElement(id int64) error {
key := fmt.Sprintf("%s%d", m.prefix, id)
return m.client.Del(context.Background(), key).Err()
}
func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) {
key := fmt.Sprintf("%s%d", m.prefix, id)
data, err := m.client.Get(context.Background(), key).Result()
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
t := &skiplist.SkipListElement{}
err = proto.Unmarshal([]byte(data), t)
return t, err
}

34
weed/filer/redis3/universal_redis_store.go

@ -115,6 +115,8 @@ func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, ful
if err != nil { if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
} }
// not efficient, but need to remove if it is a directory
store.Client.Del(ctx, genDirectoryListKey(string(path)))
return nil return nil
}) })
@ -127,41 +129,41 @@ func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Cont
func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath)) dirListKey := genDirectoryListKey(string(dirPath))
start := int64(0)
if startFileName != "" {
start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result()
if !includeStartFile {
start++
counter := int64(0)
err = listChildren(ctx, store.Client, dirListKey, startFileName, func(fileName string) bool {
if startFileName != "" {
if !includeStartFile && startFileName == fileName {
return true
}
} }
}
members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
path := util.NewFullPath(string(dirPath), fileName) path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path) entry, err := store.FindEntry(ctx, path)
lastFileName = fileName lastFileName = fileName
if err != nil { if err != nil {
glog.V(0).Infof("list %s : %v", path, err) glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound { if err == filer_pb.ErrNotFound {
continue
return true
} }
} else { } else {
if entry.TtlSec > 0 { if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(ctx, string(path)).Result() store.Client.Del(ctx, string(path)).Result()
store.Client.ZRem(ctx, dirListKey, fileName).Result() store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
return true
} }
} }
counter++
if !eachEntryFunc(entry) { if !eachEntryFunc(entry) {
break
return false
}
if counter >= limit {
return false
} }
} }
}
return true
})
return lastFileName, err return lastFileName, err
} }

1
weed/server/filer_server.go

@ -34,6 +34,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/notification"

2
weed/util/skiplist/name_batch.go

@ -35,7 +35,7 @@ func (nb *NameBatch) DeleteName(name string) {
} }
func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool { func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
var names []string var names []string
needFilter := startFrom == ""
needFilter := startFrom != ""
for n := range nb.names { for n := range nb.names {
if !needFilter || strings.Compare(n, startFrom) >= 0 { if !needFilter || strings.Compare(n, startFrom) >= 0 {
names = append(names, n) names = append(names, n)

25
weed/util/skiplist/name_list.go

@ -9,7 +9,7 @@ type NameList struct {
batchSize int batchSize int
} }
func NewNameList(store ListStore, batchSize int) *NameList {
func newNameList(store ListStore, batchSize int) *NameList {
return &NameList{ return &NameList{
skipList: New(store), skipList: New(store),
batchSize: batchSize, batchSize: batchSize,
@ -59,6 +59,7 @@ There are multiple cases after finding the name for greater or equal node
*/ */
func (nl *NameList) WriteName(name string) error { func (nl *NameList) WriteName(name string) error {
lookupKey := []byte(name) lookupKey := []byte(name)
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
if err != nil { if err != nil {
@ -301,3 +302,25 @@ func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) b
return nil return nil
} }
func (nl *NameList) RemoteAllListElement() error {
t := nl.skipList
nodeRef := t.startLevels[0]
for nodeRef != nil {
node, err := t.loadElement(nodeRef)
if err != nil {
return err
}
if node == nil {
return nil
}
if err := t.deleteElement(node); err != nil {
return err
}
nodeRef = node.Next[0]
}
return nil
}

71
weed/util/skiplist/name_list_serde.go

@ -0,0 +1,71 @@
package skiplist
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/golang/protobuf/proto"
)
func LoadNameList(data []byte, store ListStore, batchSize int) *NameList {
nl := &NameList{
skipList: New(store),
batchSize: batchSize,
}
if len(data) == 0 {
return nl
}
message := &SkipListProto{}
if err := proto.Unmarshal(data, message); err != nil {
glog.Errorf("loading skiplist: %v", err)
}
nl.skipList.maxNewLevel = int(message.MaxNewLevel)
nl.skipList.maxLevel = int(message.MaxLevel)
for i, ref := range message.StartLevels {
nl.skipList.startLevels[i] = &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
}
}
for i, ref := range message.EndLevels {
nl.skipList.endLevels[i] = &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
}
}
return nl
}
func (nl *NameList) HasChanges() bool {
return nl.skipList.hasChanges
}
func (nl *NameList) ToBytes() []byte {
message := &SkipListProto{}
message.MaxNewLevel = int32(nl.skipList.maxNewLevel)
message.MaxLevel = int32(nl.skipList.maxLevel)
for _, ref := range nl.skipList.startLevels {
if ref == nil {
break
}
message.StartLevels = append(message.StartLevels, &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
})
}
for _, ref := range nl.skipList.endLevels {
if ref == nil {
break
}
message.EndLevels = append(message.EndLevels, &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
})
}
data, err := proto.Marshal(message)
if err != nil {
glog.Errorf("marshal skiplist: %v", err)
}
return data
}

4
weed/util/skiplist/name_list_test.go

@ -15,7 +15,7 @@ func String(x int) string {
} }
func TestNameList(t *testing.T) { func TestNameList(t *testing.T) {
list := NewNameList(memStore, 7)
list := newNameList(memStore, 7)
for i := 0; i < maxNameCount; i++ { for i := 0; i < maxNameCount; i++ {
list.WriteName(String(i)) list.WriteName(String(i))
@ -51,7 +51,7 @@ func TestNameList(t *testing.T) {
} }
// randomized deletion // randomized deletion
list = NewNameList(memStore, 7)
list = newNameList(memStore, 7)
// Delete elements at random positions in the list. // Delete elements at random positions in the list.
rList := rand.Perm(maxN) rList := rand.Perm(maxN)
for _, i := range rList { for _, i := range rList {

59
weed/util/skiplist/skiplist.go

@ -22,6 +22,7 @@ type SkipList struct {
maxNewLevel int maxNewLevel int
maxLevel int maxLevel int
listStore ListStore listStore ListStore
hasChanges bool
// elementCount int // elementCount int
} }
@ -93,6 +94,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen
if err != nil { if err != nil {
return return
} }
if currentNode == nil {
return
}
// In case, that our first element is already greater-or-equal! // In case, that our first element is already greater-or-equal!
if findGreaterOrEqual && compareElement(currentNode, key) > 0 { if findGreaterOrEqual && compareElement(currentNode, key) > 0 {
@ -115,6 +119,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen
if err != nil { if err != nil {
return return
} }
if currentNode == nil {
return
}
} else { } else {
if index > 0 { if index > 0 {
@ -126,6 +133,9 @@ func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElemen
if err != nil { if err != nil {
return return
} }
if currentNodeNext == nil {
return
}
foundElem = currentNodeNext foundElem = currentNodeNext
ok = true ok = true
return return
@ -216,9 +226,11 @@ func (t *SkipList) Delete(key []byte) (err error) {
if err != nil { if err != nil {
return err return err
} }
nextNextNode.Prev = currentNode.Reference()
if err = t.saveElement(nextNextNode); err != nil {
return err
if nextNextNode != nil {
nextNextNode.Prev = currentNode.Reference()
if err = t.saveElement(nextNextNode); err != nil {
return err
}
} }
} }
// t.elementCount-- // t.elementCount--
@ -230,6 +242,7 @@ func (t *SkipList) Delete(key []byte) (err error) {
// Link from start needs readjustments. // Link from start needs readjustments.
startNextKey := t.startLevels[index].Key startNextKey := t.startLevels[index].Key
if compareElement(nextNode, startNextKey) == 0 { if compareElement(nextNode, startNextKey) == 0 {
t.hasChanges = true
t.startLevels[index] = nextNode.Next[index] t.startLevels[index] = nextNode.Next[index]
// This was our currently highest node! // This was our currently highest node!
if t.startLevels[index] == nil { if t.startLevels[index] == nil {
@ -240,6 +253,7 @@ func (t *SkipList) Delete(key []byte) (err error) {
// Link from end needs readjustments. // Link from end needs readjustments.
if nextNode.Next[index] == nil { if nextNode.Next[index] == nil {
t.endLevels[index] = currentNode.Reference() t.endLevels[index] = currentNode.Reference()
t.hasChanges = true
} }
nextNode.Next[index] = nil nextNode.Next[index] = nil
} }
@ -260,7 +274,7 @@ func (t *SkipList) Delete(key []byte) (err error) {
// Insert inserts the given ListElement into the skiplist. // Insert inserts the given ListElement into the skiplist.
// Insert runs in approx. O(log(n)) // Insert runs in approx. O(log(n))
func (t *SkipList) Insert(key, value []byte) (err error){
func (t *SkipList) Insert(key, value []byte) (err error) {
if t == nil || key == nil { if t == nil || key == nil {
return return
@ -272,6 +286,7 @@ func (t *SkipList) Insert(key, value []byte) (err error){
if level > t.maxLevel { if level > t.maxLevel {
level = t.maxLevel + 1 level = t.maxLevel + 1
t.maxLevel = level t.maxLevel = level
t.hasChanges = true
} }
elem := &SkipListElement{ elem := &SkipListElement{
@ -326,9 +341,11 @@ func (t *SkipList) Insert(key, value []byte) (err error){
if nextNode, err = t.loadElement(nextNodeRef); err != nil { if nextNode, err = t.loadElement(nextNodeRef); err != nil {
return return
} }
nextNode.Prev = elem.Reference()
if err = t.saveElement(nextNode); err != nil {
return
if nextNode != nil {
nextNode.Prev = elem.Reference()
if err = t.saveElement(nextNode); err != nil {
return
}
} }
} }
} }
@ -343,6 +360,9 @@ func (t *SkipList) Insert(key, value []byte) (err error){
} }
} }
currentNode = nextNode currentNode = nextNode
if currentNode == nil {
return
}
} else { } else {
// Go down // Go down
index-- index--
@ -366,18 +386,22 @@ func (t *SkipList) Insert(key, value []byte) (err error){
if err != nil { if err != nil {
return err return err
} }
startLevelElement.Prev = elem.Reference()
if err = t.saveElement(startLevelElement); err != nil {
return err
if startLevelElement != nil {
startLevelElement.Prev = elem.Reference()
if err = t.saveElement(startLevelElement); err != nil {
return err
}
} }
} }
elem.Next[i] = t.startLevels[i] elem.Next[i] = t.startLevels[i]
t.startLevels[i] = elem.Reference() t.startLevels[i] = elem.Reference()
t.hasChanges = true
} }
// link the endLevels to this element! // link the endLevels to this element!
if elem.Next[i] == nil { if elem.Next[i] == nil {
t.endLevels[i] = elem.Reference() t.endLevels[i] = elem.Reference()
t.hasChanges = true
} }
didSomething = true didSomething = true
@ -392,20 +416,24 @@ func (t *SkipList) Insert(key, value []byte) (err error){
if err != nil { if err != nil {
return err return err
} }
endLevelElement.Next[i] = elem.Reference()
if err = t.saveElement(endLevelElement); err != nil {
return err
if endLevelElement != nil {
endLevelElement.Next[i] = elem.Reference()
if err = t.saveElement(endLevelElement); err != nil {
return err
}
} }
} }
if i == 0 { if i == 0 {
elem.Prev = t.endLevels[i] elem.Prev = t.endLevels[i]
} }
t.endLevels[i] = elem.Reference() t.endLevels[i] = elem.Reference()
t.hasChanges = true
} }
// Link the startLevels to this element! // Link the startLevels to this element!
if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 { if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 {
t.startLevels[i] = elem.Reference() t.startLevels[i] = elem.Reference()
t.hasChanges = true
} }
didSomething = true didSomething = true
@ -486,6 +514,9 @@ func (t *SkipList) println() {
for nodeRef != nil { for nodeRef != nil {
print(fmt.Sprintf("%v: ", string(nodeRef.Key))) print(fmt.Sprintf("%v: ", string(nodeRef.Key)))
node, _ := t.loadElement(nodeRef) node, _ := t.loadElement(nodeRef)
if node == nil {
break
}
for i := 0; i <= int(node.Level); i++ { for i := 0; i <= int(node.Level); i++ {
l := node.Next[i] l := node.Next[i]
@ -510,8 +541,8 @@ func (t *SkipList) println() {
} }
} }
println()
nodeRef = node.Next[0] nodeRef = node.Next[0]
println()
} }
print("end --> ") print("end --> ")

Loading…
Cancel
Save