Browse Source

Merge pull request #2354 from chrislusf/bptree

Add Redis3
pull/2380/head
Chris Lu 3 years ago
committed by GitHub
parent
commit
d4bb16e20e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      weed/command/imports.go
  2. 22
      weed/command/scaffold/filer.toml
  3. 5
      weed/filer.toml
  4. 111
      weed/filer/redis3/kv_directory_children.go
  5. 42
      weed/filer/redis3/redis_cluster_store.go
  6. 36
      weed/filer/redis3/redis_store.go
  7. 62
      weed/filer/redis3/skiplist_element_store.go
  8. 177
      weed/filer/redis3/universal_redis_store.go
  9. 42
      weed/filer/redis3/universal_redis_store_kv.go
  10. 1
      weed/server/filer_server.go
  11. 6
      weed/util/skiplist/Makefile
  12. 32
      weed/util/skiplist/list_store.go
  13. 102
      weed/util/skiplist/name_batch.go
  14. 326
      weed/util/skiplist/name_list.go
  15. 71
      weed/util/skiplist/name_list_serde.go
  16. 73
      weed/util/skiplist/name_list_test.go
  17. 563
      weed/util/skiplist/skiplist.go
  18. 438
      weed/util/skiplist/skiplist.pb.go
  19. 30
      weed/util/skiplist/skiplist.proto
  20. 51
      weed/util/skiplist/skiplist_serde.go
  21. 295
      weed/util/skiplist/skiplist_test.go

1
weed/command/imports.go

@ -29,6 +29,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
)

22
weed/command/scaffold/filer.toml

@ -185,6 +185,28 @@ routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
[redis3] # beta
enabled = false
address = "localhost:6379"
password = ""
database = 0
[redis_cluster3] # beta
enabled = false
addresses = [
"localhost:30001",
"localhost:30002",
"localhost:30003",
"localhost:30004",
"localhost:30005",
"localhost:30006",
]
password = ""
# allows reads from slave servers or the master, but all writes still go to the master
readOnly = false
# automatically use the closest Redis server for reads
routeByLatency = false
[etcd]
enabled = false
servers = "localhost:2379"

5
weed/filer.toml

@ -0,0 +1,5 @@
[redis3]
enabled = true
address = "localhost:6379"
password = ""
database = 0

111
weed/filer/redis3/kv_directory_children.go

@ -0,0 +1,111 @@
package redis3
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util/skiplist"
"github.com/go-redis/redis/v8"
)
const maxNameBatchSizeLimit = 1000
func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
// println("add", key, name)
if err := nameList.WriteName(name); err != nil {
glog.Errorf("add %s %s: %v", key, name, err)
return err
}
if !nameList.HasChanges() {
return nil
}
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
return err
}
return nil
}
func removeChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err := nameList.DeleteName(name); err != nil {
return err
}
if !nameList.HasChanges() {
return nil
}
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
return err
}
return nil
}
func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err = nameList.ListNames("", func(name string) bool {
if err := onDeleteFn(name); err != nil {
glog.Errorf("delete %s child %s: %v", key, name, err)
return false
}
return true
}); err != nil {
return err
}
if err = nameList.RemoteAllListElement(); err != nil {
return err
}
return nil
}
func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error {
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
return fmt.Errorf("read %s: %v", key, err)
}
}
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
if err = nameList.ListNames(startFileName, func(name string) bool {
return eachFn(name)
}); err != nil {
return err
}
return nil
}

42
weed/filer/redis3/redis_cluster_store.go

@ -0,0 +1,42 @@
package redis3
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis/v8"
)
func init() {
filer.Stores = append(filer.Stores, &RedisCluster3Store{})
}
type RedisCluster3Store struct {
UniversalRedis3Store
}
func (store *RedisCluster3Store) GetName() string {
return "redis_cluster3"
}
func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
configuration.SetDefault(prefix+"useReadOnly", false)
configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
)
}
func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
return
}

36
weed/filer/redis3/redis_store.go

@ -0,0 +1,36 @@
package redis3
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis/v8"
)
func init() {
filer.Stores = append(filer.Stores, &Redis3Store{})
}
type Redis3Store struct {
UniversalRedis3Store
}
func (store *Redis3Store) GetName() string {
return "redis3"
}
func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
)
}
func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
return
}

62
weed/filer/redis3/skiplist_element_store.go

@ -0,0 +1,62 @@
package redis3
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util/skiplist"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
type SkipListElementStore struct {
prefix string
client redis.UniversalClient
}
var _ = skiplist.ListStore(&SkipListElementStore{})
func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
return &SkipListElementStore{
prefix: prefix,
client: client,
}
}
func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error {
key := fmt.Sprintf("%s%d", m.prefix, id)
data, err := proto.Marshal(element)
if err != nil {
glog.Errorf("marshal %s: %v", key, err)
}
return m.client.Set(context.Background(), key, data, 0).Err()
}
func (m *SkipListElementStore) DeleteElement(id int64) error {
key := fmt.Sprintf("%s%d", m.prefix, id)
return m.client.Del(context.Background(), key).Err()
}
func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) {
key := fmt.Sprintf("%s%d", m.prefix, id)
data, err := m.client.Get(context.Background(), key).Result()
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
t := &skiplist.SkipListElement{}
err = proto.Unmarshal([]byte(data), t)
if err == nil {
for i:=0;i<len(t.Next);i++{
if t.Next[i].IsNil() {
t.Next[i] = nil
}
}
if t.Prev.IsNil() {
t.Prev = nil
}
}
return t, err
}

177
weed/filer/redis3/universal_redis_store.go

@ -0,0 +1,177 @@
package redis3
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
const (
DIR_LIST_MARKER = "\x00"
)
type UniversalRedis3Store struct {
Client redis.UniversalClient
}
func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
if err = insertChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
}
func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
_, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if name != "" {
if err = removeChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
}
return nil
}
func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
return removeChildren(ctx, store.Client, genDirectoryListKey(string(fullpath)), func(name string) error {
path := util.NewFullPath(string(fullpath), name)
_, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
// not efficient, but need to remove if it is a directory
store.Client.Del(ctx, genDirectoryListKey(string(path)))
return nil
})
}
func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
counter := int64(0)
err = listChildren(ctx, store.Client, dirListKey, startFileName, func(fileName string) bool {
if startFileName != "" {
if !includeStartFile && startFileName == fileName {
return true
}
}
path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound {
return true
}
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(ctx, string(path)).Result()
store.Client.ZRem(ctx, dirListKey, fileName).Result()
return true
}
}
counter++
if !eachEntryFunc(entry) {
return false
}
if counter >= limit {
return false
}
}
return true
})
return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedis3Store) Shutdown() {
store.Client.Close()
}

