Browse Source

fiil serverUrls sorted by data center

pull/1610/head
Konstantin Lebedev 4 years ago
parent
commit
fc7baef5bb
  1. 2
      weed/command/benchmark.go
  2. 5
      weed/filer/filer.go
  3. 2
      weed/filer/leveldb/leveldb_store_test.go
  4. 2
      weed/server/filer_server.go
  5. 2
      weed/server/master_server.go
  6. 2
      weed/shell/commands.go
  7. 6
      weed/wdclient/masterclient.go
  8. 14
      weed/wdclient/vid_map.go
  9. 2
      weed/wdclient/vid_map_test.go

2
weed/command/benchmark.go

@ -125,7 +125,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster() go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected() b.masterClient.WaitUntilConnected()

5
weed/filer/filer.go

@ -44,16 +44,15 @@ type Filer struct {
} }
func NewFiler(masters []string, grpcDialOption grpc.DialOption, func NewFiler(masters []string, grpcDialOption grpc.DialOption,
filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{ f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection f.metaLogCollection = collection
f.metaLogReplication = replication f.metaLogReplication = replication
go f.loopProcessingDeletion() go f.loopProcessingDeletion()
return f return f

2
weed/filer/leveldb/leveldb_store_test.go

@ -11,7 +11,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDBStore{} store := &LevelDBStore{}

2
weed/server/filer_server.go

@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!") glog.Fatal("master list is required!")
} }
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast() fs.listenersCond.Broadcast()
}) })
fs.filer.Cipher = option.Cipher fs.filer.Cipher = option.Cipher

2
weed/server/master_server.go

@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize: preallocateSize, preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation), clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
adminLocks: NewAdminLocks(), adminLocks: NewAdminLocks(),
} }
ms.bounedLeaderChan = make(chan int, 16) ms.bounedLeaderChan = make(chan int, 16)

2
weed/shell/commands.go

@ -45,7 +45,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv { func NewCommandEnv(options ShellOptions) *CommandEnv {
ce := &CommandEnv{ ce := &CommandEnv{
env: make(map[string]string), env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
option: options, option: options,
} }
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)

6
weed/wdclient/masterclient.go

@ -24,14 +24,14 @@ type MasterClient struct {
vidMap vidMap
} }
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, masters []string) *MasterClient {
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient {
return &MasterClient{ return &MasterClient{
clientType: clientType, clientType: clientType,
clientHost: clientHost, clientHost: clientHost,
grpcPort: clientGrpcPort, grpcPort: clientGrpcPort,
masters: masters, masters: masters,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
vidMap: newVidMap(),
vidMap: newVidMap(clientDataCenter),
} }
} }
@ -89,7 +89,7 @@ func (mc *MasterClient) tryAllMasters() {
} }
mc.currentMaster = "" mc.currentMaster = ""
mc.vidMap = newVidMap()
mc.vidMap = newVidMap("")
} }
} }

14
weed/wdclient/vid_map.go

@ -24,13 +24,14 @@ type Location struct {
type vidMap struct { type vidMap struct {
sync.RWMutex sync.RWMutex
vid2Locations map[uint32][]Location vid2Locations map[uint32][]Location
cursor int32
DataCenter string
cursor int32
} }
func newVidMap() vidMap {
func newVidMap(dataCenter string) vidMap {
return vidMap{ return vidMap{
vid2Locations: make(map[uint32][]Location), vid2Locations: make(map[uint32][]Location),
DataCenter: dataCenter,
cursor: -1, cursor: -1,
} }
} }
@ -57,7 +58,11 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return nil, fmt.Errorf("volume %d not found", id) return nil, fmt.Errorf("volume %d not found", id)
} }
for _, loc := range locations { for _, loc := range locations {
serverUrls = append(serverUrls, loc.Url)
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
serverUrls = append(serverUrls, loc.Url)
} else {
serverUrls = append([]string{loc.Url}, serverUrls...)
}
} }
return return
} }
@ -93,7 +98,6 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error)
func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
vc.RLock() vc.RLock()
defer vc.RUnlock() defer vc.RUnlock()
locations, found = vc.vid2Locations[vid] locations, found = vc.vid2Locations[vid]
return return
} }

2
weed/wdclient/vid_map_test.go

@ -45,7 +45,7 @@ func TestLocationIndex(t *testing.T) {
mustOk(7, maxCursorIndex, 0) mustOk(7, maxCursorIndex, 0)
// test with constructor // test with constructor
vm = newVidMap()
vm = newVidMap("")
length := 7 length := 7
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
got, err := vm.getLocationIndex(length) got, err := vm.getLocationIndex(length)

Loading…
Cancel
Save