diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index a70f8273e..2ee2fcc85 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -102,6 +102,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init")=="true" ip := r.FormValue("ip") if ip == "" { ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] @@ -113,7 +114,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { volumes := new([]storage.VolumeInfo) json.Unmarshal([]byte(r.FormValue("volumes")), volumes) debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) + topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) m := make(map[string]interface{}) m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024 writeJson(w, r, m) diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 2131ac91a..a2c5f040b 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -19,9 +19,10 @@ type Store struct { PublicUrl string MaxVolumeCount int - //read from the master masterNode string - volumeSizeLimit uint64 + connected bool + volumeSizeLimit uint64 //read from the master + } func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { @@ -108,7 +109,7 @@ func (s *Store) loadExistingVolumes() { if s.volumes[vid] == nil { v := NewVolume(s.dir, vid, CopyNil) s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version,"size =", v.Size()) + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) } } } @@ -141,6 +142,9 @@ func (s *Store) Join() error { } bytes, _ := json.Marshal(stats) values := make(url.Values) + if !s.connected { + values.Add("init", "true") + } values.Add("port", strconv.Itoa(s.Port)) values.Add("ip", s.Ip) values.Add("publicUrl", s.PublicUrl) @@ -155,6 +159,7 @@ func (s *Store) Join() error { return err } s.volumeSizeLimit = ret.VolumeSizeLimit + s.connected = true return nil } func (s *Store) Close() { diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index f27e5b492..1555ef682 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -18,6 +18,15 @@ func NewRack(id string) *Rack { return r } +func (r *Rack) FindDataNode(ip string, port int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil +} func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index adaa16691..e0a2b0c34 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -120,11 +120,15 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) } -func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { +func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { dcName, rackName := t.configuration.Locate(ip) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + dn := rack.FindDataNode(ip, port) + if init && dn != nil { + t.UnRegisterDataNode(dn) + } + dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) for _, v := range volumeInfos { dn.AddOrUpdateVolume(v) t.RegisterVolumeLayout(&v, dn) @@ -142,4 +146,3 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { t.LinkChildNode(dc) return dc } -