42
weed/filer/redis3/universal_redis_store_kv.go

@ -0,0 +1,42 @@
package redis3
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/go-redis/redis/v8"
)
func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
}
return []byte(data), err
}
func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}

1
weed/server/filer_server.go

@ -34,6 +34,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"

6
weed/util/skiplist/Makefile

@ -0,0 +1,6 @@
all: gen
.PHONY : gen
gen:
protoc skiplist.proto --go_out=plugins=grpc:. --go_opt=paths=source_relative

32
weed/util/skiplist/list_store.go

@ -0,0 +1,32 @@
package skiplist
type ListStore interface {
SaveElement(id int64, element *SkipListElement) error
DeleteElement(id int64) error
LoadElement(id int64) (*SkipListElement, error)
}
type MemStore struct {
m map[int64]*SkipListElement
}
func newMemStore() *MemStore {
return &MemStore{
m: make(map[int64]*SkipListElement),
}
}
func (m *MemStore) SaveElement(id int64, element *SkipListElement) error {
m.m[id] = element
return nil
}
func (m *MemStore) DeleteElement(id int64) error {
delete(m.m, id)
return nil
}
func (m *MemStore) LoadElement(id int64) (*SkipListElement, error) {
element := m.m[id]
return element, nil
}

102
weed/util/skiplist/name_batch.go

@ -0,0 +1,102 @@
package skiplist
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/golang/protobuf/proto"
"sort"
"strings"
)
type NameBatch struct {
key string
names map[string]struct{}
}
func (nb *NameBatch) ContainsName(name string) (found bool) {
_, found = nb.names[name]
return
}
func (nb *NameBatch) WriteName(name string) {
if nb.key == "" || strings.Compare(nb.key, name) > 0 {
nb.key = name
}
nb.names[name] = struct{}{}
}
func (nb *NameBatch) DeleteName(name string) {
delete(nb.names, name)
if nb.key == name {
nb.key = ""
for n := range nb.names {
if nb.key == "" || strings.Compare(nb.key, n) > 0 {
nb.key = n
}
}
}
}
func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
var names []string
needFilter := startFrom != ""
for n := range nb.names {
if !needFilter || strings.Compare(n, startFrom) >= 0 {
names = append(names, n)
}
}
sort.Slice(names, func(i, j int) bool {
return strings.Compare(names[i], names[j]) < 0
})
for _, n := range names {
if !visitNamesFn(n) {
return false
}
}
return true
}
func NewNameBatch() *NameBatch {
return &NameBatch{
names: make(map[string]struct{}),
}
}
func LoadNameBatch(data []byte) *NameBatch {
t := &NameBatchData{}
if len(data) > 0 {
err := proto.Unmarshal(data, t)
if err != nil {
glog.Errorf("unmarshal into NameBatchData{} : %v", err)
return nil
}
}
nb := NewNameBatch()
for _, n := range t.Names {
name := string(n)
if nb.key == "" || strings.Compare(nb.key, name) > 0 {
nb.key = name
}
nb.names[name] = struct{}{}
}
return nb
}
func (nb *NameBatch) ToBytes() []byte {
t := &NameBatchData{}
for n := range nb.names {
t.Names = append(t.Names, []byte(n))
}
data, _ := proto.Marshal(t)
return data
}
func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) {
x, y = NewNameBatch(), NewNameBatch()
for n := range nb.names {
// there should be no equal case though
if strings.Compare(n, name) <= 0 {
x.WriteName(n)
} else {
y.WriteName(n)
}
}
return
}

326
weed/util/skiplist/name_list.go

