diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 1c3814fce..b93b603e2 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -37,6 +37,7 @@ func (s3a *S3ApiServer) mkdir(ctx context.Context, parentDirectoryPath string, d glog.V(1).Infof("mkdir: %v", request) if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("mkdir %v: %v", request, err) return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) } @@ -67,6 +68,7 @@ func (s3a *S3ApiServer) mkFile(ctx context.Context, parentDirectoryPath string, glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName) if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("create file %v:%v", request, err) return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err) } @@ -89,6 +91,7 @@ func (s3a *S3ApiServer) list(ctx context.Context, parentDirectoryPath, prefix, s glog.V(4).Infof("read directory: %v", request) resp, err := client.ListEntries(ctx, request) if err != nil { + glog.V(0).Infof("read directory %v: %v", request, err) return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err) } @@ -114,6 +117,7 @@ func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entr glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request) if _, err := client.DeleteEntry(ctx, request); err != nil { + glog.V(0).Infof("delete entry %v: %v", request, err) return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, err) } @@ -134,6 +138,7 @@ func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request) resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil { + glog.V(0).Infof("exists entry %v: %v", request, err) return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err) } diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 01d9cdaed..97df49cb6 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -3,15 +3,18 @@ package wdclient import ( "errors" "fmt" - "math/rand" "strconv" "strings" "sync" - "time" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" ) +const ( + maxCursorIndex = 4096 +) + type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` @@ -20,14 +23,25 @@ type Location struct { type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location - r *rand.Rand + + cursor int32 } func newVidMap() vidMap { return vidMap{ vid2Locations: make(map[uint32][]Location), - r: rand.New(rand.NewSource(time.Now().UnixNano())), + cursor: -1, + } +} + +func (vc *vidMap) getLocationIndex(length int) (int, error) { + if length <= 0 { + return 0, fmt.Errorf("invalid length: %d", length) + } + if atomic.LoadInt32(&vc.cursor) == maxCursorIndex { + atomic.CompareAndSwapInt32(&vc.cursor, maxCursorIndex, -1) } + return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil } func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { @@ -94,7 +108,12 @@ func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { return "", fmt.Errorf("volume %d not found", vid) } - return locations[vc.r.Intn(len(locations))].Url, nil + index, err := vc.getLocationIndex(len(locations)) + if err != nil { + return "", fmt.Errorf("volume %d: %v", vid, err) + } + + return locations[index].Url, nil } func (vc *vidMap) addLocation(vid uint32, location Location) { diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go new file mode 100644 index 000000000..87be2fc25 --- /dev/null +++ b/weed/wdclient/vid_map_test.go @@ -0,0 +1,76 @@ +package wdclient + +import ( + "fmt" + "testing" +) + +func TestLocationIndex(t *testing.T) { + vm := vidMap{} + // test must be failed + mustFailed := func(length int) { + _, err := vm.getLocationIndex(length) + if err == nil { + t.Errorf("length %d must be failed", length) + } + if err.Error() != fmt.Sprintf("invalid length: %d", length) { + t.Errorf("length %d must be failed. error: %v", length, err) + } + } + + mustFailed(-1) + mustFailed(0) + + mustOk := func(length, cursor, expect int) { + if length <= 0 { + t.Fatal("please don't do this") + } + vm.cursor = int32(cursor) + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != expect { + t.Errorf("cursor: %d, length: %d, expect: %d, got: %d\n", cursor, length, expect, got) + return + } + } + + for i := -1; i < 100; i++ { + mustOk(7, i, (i+1)%7) + } + + // when cursor reaches MaxInt64 + mustOk(7, maxCursorIndex, 0) + + // test with constructor + vm = newVidMap() + length := 7 + for i := 0; i < 100; i++ { + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != i%length { + t.Errorf("length: %d, i: %d, got: %d\n", length, i, got) + } + } +} + +func BenchmarkLocationIndex(b *testing.B) { + b.SetParallelism(8) + vm := vidMap{ + cursor: maxCursorIndex - 4000, + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := vm.getLocationIndex(3) + if err != nil { + b.Error(err) + } + } + }) +}