Browse Source

MasterClient replicates all vid locations

pull/702/head
Chris Lu 7 years ago
parent
commit
1d779389cb
  1. 8
      weed/filer2/filer.go
  2. 16
      weed/server/filer_grpc_server.go
  3. 12
      weed/wdclient/masterclient.go
  4. 59
      weed/wdclient/vid_map.go

8
weed/filer2/filer.go

@ -18,13 +18,13 @@ import (
type Filer struct { type Filer struct {
store FilerStore store FilerStore
directoryCache *ccache.Cache directoryCache *ccache.Cache
masterClient *wdclient.MasterClient
MasterClient *wdclient.MasterClient
} }
func NewFiler(masters []string) *Filer { func NewFiler(masters []string) *Filer {
return &Filer{ return &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
masterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
} }
} }
@ -37,11 +37,11 @@ func (f *Filer) DisableDirectoryCache() {
} }
func (fs *Filer) GetMaster() string { func (fs *Filer) GetMaster() string {
return fs.masterClient.GetMaster()
return fs.MasterClient.GetMaster()
} }
func (fs *Filer) KeepConnectedToMaster() { func (fs *Filer) KeepConnectedToMaster() {
fs.masterClient.KeepConnectedToMaster()
fs.MasterClient.KeepConnectedToMaster()
} }
func (f *Filer) CreateEntry(entry *Entry) error { func (f *Filer) CreateEntry(entry *Entry) error {

16
weed/server/filer_grpc_server.go

@ -99,24 +99,24 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
if err != nil {
return nil, err
}
resp := &filer_pb.LookupVolumeResponse{ resp := &filer_pb.LookupVolumeResponse{
LocationsMap: make(map[string]*filer_pb.Locations), LocationsMap: make(map[string]*filer_pb.Locations),
} }
for vid, locations := range lookupResult {
for _, vidString := range req.VolumeIds {
vid, err := strconv.Atoi(vidString)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
return nil, err
}
var locs []*filer_pb.Location var locs []*filer_pb.Location
for _, loc := range locations.Locations {
for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
locs = append(locs, &filer_pb.Location{ locs = append(locs, &filer_pb.Location{
Url: loc.Url, Url: loc.Url,
PublicUrl: loc.PublicUrl, PublicUrl: loc.PublicUrl,
}) })
} }
resp.LocationsMap[vid] = &filer_pb.Locations{
resp.LocationsMap[vidString] = &filer_pb.Locations{
Locations: locs, Locations: locs,
} }
} }

12
weed/wdclient/masterclient.go

@ -15,6 +15,8 @@ type MasterClient struct {
name string name string
currentMaster string currentMaster string
masters []string masters []string
VidMap
} }
func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient { func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
@ -61,6 +63,16 @@ func (mc *MasterClient) tryAllMasters() {
return err return err
} else { } else {
glog.V(0).Infof("volume location: %+v", volumeLocation) glog.V(0).Infof("volume location: %+v", volumeLocation)
loc := Location{
Url: volumeLocation.Url,
PublicUrl: volumeLocation.PublicUrl,
}
for _, newVid := range volumeLocation.NewVids {
mc.AddLocation(newVid, loc)
}
for _, deletedVid := range volumeLocation.DeletedVids {
mc.DeleteLocation(deletedVid, loc)
}
} }
} }
}) })

59
weed/wdclient/vid_map.go

@ -0,0 +1,59 @@
package wdclient
import (
"sync"
)
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
}
type VidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
}
func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
vc.RLock()
defer vc.RUnlock()
return vc.vid2Locations[vid]
}
func (vc *VidMap) AddLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
locations, found := vc.vid2Locations[vid]
if !found {
vc.vid2Locations[vid] = []Location{location}
return
}
for _, loc := range locations {
if loc.Url == location.Url {
return
}
}
vc.vid2Locations[vid] = append(locations, location)
}
func (vc *VidMap) DeleteLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
locations, found := vc.vid2Locations[vid]
if !found {
return
}
for i, loc := range locations {
if loc.Url == location.Url {
vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
}
}
}
Loading…
Cancel
Save