@ -0,0 +1,326 @@
package skiplist
import (
"bytes"
)
type NameList struct {
skipList *SkipList
batchSize int
}
func newNameList(store ListStore, batchSize int) *NameList {
return &NameList{
skipList: New(store),
batchSize: batchSize,
}
}
/*
Be reluctant to create new nodes. Try to fit into either previous node or next node.
Prefer to add to previous node.
There are multiple cases after finding the name for greater or equal node
1. found and node.Key == name
The node contains a batch with leading key the same as the name
nothing to do
2. no such node found or node.Key > name
if no such node found
prevNode = list.LargestNode
// case 2.1
if previousNode contains name
nothing to do
// prefer to add to previous node
if prevNode != nil {
// case 2.2
if prevNode has capacity
prevNode.add name, and save
return
// case 2.3
split prevNode by name
}
// case 2.4
// merge into next node. Avoid too many nodes if adding data in reverse order.
if nextNode is not nil and nextNode has capacity
delete nextNode.Key
nextNode.Key = name
nextNode.batch.add name
insert nodeNode.Key
return
// case 2.5
if prevNode is nil
insert new node with key = name, value = batch{name}
return
*/
func (nl *NameList) WriteName(name string) error {
lookupKey := []byte(name)
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
if err != nil {
return err
}
// case 1: the name already exists as one leading key in the batch
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
return nil
}
if !found {
prevNode, err = nl.skipList.GetLargestNode()
if err != nil {
return err
}
}
if nextNode != nil && prevNode == nil {
prevNode, err = nl.skipList.loadElement(nextNode.Prev)
if err != nil {
return err
}
}
if prevNode != nil {
prevNameBatch := LoadNameBatch(prevNode.Value)
// case 2.1
if prevNameBatch.ContainsName(name) {
return nil
}
// case 2.2
if len(prevNameBatch.names) < nl.batchSize {
prevNameBatch.WriteName(name)
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
}
// case 2.3
x, y := prevNameBatch.SplitBy(name)
addToX := len(x.names) <= len(y.names)
if len(x.names) != len(prevNameBatch.names) {
if addToX {
x.WriteName(name)
}
if x.key == prevNameBatch.key {
if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil {
return err
}
} else {
if err := nl.skipList.Insert([]byte(x.key), x.ToBytes()); err != nil {
return err
}
}
}
if len(y.names) != len(prevNameBatch.names) {
if !addToX {
y.WriteName(name)
}
if y.key == prevNameBatch.key {
if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil {
return err
}
} else {
if err := nl.skipList.Insert([]byte(y.key), y.ToBytes()); err != nil {
return err
}
}
}
return nil
}
// case 2.4
if nextNode != nil {
nextNameBatch := LoadNameBatch(nextNode.Value)
if len(nextNameBatch.names) < nl.batchSize {
if err := nl.skipList.Delete(nextNode.Key); err != nil {
return err
}
nextNameBatch.WriteName(name)
if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
return err
}
return nil
}
}
// case 2.5
// now prevNode is nil
newNameBatch := NewNameBatch()
newNameBatch.WriteName(name)
if err := nl.skipList.Insert([]byte(newNameBatch.key), newNameBatch.ToBytes()); err != nil {
return err
}
return nil
}
/*
// case 1: exists in nextNode
if nextNode != nil && nextNode.Key == name {
remove from nextNode, update nextNode
// TODO: merge with prevNode if possible?
return
}
if nextNode is nil
prevNode = list.Largestnode
if prevNode == nil and nextNode.Prev != nil
prevNode = load(nextNode.Prev)
// case 2: does not exist
// case 2.1
if prevNode == nil {
return
}
// case 2.2
if prevNameBatch does not contain name {
return
}
// case 3
delete from prevNameBatch
if prevNameBatch + nextNode < capacityList
// case 3.1
merge
else
// case 3.2
update prevNode
*/
func (nl *NameList) DeleteName(name string) error {
lookupKey := []byte(name)
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
if err != nil {
return err
}
// case 1
var nextNameBatch *NameBatch
if nextNode != nil {
nextNameBatch = LoadNameBatch(nextNode.Value)
}
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
if err := nl.skipList.Delete(nextNode.Key); err != nil {
return err
}
nextNameBatch.DeleteName(name)
if len(nextNameBatch.names) > 0 {
if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
return err
}
}
return nil
}
if !found {
prevNode, err = nl.skipList.GetLargestNode()
if err != nil {
return err
}
}
if nextNode != nil && prevNode == nil {
prevNode, err = nl.skipList.loadElement(nextNode.Prev)
if err != nil {
return err
}
}
// case 2
if prevNode == nil {
// case 2.1
return nil
}
prevNameBatch := LoadNameBatch(prevNode.Value)
if !prevNameBatch.ContainsName(name) {
// case 2.2
return nil
}
// case 3
prevNameBatch.DeleteName(name)
if len(prevNameBatch.names) == 0 {
if err := nl.skipList.Delete(prevNode.Key); err != nil {
return err
}
return nil
}
if nextNameBatch != nil && len(nextNameBatch.names) + len(prevNameBatch.names) < nl.batchSize {
// case 3.1 merge nextNode and prevNode
if err := nl.skipList.Delete(nextNode.Key); err != nil {
return err
}
for nextName := range nextNameBatch.names {
prevNameBatch.WriteName(nextName)
}
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
} else {
// case 3.2 update prevNode
return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
}
return nil
}
func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
lookupKey := []byte(startFrom)
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
if err != nil {
return err
}
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
prevNode = nil
}
if !found {
prevNode, err = nl.skipList.GetLargestNode()
if err != nil {
return err
}
}
if prevNode != nil {
prevNameBatch := LoadNameBatch(prevNode.Value)
if !prevNameBatch.ListNames(startFrom, visitNamesFn) {
return nil
}
}
for nextNode != nil {
nextNameBatch := LoadNameBatch(nextNode.Value)
if !nextNameBatch.ListNames(startFrom, visitNamesFn) {
return nil
}
nextNode, err = nl.skipList.loadElement(nextNode.Next[0])
if err != nil {
return err
}
}
return nil
}
func (nl *NameList) RemoteAllListElement() error {
t := nl.skipList
nodeRef := t.startLevels[0]
for nodeRef != nil {
node, err := t.loadElement(nodeRef)
if err != nil {
return err
}
if node == nil {
return nil
}
if err := t.deleteElement(node); err != nil {
return err
}
nodeRef = node.Next[0]
}
return nil
}

71
weed/util/skiplist/name_list_serde.go

@ -0,0 +1,71 @@
package skiplist
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/golang/protobuf/proto"
)
func LoadNameList(data []byte, store ListStore, batchSize int) *NameList {
nl := &NameList{
skipList: New(store),
batchSize: batchSize,
}
if len(data) == 0 {
return nl
}
message := &SkipListProto{}
if err := proto.Unmarshal(data, message); err != nil {
glog.Errorf("loading skiplist: %v", err)
}
nl.skipList.maxNewLevel = int(message.MaxNewLevel)
nl.skipList.maxLevel = int(message.MaxLevel)
for i, ref := range message.StartLevels {
nl.skipList.startLevels[i] = &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
}
}
for i, ref := range message.EndLevels {
nl.skipList.endLevels[i] = &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
}
}
return nl
}
func (nl *NameList) HasChanges() bool {
return nl.skipList.hasChanges
}
func (nl *NameList) ToBytes() []byte {
message := &SkipListProto{}
message.MaxNewLevel = int32(nl.skipList.maxNewLevel)
message.MaxLevel = int32(nl.skipList.maxLevel)
for _, ref := range nl.skipList.startLevels {
if ref == nil {
break
}
message.StartLevels = append(message.StartLevels, &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
})
}
for _, ref := range nl.skipList.endLevels {
if ref == nil {
break
}
message.EndLevels = append(message.EndLevels, &SkipListElementReference{
ElementPointer: ref.ElementPointer,
Key: ref.Key,
})
}
data, err := proto.Marshal(message)
if err != nil {
glog.Errorf("marshal skiplist: %v", err)
}
return data
}

73
weed/util/skiplist/name_list_test.go

