From 0260a9c03040715307971cba8656430cb46bb3b1 Mon Sep 17 00:00:00 2001 From: tnextday Date: Fri, 18 Dec 2015 21:27:11 +0800 Subject: [PATCH] Update volume layout when volume option have changed --- go/storage/replica_placement.go | 6 ++++++ go/storage/volume_ttl.go | 4 ++++ go/topology/data_node.go | 15 ++++++++++----- go/topology/topology.go | 18 +++++++++++------- .../weed_server/volume_server_handlers_sync.go | 3 --- 5 files changed, 31 insertions(+), 15 deletions(-) diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index c1aca52eb..31f8f464a 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -51,3 +51,9 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } + +func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool { + return rp.SameRackCount == rp1.SameRackCount && + rp.DiffRackCount == rp1.DiffRackCount && + rp.DiffDataCenterCount == rp1.DiffDataCenterCount +} diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go index 4318bb048..676479cfb 100644 --- a/go/storage/volume_ttl.go +++ b/go/storage/volume_ttl.go @@ -114,6 +114,10 @@ func toStoredByte(readableUnitByte byte) byte { return 0 } +func (t *TTL) Equal(t1 *TTL) bool { + return t.count == t1.count && t.unit == t1.unit +} + func (t TTL) Minutes() uint32 { switch t.unit { case Empty: diff --git a/go/topology/data_node.go b/go/topology/data_node.go index fe0926e85..378a7486f 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -31,8 +31,9 @@ func (dn *DataNode) String() string { return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { - if _, ok := dn.volumes[v.Id]; !ok { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (optionChanged bool) { + optionChanged = false + if v1, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustVolumeCountDelta(1) if !v.ReadOnly { @@ -40,11 +41,13 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { } dn.UpAdjustMaxVolumeId(v.Id) } else { + optionChanged = !v1.Ttl.Equal(v.Ttl) || v1.Collection != v.Collection || !v1.ReplicaPlacement.Equal(v.ReplicaPlacement) dn.volumes[v.Id] = v } + return } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (needToDeleteVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v @@ -53,13 +56,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) delete(dn.volumes, vid) - deletedVolumes = append(deletedVolumes, v) + needToDeleteVolumes = append(needToDeleteVolumes, v) dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1) } } //TODO: adjust max volume id, if need to reclaim volume ids for _, v := range actualVolumes { - dn.AddOrUpdateVolume(v) + if dn.AddOrUpdateVolume(v) { + needToDeleteVolumes = append(needToDeleteVolumes, v) + } } return } diff --git a/go/topology/topology.go b/go/topology/topology.go index c329b5837..a2131ae3a 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -116,12 +116,12 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) - if err != nil || datanodes.Length() == 0 { + vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + if err != nil || dataNodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil } func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { @@ -167,13 +167,17 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - deletedVolumes := dn.UpdateVolumes(volumeInfos) + + // If volume options(replica placement, ttl or collection) have changed, + // we need update its volume layout. + needToDeleteVolumes := dn.UpdateVolumes(volumeInfos) + for _, v := range needToDeleteVolumes { + t.UnRegisterVolumeLayout(v, dn) + } for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } - for _, v := range deletedVolumes { - t.UnRegisterVolumeLayout(v, dn) - } + } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index 859bff563..fef434d28 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -94,9 +94,6 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest) return } - //set read only when replicating - v.SetReadOnly(true) - defer v.SetReadOnly(false) cr, e := v.GetVolumeCleanReader() if e != nil { http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError)