Browse Source

Update volume layout when volume option have changed

pull/279/head
tnextday 10 years ago
parent
commit
0260a9c030
  1. 6
      go/storage/replica_placement.go
  2. 4
      go/storage/volume_ttl.go
  3. 15
      go/topology/data_node.go
  4. 18
      go/topology/topology.go
  5. 3
      go/weed/weed_server/volume_server_handlers_sync.go

6
go/storage/replica_placement.go

@ -51,3 +51,9 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int { func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
} }
func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool {
return rp.SameRackCount == rp1.SameRackCount &&
rp.DiffRackCount == rp1.DiffRackCount &&
rp.DiffDataCenterCount == rp1.DiffDataCenterCount
}

4
go/storage/volume_ttl.go

@ -114,6 +114,10 @@ func toStoredByte(readableUnitByte byte) byte {
return 0 return 0
} }
func (t *TTL) Equal(t1 *TTL) bool {
return t.count == t1.count && t.unit == t1.unit
}
func (t TTL) Minutes() uint32 { func (t TTL) Minutes() uint32 {
switch t.unit { switch t.unit {
case Empty: case Empty:

15
go/topology/data_node.go

@ -31,8 +31,9 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
} }
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (optionChanged bool) {
optionChanged = false
if v1, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1) dn.UpAdjustVolumeCountDelta(1)
if !v.ReadOnly { if !v.ReadOnly {
@ -40,11 +41,13 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
} }
dn.UpAdjustMaxVolumeId(v.Id) dn.UpAdjustMaxVolumeId(v.Id)
} else { } else {
optionChanged = !v1.Ttl.Equal(v.Ttl) || v1.Collection != v.Collection || !v1.ReplicaPlacement.Equal(v.ReplicaPlacement)
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
} }
return
} }
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (needToDeleteVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes { for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v actualVolumeMap[v.Id] = v
@ -53,13 +56,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
if _, ok := actualVolumeMap[vid]; !ok { if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid) glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid) delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
needToDeleteVolumes = append(needToDeleteVolumes, v)
dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1)
} }
} //TODO: adjust max volume id, if need to reclaim volume ids } //TODO: adjust max volume id, if need to reclaim volume ids
for _, v := range actualVolumes { for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
if dn.AddOrUpdateVolume(v) {
needToDeleteVolumes = append(needToDeleteVolumes, v)
}
} }
return return
} }

18
go/topology/topology.go

@ -116,12 +116,12 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
} }
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || dataNodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes available!") return "", 0, nil, errors.New("No writable volumes available!")
} }
fileId, count := t.Sequence.NextFileId(count) fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil
} }
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
@ -167,13 +167,17 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
} }
} }
deletedVolumes := dn.UpdateVolumes(volumeInfos)
// If volume options(replica placement, ttl or collection) have changed,
// we need update its volume layout.
needToDeleteVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range needToDeleteVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
for _, v := range volumeInfos { for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn) t.RegisterVolumeLayout(v, dn)
} }
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
} }
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {

3
go/weed/weed_server/volume_server_handlers_sync.go

@ -94,9 +94,6 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http
http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest) http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest)
return return
} }
//set read only when replicating
v.SetReadOnly(true)
defer v.SetReadOnly(false)
cr, e := v.GetVolumeCleanReader() cr, e := v.GetVolumeCleanReader()
if e != nil { if e != nil {
http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError)

Loading…
Cancel
Save