@ -0,0 +1,73 @@
package skiplist
import (
"math/rand"
"strconv"
"testing"
)
const (
maxNameCount = 100
)
func String(x int) string {
return strconv.Itoa(x)
}
func TestNameList(t *testing.T) {
list := newNameList(memStore, 7)
for i := 0; i < maxNameCount; i++ {
list.WriteName(String(i))
}
counter := 0
list.ListNames("", func(name string) bool {
counter++
print(name, " ")
return true
})
if counter != maxNameCount {
t.Fail()
}
// list.skipList.println()
deleteBase := 5
deleteCount := maxNameCount - 3 * deleteBase
for i := deleteBase; i < deleteBase+deleteCount; i++ {
list.DeleteName(String(i))
}
counter = 0
list.ListNames("", func(name string) bool {
counter++
return true
})
// list.skipList.println()
if counter != maxNameCount-deleteCount {
t.Fail()
}
// randomized deletion
list = newNameList(memStore, 7)
// Delete elements at random positions in the list.
rList := rand.Perm(maxN)
for _, i := range rList {
list.WriteName(String(i))
}
for _, i := range rList {
list.DeleteName(String(i))
}
counter = 0
list.ListNames("", func(name string) bool {
counter++
print(name, " ")
return true
})
if counter != 0 {
t.Fail()
}
}

563
weed/util/skiplist/skiplist.go

@ -0,0 +1,563 @@
package skiplist
// adapted from https://github.com/MauriceGit/skiplist/blob/master/skiplist.go
import (
"bytes"
"fmt"
"math/bits"
"math/rand"
"time"
)
const (
// maxLevel denotes the maximum height of the skiplist. This height will keep the skiplist
// efficient for up to 34m entries. If there is a need for much more, please adjust this constant accordingly.
maxLevel = 25
)
type SkipList struct {
startLevels [maxLevel]*SkipListElementReference
endLevels [maxLevel]*SkipListElementReference
maxNewLevel int
maxLevel int
listStore ListStore
hasChanges bool
// elementCount int
}
// NewSeedEps returns a new empty, initialized Skiplist.
// Given a seed, a deterministic height/list behaviour can be achieved.
// Eps is used to compare keys given by the ExtractKey() function on equality.
func NewSeed(seed int64, listStore ListStore) *SkipList {
// Initialize random number generator.
rand.Seed(seed)
//fmt.Printf("SkipList seed: %v\n", seed)
list := &SkipList{
maxNewLevel: maxLevel,
maxLevel: 0,
listStore: listStore,
// elementCount: 0,
}
return list
}
// New returns a new empty, initialized Skiplist.
func New(listStore ListStore) *SkipList {
return NewSeed(time.Now().UTC().UnixNano(), listStore)
}
// IsEmpty checks, if the skiplist is empty.
func (t *SkipList) IsEmpty() bool {
return t.startLevels[0] == nil
}
func (t *SkipList) generateLevel(maxLevel int) int {
level := maxLevel - 1
// First we apply some mask which makes sure that we don't get a level
// above our desired level. Then we find the first set bit.
var x = rand.Uint64() & ((1 << uint(maxLevel-1)) - 1)
zeroes := bits.TrailingZeros64(x)
if zeroes <= maxLevel {
level = zeroes
}
return level
}
func (t *SkipList) findEntryIndex(key []byte, minLevel int) int {
// Find good entry point so we don't accidentally skip half the list.
for i := t.maxLevel; i >= 0; i-- {
if t.startLevels[i] != nil && bytes.Compare(t.startLevels[i].Key, key) < 0 || i <= minLevel {
return i
}
}
return 0
}
func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElementIfVisited *SkipListElement, foundElem *SkipListElement, ok bool, err error) {
foundElem = nil
ok = false
if t.IsEmpty() {
return
}
index := t.findEntryIndex(key, 0)
var currentNode *SkipListElement
currentNode, err = t.loadElement(t.startLevels[index])
if err != nil {
return
}
if currentNode == nil {
return
}
// In case, that our first element is already greater-or-equal!
if findGreaterOrEqual && compareElement(currentNode, key) > 0 {
foundElem = currentNode
ok = true
return
}
for {
if compareElement(currentNode, key) == 0 {
foundElem = currentNode
ok = true
return
}
// Which direction are we continuing next time?
if currentNode.Next[index] != nil && bytes.Compare(currentNode.Next[index].Key, key) <= 0 {
// Go right
currentNode, err = t.loadElement(currentNode.Next[index])
if err != nil {
return
}
if currentNode == nil {
return
}
} else {
if index > 0 {
// Early exit
if currentNode.Next[0] != nil && bytes.Compare(currentNode.Next[0].Key, key) == 0 {
prevElementIfVisited = currentNode
var currentNodeNext *SkipListElement
currentNodeNext, err = t.loadElement(currentNode.Next[0])
if err != nil {
return
}
if currentNodeNext == nil {
return
}
foundElem = currentNodeNext
ok = true
return
}
// Go down
index--
} else {
// Element is not found and we reached the bottom.
if findGreaterOrEqual {
foundElem, err = t.loadElement(currentNode.Next[index])
if err != nil {
return
}
ok = foundElem != nil
}
return
}
}
}
}
// Find tries to find an element in the skiplist based on the key from the given ListElement.
// elem can be used, if ok is true.
// Find runs in approx. O(log(n))
func (t *SkipList) Find(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
if t == nil || key == nil {
return
}
prevIfVisited, elem, ok, err = t.findExtended(key, false)
return
}
// FindGreaterOrEqual finds the first element, that is greater or equal to the given ListElement e.
// The comparison is done on the keys (So on ExtractKey()).
// FindGreaterOrEqual runs in approx. O(log(n))
func (t *SkipList) FindGreaterOrEqual(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
if t == nil || key == nil {
return
}
prevIfVisited, elem, ok, err = t.findExtended(key, true)
return
}
// Delete removes an element equal to e from the skiplist, if there is one.
// If there are multiple entries with the same value, Delete will remove one of them
// (Which one will change based on the actual skiplist layout)
// Delete runs in approx. O(log(n))
func (t *SkipList) Delete(key []byte) (err error) {
if t == nil || t.IsEmpty() || key == nil {
return
}
index := t.findEntryIndex(key, t.maxLevel)
var currentNode *SkipListElement
var nextNode *SkipListElement
for {
if currentNode == nil {
nextNode, err = t.loadElement(t.startLevels[index])
} else {
nextNode, err = t.loadElement(currentNode.Next[index])
}
if err != nil {
return err
}
// Found and remove!
if nextNode != nil && compareElement(nextNode, key) == 0 {
if currentNode != nil {
currentNode.Next[index] = nextNode.Next[index]
if err = t.saveElement(currentNode); err != nil {
return err
}
}
if index == 0 {
if nextNode.Next[index] != nil {
nextNextNode, err := t.loadElement(nextNode.Next[index])
if err != nil {
return err
}
if nextNextNode != nil {
nextNextNode.Prev = currentNode.Reference()
if err = t.saveElement(nextNextNode); err != nil {
return err
}
}
}
// t.elementCount--
if err = t.deleteElement(nextNode); err != nil {
return err
}
}
// Link from start needs readjustments.
startNextKey := t.startLevels[index].Key
if compareElement(nextNode, startNextKey) == 0 {
t.hasChanges = true
t.startLevels[index] = nextNode.Next[index]
// This was our currently highest node!
if t.startLevels[index] == nil {
t.maxLevel = index - 1
}
}
// Link from end needs readjustments.
if nextNode.Next[index] == nil {
t.endLevels[index] = currentNode.Reference()
t.hasChanges = true
}
nextNode.Next[index] = nil
}
if nextNode != nil && compareElement(nextNode, key) < 0 {
// Go right
currentNode = nextNode
} else {
// Go down
index--
if index < 0 {
break
}
}
}
return
}
// Insert inserts the given ListElement into the skiplist.
// Insert runs in approx. O(log(n))
func (t *SkipList) Insert(key, value []byte) (err error) {
if t == nil || key == nil {
return
}
level := t.generateLevel(t.maxNewLevel)
// Only grow the height of the skiplist by one at a time!
if level > t.maxLevel {
level = t.maxLevel + 1
t.maxLevel = level
t.hasChanges = true
}
elem := &SkipListElement{
Id: rand.Int63(),
Next: make([]*SkipListElementReference, t.maxNewLevel, t.maxNewLevel),
Level: int32(level),
Key: key,
Value: value,
}
// t.elementCount++
newFirst := true
newLast := true
if !t.IsEmpty() {
newFirst = compareElement(elem, t.startLevels[0].Key) < 0
newLast = compareElement(elem, t.endLevels[0].Key) > 0
}
normallyInserted := false
if !newFirst && !newLast {
normallyInserted = true
index := t.findEntryIndex(key, level)
var currentNode *SkipListElement
var nextNodeRef *SkipListElementReference
for {
if currentNode == nil {
nextNodeRef = t.startLevels[index]
} else {
nextNodeRef = currentNode.Next[index]
}
var nextNode *SkipListElement
// Connect node to next
if index <= level && (nextNodeRef == nil || bytes.Compare(nextNodeRef.Key, key) > 0) {
elem.Next[index] = nextNodeRef
if currentNode != nil {
currentNode.Next[index] = elem.Reference()
if err = t.saveElement(currentNode); err != nil {
return
}
}
if index == 0 {
elem.Prev = currentNode.Reference()
if nextNodeRef != nil {
if nextNode, err = t.loadElement(nextNodeRef); err != nil {
return
}
if nextNode != nil {
nextNode.Prev = elem.Reference()
if err = t.saveElement(nextNode); err != nil {
return
}
}
}
}
}
if nextNodeRef != nil && bytes.Compare(nextNodeRef.Key, key) <= 0 {
// Go right
if nextNode == nil {
// reuse nextNode when index == 0
if nextNode, err = t.loadElement(nextNodeRef); err != nil {
return
}
}
currentNode = nextNode
if currentNode == nil {
return
}
} else {
// Go down
index--
if index < 0 {
break
}
}
}
}
// Where we have a left-most position that needs to be referenced!
for i := level; i >= 0; i-- {
didSomething := false
if newFirst || normallyInserted {
if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 {
if i == 0 && t.startLevels[i] != nil {
startLevelElement, err := t.loadElement(t.startLevels[i])
if err != nil {
return err
}
if startLevelElement != nil {
startLevelElement.Prev = elem.Reference()
if err = t.saveElement(startLevelElement); err != nil {
return err
}
}
}
elem.Next[i] = t.startLevels[i]
t.startLevels[i] = elem.Reference()
t.hasChanges = true
}
// link the endLevels to this element!
if elem.Next[i] == nil {
t.endLevels[i] = elem.Reference()
t.hasChanges = true
}
didSomething = true
}
if newLast {
// Places the element after the very last element on this level!
// This is very important, so we are not linking the very first element (newFirst AND newLast) to itself!
if !newFirst {
if t.endLevels[i] != nil {
endLevelElement, err := t.loadElement(t.endLevels[i])
if err != nil {
return err
}
if endLevelElement != nil {
endLevelElement.Next[i] = elem.Reference()
if err = t.saveElement(endLevelElement); err != nil {
return err
}
}
}
if i == 0 {
elem.Prev = t.endLevels[i]
}
t.endLevels[i] = elem.Reference()
t.hasChanges = true
}
// Link the startLevels to this element!
if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 {
t.startLevels[i] = elem.Reference()
t.hasChanges = true
}
didSomething = true
}
if !didSomething {
break
}
}
if err = t.saveElement(elem); err != nil {
return err
}
return nil
}
// GetSmallestNode returns the very first/smallest node in the skiplist.
// GetSmallestNode runs in O(1)
func (t *SkipList) GetSmallestNode() (*SkipListElement, error) {
return t.loadElement(t.startLevels[0])
}
// GetLargestNode returns the very last/largest node in the skiplist.
// GetLargestNode runs in O(1)
func (t *SkipList) GetLargestNode() (*SkipListElement, error) {
return t.loadElement(t.endLevels[0])
}
// Next returns the next element based on the given node.
// Next will loop around to the first node, if you call it on the last!
func (t *SkipList) Next(e *SkipListElement) (*SkipListElement, error) {
if e.Next[0] == nil {
return t.loadElement(t.startLevels[0])
}
return t.loadElement(e.Next[0])
}
// Prev returns the previous element based on the given node.
// Prev will loop around to the last node, if you call it on the first!
func (t *SkipList) Prev(e *SkipListElement) (*SkipListElement, error) {
if e.Prev == nil {
return t.loadElement(t.endLevels[0])
}
return t.loadElement(e.Prev)
}
// ChangeValue can be used to change the actual value of a node in the skiplist
// without the need of Deleting and reinserting the node again.
// Be advised, that ChangeValue only works, if the actual key from ExtractKey() will stay the same!
// ok is an indicator, wether the value is actually changed.
func (t *SkipList) ChangeValue(e *SkipListElement, newValue []byte) (err error) {
// The key needs to stay correct, so this is very important!
e.Value = newValue
return t.saveElement(e)
}
// String returns a string format of the skiplist. Useful to get a graphical overview and/or debugging.
func (t *SkipList) println() {
print("start --> ")
for i, l := range t.startLevels {
if l == nil {
break
}
if i > 0 {
print(" -> ")
}
next := "---"
if l != nil {
next = string(l.Key)
}
print(fmt.Sprintf("[%v]", next))
}
println()
nodeRef := t.startLevels[0]
for nodeRef != nil {
print(fmt.Sprintf("%v: ", string(nodeRef.Key)))
node, _ := t.loadElement(nodeRef)
if node == nil {
break
}
for i := 0; i <= int(node.Level); i++ {
l := node.Next[i]
next := "---"
if l != nil {
next = string(l.Key)
}
if i == 0 {
prev := "---"
if node.Prev != nil {
prev = string(node.Prev.Key)
}
print(fmt.Sprintf("[%v|%v]", prev, next))
} else {
print(fmt.Sprintf("[%v]", next))
}
if i < int(node.Level) {
print(" -> ")
}
}
nodeRef = node.Next[0]
println()
}
print("end --> ")
for i, l := range t.endLevels {
if l == nil {
break
}
if i > 0 {
print(" -> ")
}
next := "---"
if l != nil {
next = string(l.Key)
}
print(fmt.Sprintf("[%v]", next))
}
println()
}

438
weed/util/skiplist/skiplist.pb.go

@ -0,0 +1,438 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.12.3
// source: skiplist.proto
package skiplist
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type SkipListProto struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StartLevels []*SkipListElementReference `protobuf:"bytes,1,rep,name=start_levels,json=startLevels,proto3" json:"start_levels,omitempty"`
EndLevels []*SkipListElementReference `protobuf:"bytes,2,rep,name=end_levels,json=endLevels,proto3" json:"end_levels,omitempty"`
MaxNewLevel int32 `protobuf:"varint,3,opt,name=max_new_level,json=maxNewLevel,proto3" json:"max_new_level,omitempty"`
MaxLevel int32 `protobuf:"varint,4,opt,name=max_level,json=maxLevel,proto3" json:"max_level,omitempty"`
}
func (x *SkipListProto) Reset() {
*x = SkipListProto{}
if protoimpl.UnsafeEnabled {
mi := &file_skiplist_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SkipListProto) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SkipListProto) ProtoMessage() {}
func (x *SkipListProto) ProtoReflect() protoreflect.Message {
mi := &file_skiplist_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SkipListProto.ProtoReflect.Descriptor instead.
func (*SkipListProto) Descriptor() ([]byte, []int) {
return file_skiplist_proto_rawDescGZIP(), []int{0}
}
func (x *SkipListProto) GetStartLevels() []*SkipListElementReference {
if x != nil {
return x.StartLevels
}
return nil
}
func (x *SkipListProto) GetEndLevels() []*SkipListElementReference {
if x != nil {
return x.EndLevels
}
return nil
}
func (x *SkipListProto) GetMaxNewLevel() int32 {
if x != nil {
return x.MaxNewLevel
}
return 0
}
func (x *SkipListProto) GetMaxLevel() int32 {
if x != nil {
return x.MaxLevel
}
return 0
}
type SkipListElementReference struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ElementPointer int64 `protobuf:"varint,1,opt,name=element_pointer,json=elementPointer,proto3" json:"element_pointer,omitempty"`
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
}
func (x *SkipListElementReference) Reset() {
*x = SkipListElementReference{}
if protoimpl.UnsafeEnabled {
mi := &file_skiplist_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SkipListElementReference) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SkipListElementReference) ProtoMessage() {}
func (x *SkipListElementReference) ProtoReflect() protoreflect.Message {
mi := &file_skiplist_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SkipListElementReference.ProtoReflect.Descriptor instead.
func (*SkipListElementReference) Descriptor() ([]byte, []int) {
return file_skiplist_proto_rawDescGZIP(), []int{1}
}
func (x *SkipListElementReference) GetElementPointer() int64 {
if x != nil {
return x.ElementPointer
}
return 0
}
func (x *SkipListElementReference) GetKey() []byte {
if x != nil {
return x.Key
}
return nil
}
type SkipListElement struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Next []*SkipListElementReference `protobuf:"bytes,2,rep,name=next,proto3" json:"next,omitempty"`
Level int32 `protobuf:"varint,3,opt,name=level,proto3" json:"level,omitempty"`
Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
Prev *SkipListElementReference `protobuf:"bytes,6,opt,name=prev,proto3" json:"prev,omitempty"`
}
func (x *SkipListElement) Reset() {
*x = SkipListElement{}
if protoimpl.UnsafeEnabled {
mi := &file_skiplist_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SkipListElement) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SkipListElement) ProtoMessage() {}
func (x *SkipListElement) ProtoReflect() protoreflect.Message {
mi := &file_skiplist_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SkipListElement.ProtoReflect.Descriptor instead.
func (*SkipListElement) Descriptor() ([]byte, []int) {
return file_skiplist_proto_rawDescGZIP(), []int{2}
}
func (x *SkipListElement) GetId() int64 {
if x != nil {
return x.Id
}
return 0
}
func (x *SkipListElement) GetNext() []*SkipListElementReference {
if x != nil {
return x.Next
}
return nil
}
func (x *SkipListElement) GetLevel() int32 {
if x != nil {
return x.Level
}
return 0
}
func (x *SkipListElement) GetKey() []byte {
if x != nil {
return x.Key
}
return nil
}
func (x *SkipListElement) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
func (x *SkipListElement) GetPrev() *SkipListElementReference {
if x != nil {
return x.Prev
}
return nil
}
type NameBatchData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
}
func (x *NameBatchData) Reset() {
*x = NameBatchData{}
if protoimpl.UnsafeEnabled {
mi := &file_skiplist_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *NameBatchData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NameBatchData) ProtoMessage() {}
func (x *NameBatchData) ProtoReflect() protoreflect.Message {
mi := &file_skiplist_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead.
func (*NameBatchData) Descriptor() ([]byte, []int) {
return file_skiplist_proto_rawDescGZIP(), []int{3}
}
func (x *NameBatchData) GetNames() [][]byte {
if x != nil {
return x.Names
}
return nil
}
var File_skiplist_proto protoreflect.FileDescriptor
var file_skiplist_proto_rawDesc = []byte{
0x0a, 0x0e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x08, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x22, 0xda, 0x01, 0x0a, 0x0d, 0x53,
0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x45, 0x0a, 0x0c,
0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b,
0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66,
0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x4c, 0x65, 0x76,
0x65, 0x6c, 0x73, 0x12, 0x41, 0x0a, 0x0a, 0x65, 0x6e, 0x64, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69,
0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x64,
0x4c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x5f, 0x6e, 0x65,
0x77, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x6d,
0x61, 0x78, 0x4e, 0x65, 0x77, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x61,
0x78, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x6d,
0x61, 0x78, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x22, 0x55, 0x0a, 0x18, 0x53, 0x6b, 0x69, 0x70, 0x4c,
0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65,
0x6e, 0x63, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x70,
0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x65, 0x6c,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0xcf,
0x01, 0x0a, 0x0f, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02,
0x69, 0x64, 0x12, 0x36, 0x0a, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70,
0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72,
0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x65,
0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c,
0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b,
0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x70, 0x72, 0x65, 0x76,
0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73,
0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76,
0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
file_skiplist_proto_rawDescOnce sync.Once
file_skiplist_proto_rawDescData = file_skiplist_proto_rawDesc
)
func file_skiplist_proto_rawDescGZIP() []byte {
file_skiplist_proto_rawDescOnce.Do(func() {
file_skiplist_proto_rawDescData = protoimpl.X.CompressGZIP(file_skiplist_proto_rawDescData)
})
return file_skiplist_proto_rawDescData
}
var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_skiplist_proto_goTypes = []interface{}{
(*SkipListProto)(nil), // 0: skiplist.SkipListProto
(*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference
(*SkipListElement)(nil), // 2: skiplist.SkipListElement
(*NameBatchData)(nil), // 3: skiplist.NameBatchData
}
var file_skiplist_proto_depIdxs = []int32{
1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference
1, // 1: skiplist.SkipListProto.end_levels:type_name -> skiplist.SkipListElementReference
1, // 2: skiplist.SkipListElement.next:type_name -> skiplist.SkipListElementReference
1, // 3: skiplist.SkipListElement.prev:type_name -> skiplist.SkipListElementReference
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_skiplist_proto_init() }
func file_skiplist_proto_init() {
if File_skiplist_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_skiplist_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SkipListProto); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_skiplist_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SkipListElementReference); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_skiplist_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SkipListElement); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*NameBatchData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_skiplist_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_skiplist_proto_goTypes,
DependencyIndexes: file_skiplist_proto_depIdxs,
MessageInfos: file_skiplist_proto_msgTypes,
}.Build()
File_skiplist_proto = out.File
file_skiplist_proto_rawDesc = nil
file_skiplist_proto_goTypes = nil
file_skiplist_proto_depIdxs = nil
}

30
weed/util/skiplist/skiplist.proto

@ -0,0 +1,30 @@
syntax = "proto3";
package skiplist;
option go_package = "github.com/chrislusf/seaweedfs/weed/util/skiplist";
message SkipListProto {
repeated SkipListElementReference start_levels = 1;
repeated SkipListElementReference end_levels = 2;
int32 max_new_level = 3;
int32 max_level = 4;
}
message SkipListElementReference {
int64 element_pointer = 1;
bytes key = 2;
}
message SkipListElement {
int64 id = 1;
repeated SkipListElementReference next = 2;
int32 level = 3;
bytes key = 4;
bytes value = 5;
SkipListElementReference prev = 6;
}
message NameBatchData {
repeated bytes names = 1;
}

51
weed/util/skiplist/skiplist_serde.go

@ -0,0 +1,51 @@
package skiplist
import "bytes"
func compareElement(a *SkipListElement, key []byte) int {
if len(a.Key) == 0 {
return -1
}
return bytes.Compare(a.Key, key)
}
func (node *SkipListElement) Reference() *SkipListElementReference {
if node == nil {
return nil
}
return &SkipListElementReference{
ElementPointer: node.Id,
Key: node.Key,
}
}
func (t *SkipList) saveElement(element *SkipListElement) error {
if element == nil {
return nil
}
return t.listStore.SaveElement(element.Id, element)
}
func (t *SkipList) deleteElement(element *SkipListElement) error {
if element == nil {
return nil
}
return t.listStore.DeleteElement(element.Id)
}
func (t *SkipList) loadElement(ref *SkipListElementReference) (*SkipListElement, error) {
if ref.IsNil() {
return nil, nil
}
return t.listStore.LoadElement(ref.ElementPointer)
}
func (ref *SkipListElementReference) IsNil() bool {
if ref == nil {
return true
}
if len(ref.Key) == 0 {
return true
}
return false
}

295
weed/util/skiplist/skiplist_test.go

@ -0,0 +1,295 @@
package skiplist
import (
"bytes"
"fmt"
"math/rand"
"strconv"
"testing"
)
const (
maxN = 10000
)
var (
memStore = newMemStore()
)
func TestReverseInsert(t *testing.T) {
list := NewSeed(100, memStore)
list.Insert([]byte("zzz"), []byte("zzz"))
list.Delete([]byte("zzz"))
list.Insert([]byte("aaa"), []byte("aaa"))
if list.IsEmpty() {
t.Fail()
}
}
func TestInsertAndFind(t *testing.T) {
k0 := []byte("0")
var list *SkipList
var listPointer *SkipList
listPointer.Insert(k0, k0)
if _, _, ok, _ := listPointer.Find(k0); ok {
t.Fail()
}
list = New(memStore)
if _, _, ok, _ := list.Find(k0); ok {
t.Fail()
}
if !list.IsEmpty() {
t.Fail()
}
// Test at the beginning of the list.
for i := 0; i < maxN; i++ {
key := []byte(strconv.Itoa(maxN - i))
list.Insert(key, key)
}
for i := 0; i < maxN; i++ {
key := []byte(strconv.Itoa(maxN - i))
if _, _, ok, _ := list.Find(key); !ok {
t.Fail()
}
}
list = New(memStore)
// Test at the end of the list.
for i := 0; i < maxN; i++ {
key := []byte(strconv.Itoa(i))
list.Insert(key, key)
}
for i := 0; i < maxN; i++ {
key := []byte(strconv.Itoa(i))
if _, _, ok, _ := list.Find(key); !ok {
t.Fail()
}
}
list = New(memStore)
// Test at random positions in the list.
rList := rand.Perm(maxN)
for _, e := range rList {
key := []byte(strconv.Itoa(e))
// println("insert", e)
list.Insert(key, key)
}
for _, e := range rList {
key := []byte(strconv.Itoa(e))
// println("find", e)
if _, _, ok, _ := list.Find(key); !ok {
t.Fail()
}
}
// println("print list")
list.println()
}
func Element(x int) []byte {
return []byte(strconv.Itoa(x))
}
func TestDelete(t *testing.T) {
k0 := []byte("0")
var list *SkipList
// Delete on empty list
list.Delete(k0)
list = New(memStore)
list.Delete(k0)
if !list.IsEmpty() {
t.Fail()
}
list.Insert(k0, k0)
list.Delete(k0)
if !list.IsEmpty() {
t.Fail()
}
// Delete elements at the beginning of the list.
for i := 0; i < maxN; i++ {
list.Insert(Element(i), Element(i))
}
for i := 0; i < maxN; i++ {
list.Delete(Element(i))
}
if !list.IsEmpty() {
t.Fail()
}
list = New(memStore)
// Delete elements at the end of the list.
for i := 0; i < maxN; i++ {
list.Insert(Element(i), Element(i))
}
for i := 0; i < maxN; i++ {
list.Delete(Element(maxN - i - 1))
}
if !list.IsEmpty() {
t.Fail()
}
list = New(memStore)
// Delete elements at random positions in the list.
rList := rand.Perm(maxN)
for _, e := range rList {
list.Insert(Element(e), Element(e))
}
for _, e := range rList {
list.Delete(Element(e))
}
if !list.IsEmpty() {
t.Fail()
}
}
func TestNext(t *testing.T) {
list := New(memStore)
for i := 0; i < maxN; i++ {
list.Insert(Element(i), Element(i))
}
smallest, _ := list.GetSmallestNode()
largest, _ := list.GetLargestNode()
lastNode := smallest
node := lastNode
for node != largest {
node, _ = list.Next(node)
// Must always be incrementing here!
if bytes.Compare(node.Key, lastNode.Key) <= 0 {
t.Fail()
}
// Next.Prev must always point to itself!
prevNode, _ := list.Prev(node)
nextNode, _ := list.Next(prevNode)
if nextNode != node {
t.Fail()
}
lastNode = node
}
if nextNode, _ := list.Next(largest); nextNode != smallest {
t.Fail()
}
}
func TestPrev(t *testing.T) {
list := New(memStore)
for i := 0; i < maxN; i++ {
list.Insert(Element(i), Element(i))
}
smallest, _ := list.GetSmallestNode()
largest, _ := list.GetLargestNode()
lastNode := largest
node := lastNode
for node != smallest {
node, _ = list.Prev(node)
// Must always be incrementing here!
if bytes.Compare(node.Key, lastNode.Key) >= 0 {
t.Fail()
}
// Next.Prev must always point to itself!
nextNode, _ := list.Next(node)
prevNode, _ := list.Prev(nextNode)
if prevNode != node {
t.Fail()
}
lastNode = node
}
if prevNode, _ := list.Prev(smallest); prevNode != largest {
t.Fail()
}
}
func TestFindGreaterOrEqual(t *testing.T) {
maxNumber := maxN * 100
var list *SkipList
var listPointer *SkipList
// Test on empty list.
if _, _, ok, _ := listPointer.FindGreaterOrEqual(Element(0)); ok {
t.Fail()
}
list = New(memStore)
for i := 0; i < maxN; i++ {
list.Insert(Element(rand.Intn(maxNumber)), Element(i))
}
for i := 0; i < maxN; i++ {
key := Element(rand.Intn(maxNumber))
if _, v, ok, _ := list.FindGreaterOrEqual(key); ok {
// if f is v should be bigger than the element before
if v.Prev != nil && bytes.Compare(v.Prev.Key, key) >= 0 {
fmt.Printf("PrevV: %s\n key: %s\n\n", string(v.Prev.Key), string(key))
t.Fail()
}
// v should be bigger or equal to f
// If we compare directly, we get an equal key with a difference on the 10th decimal point, which fails.
if bytes.Compare(v.Key, key) < 0 {
fmt.Printf("v: %s\n key: %s\n\n", string(v.Key), string(key))
t.Fail()
}
} else {
lastNode, _ := list.GetLargestNode()
lastV := lastNode.GetValue()
// It is OK, to fail, as long as f is bigger than the last element.
if bytes.Compare(key, lastV) <= 0 {
fmt.Printf("lastV: %s\n key: %s\n\n", string(lastV), string(key))
t.Fail()
}
}
}
}
func TestChangeValue(t *testing.T) {
list := New(memStore)
for i := 0; i < maxN; i++ {
list.Insert(Element(i), []byte("value"))
}
for i := 0; i < maxN; i++ {
// The key only looks at the int so the string doesn't matter here!
_, f1, ok, _ := list.Find(Element(i))
if !ok {
t.Fail()
}
err := list.ChangeValue(f1, []byte("different value"))
if err != nil {
t.Fail()
}
_, f2, ok, _ := list.Find(Element(i))
if !ok {
t.Fail()
}
if bytes.Compare(f2.GetValue(), []byte("different value")) != 0 {
t.Fail()
}
}
}
Loading…
Cancel
Save