diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go
index 54944a64e..713cf33c1 100644
--- a/go/operation/sync_volume.go
+++ b/go/operation/sync_volume.go
@@ -10,7 +10,6 @@ import (
 )
 
 type SyncVolumeResponse struct {
-    Replication     string `json:"Replication,omitempty"`
     Ttl             string `json:"Ttl,omitempty"`
     TailOffset      uint64 `json:"TailOffset,omitempty"`
     CompactRevision uint16 `json:"CompactRevision,omitempty"`
diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto
index 703b1f4a0..f89eaf180 100644
--- a/go/proto/system_message.proto
+++ b/go/proto/system_message.proto
@@ -8,7 +8,7 @@ message VolumeInformationMessage {
   required uint64 delete_count = 5;
   required uint64 deleted_byte_count = 6;
   optional bool read_only = 7;
-  required uint32 replica_placement = 8;
+//  required uint32 replica_placement = 8;
   optional uint32 version = 9 [default=2];
   optional uint32 ttl = 10;
 }
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 31f8f464a..e53bffba8 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -1,6 +1,7 @@
 package storage
 
 import (
+    "encoding/json"
     "errors"
     "fmt"
 )
@@ -11,6 +12,10 @@ type ReplicaPlacement struct {
     DiffDataCenterCount int
 }
 
+type ReplicaPlacements struct {
+    settings map[string]*ReplicaPlacement
+}
+
 func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
     rp := &ReplicaPlacement{}
     for i, c := range t {
@@ -57,3 +62,41 @@ func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool {
         rp.DiffRackCount == rp1.DiffRackCount &&
         rp.DiffDataCenterCount == rp1.DiffDataCenterCount
 }
+
+func NewReplicaPlacements(defaultRP string) *ReplicaPlacements {
+    rp, e := NewReplicaPlacementFromString(defaultRP)
+    if e != nil {
+        rp, _ = NewReplicaPlacementFromString("000")
+    }
+    rps := &ReplicaPlacements{settings: make(map[string]*ReplicaPlacement)}
+    rps.settings[""] = rp
+    return rps
+}
+
+func NewReplicaPlacementsFromJson(s string) *ReplicaPlacements {
+    m := make(map[string]*ReplicaPlacement)
+    if json.Unmarshal([]byte(s), &m) != nil || m[""] == nil {
+        m[""], _ = NewReplicaPlacementFromString("000")
+    }
+    return &ReplicaPlacements{settings: m}
+}
+
+func (rps *ReplicaPlacements) Get(collection string) *ReplicaPlacement {
+    if rp, ok := rps.settings[collection]; ok {
+        return rp
+    }
+    return rps.settings[""]
+}
+
+func (rps *ReplicaPlacements) Set(collection, t string) error {
+    rp, e := NewReplicaPlacementFromString(t)
+    if e == nil {
+        rps.settings[collection] = rp
+    }
+    return e
+}
+
+func (rps *ReplicaPlacements) Marshal() string {
+    buf, _ := json.Marshal(rps.settings)
+    return string(buf)
+}
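The new ReplicaPlacements registry above replaces the per-volume setting that the rest of this patch removes. A minimal usage sketch; the collection name and placement strings are made up, and the "" entry is the fallback default:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	// The "" key holds the default placement; unknown collections fall back to it.
	rps := storage.NewReplicaPlacements("001")

	// Per-collection override; Set rejects malformed placement strings.
	if err := rps.Set("pictures", "010"); err != nil {
		fmt.Println("bad placement:", err)
	}

	fmt.Println(rps.Get("pictures")) // 010
	fmt.Println(rps.Get("unknown"))  // falls back to the "" default: 001
	fmt.Println(rps.Marshal())       // JSON snapshot of the settings map
}
```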
diff --git a/go/storage/store.go b/go/storage/store.go
index 6c7871084..9b077737d 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -106,11 +106,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
     }
     return
 }
-func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
-    rt, e := NewReplicaPlacementFromString(replicaPlacement)
-    if e != nil {
-        return e
-    }
+func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, ttlString string) error {
     ttl, e := ReadTTL(ttlString)
     if e != nil {
         return e
@@ -122,7 +118,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
         if err != nil {
             return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
         }
-        e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
+        e = s.addVolume(VolumeId(id), collection, needleMapKind, ttl)
     } else {
         pair := strings.Split(range_string, "-")
         start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -134,7 +130,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
             return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
         }
         for id := start; id <= end; id++ {
-            if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
+            if err := s.addVolume(VolumeId(id), collection, needleMapKind, ttl); err != nil {
                 e = err
             }
         }
@@ -183,14 +179,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
     }
     return ret
 }
-func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, ttl *TTL) error {
     if s.findVolume(vid) != nil {
         return fmt.Errorf("Volume Id %d already exists!", vid)
     }
     if location := s.findFreeLocation(); location != nil {
-        glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
-            location.Directory, vid, collection, replicaPlacement, ttl)
-        if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
+        glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v",
+            location.Directory, vid, collection, ttl)
+        if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, ttl); err == nil {
             location.volumes[vid] = volume
             return nil
         } else {
@@ -213,9 +209,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
             }
             if vid, err := NewVolumeId(base); err == nil {
                 if l.volumes[vid] == nil {
-                    if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
+                    if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil {
                         l.volumes[vid] = v
-                        glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
+                        glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String())
                     } else {
                         glog.V(0).Infof("new volume %s error %s", name, e)
                     }
@@ -234,7 +230,6 @@ func (s *Store) Status() []*VolumeInfo {
             Id:               VolumeId(k),
             Size:             v.ContentSize(),
             Collection:       v.Collection,
-            ReplicaPlacement: v.ReplicaPlacement,
             Version:          v.Version(),
             FileCount:        v.nm.FileCount(),
             DeleteCount:      v.nm.DeletedCount(),
@@ -281,7 +276,6 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
             DeleteCount:      proto.Uint64(uint64(v.nm.DeletedCount())),
             DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
             ReadOnly:         proto.Bool(v.readOnly),
-            ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
             Version:          proto.Uint32(uint32(v.Version())),
             Ttl:              proto.Uint32(v.Ttl.ToUint32()),
         }
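For reference, a hypothetical call site for the trimmed AddVolume signature above; the store, the comma/range volume list and the TTL string are all assumptions for illustration:

```go
package example

import (
	"log"

	"github.com/chrislusf/seaweedfs/go/storage"
)

// addVolumes shows the new call shape: replica placement is no longer a
// per-volume argument. s is assumed to be an already-initialized store, and
// "1,2,5-7" / "3d" are illustrative volume-list and TTL strings.
func addVolumes(s *storage.Store) {
	if err := s.AddVolume("1,2,5-7", "pictures", storage.NeedleMapInMemory, "3d"); err != nil {
		log.Println("add volume:", err)
	}
}
```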
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 44d80a6be..b7b492b9d 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -28,9 +28,9 @@ type Volume struct {
     lastModifiedTime uint64 //unix time in seconds
 }
 
-func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, ttl *TTL) (v *Volume, e error) {
     v = &Volume{dir: dirname, Collection: collection, Id: id}
-    v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
+    v.SuperBlock = SuperBlock{Ttl: ttl}
     v.needleMapKind = needleMapKind
     e = v.load(true, true, needleMapKind)
     return
@@ -87,7 +87,7 @@
         }
     }
 
-    if v.ReplicaPlacement == nil {
+    if v.version == NoneVersion {
         e = v.readSuperBlock()
     } else {
         e = v.maybeWriteSuperBlock()
@@ -145,10 +145,6 @@ func (v *Volume) Close() {
     _ = v.dataFile.Close()
 }
 
-func (v *Volume) NeedToReplicate() bool {
-    return v.ReplicaPlacement.GetCopyCount() > 1
-}
-
 // isFileUnchanged checks whether this needle to write is same as last one.
 // It requires serialized access in the same volume.
 func (v *Volume) isFileUnchanged(n *Needle) bool {
@@ -427,17 +423,6 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
     return false
 }
 
-func (v *Volume) SetReplica(replica *ReplicaPlacement) error {
-    if replica == nil {
-        replica, _ = NewReplicaPlacementFromString("000")
-    }
-    if v.ReplicaPlacement.String() == replica.String() {
-        return nil
-    }
-    v.ReplicaPlacement = replica
-    return v.writeSuperBlock()
-}
-
 func (v *Volume) SetReadOnly(isReadOnly bool) error {
     if isReadOnly == false {
         if fi, e := v.dataFile.Stat(); e != nil {
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index e4979c790..659faf213 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -10,7 +10,6 @@ import (
 type VolumeInfo struct {
     Id               VolumeId
     Size             uint64
-    ReplicaPlacement *ReplicaPlacement
     Ttl              *TTL
     Collection       string
     Version          Version
@@ -31,18 +30,13 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
         ReadOnly:         *m.ReadOnly,
         Version:          Version(*m.Version),
     }
-    rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
-    if e != nil {
-        return vi, e
-    }
-    vi.ReplicaPlacement = rp
     vi.Ttl = LoadTTLFromUint32(*m.Ttl)
     return vi, nil
 }
 
 func (vi VolumeInfo) String() string {
-    return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
-        vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
+    return fmt.Sprintf("Id:%d, Size:%d, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
+        vi.Id, vi.Size, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
 }
 
 /*VolumesInfo sorting*/
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
index e0fbd9e9a..5c7d01c21 100644
--- a/go/storage/volume_super_block.go
+++ b/go/storage/volume_super_block.go
@@ -15,14 +15,13 @@ const (
 /*
 * Super block currently has 8 bytes allocated for each volume.
 * Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc (Deprecated!)
 * Byte 2 and byte 3: Time to live. See TTL for definition
 * Byte 4 and byte 5: The number of times the volume has been compacted.
 * Rest bytes: Reserved
 */
 type SuperBlock struct {
     version          Version
-    ReplicaPlacement *ReplicaPlacement
     Ttl              *TTL
     CompactRevision  uint16
 }
@@ -33,7 +32,7 @@ func (s *SuperBlock) Version() Version {
 func (s *SuperBlock) Bytes() []byte {
     header := make([]byte, SuperBlockSize)
     header[0] = byte(s.version)
-    header[1] = s.ReplicaPlacement.Byte()
+    header[1] = 0
     s.Ttl.ToBytes(header[2:4])
     util.Uint16toBytes(header[4:6], s.CompactRevision)
     return header
@@ -83,9 +82,6 @@ func (v *Volume) writeSuperBlock() (err error) {
 
 func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
     superBlock.version = Version(header[0])
-    if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
-        err = fmt.Errorf("cannot read replica type: %s", err.Error())
-    }
     superBlock.Ttl = LoadTTLFromBytes(header[2:4])
     superBlock.CompactRevision = util.BytesToUint16(header[4:6])
     return
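With byte 1 now always written as zero, the on-disk header reduces to the sketch below. This is illustrative only; the two-byte compact revision mirrors util.Uint16toBytes, which is assumed here to be big-endian, and the TTL bytes are left at zero for "no TTL":

```go
package main

import "fmt"

// Illustrative layout of the 8-byte super block described in the comment
// above, with the deprecated per-volume replica placement byte zeroed.
func main() {
	header := make([]byte, 8)
	header[0] = 2               // byte 0: volume version
	header[1] = 0               // byte 1: legacy replica placement, now always 0
	header[2], header[3] = 0, 0 // bytes 2-3: TTL count and unit (0,0 = no TTL)
	compactRevision := uint16(3)
	header[4] = byte(compactRevision >> 8) // bytes 4-5: compact revision
	header[5] = byte(compactRevision)
	fmt.Printf("% x\n", header)
}
```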
diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go
index 01d59d6ae..7d09c873d 100644
--- a/go/storage/volume_sync.go
+++ b/go/storage/volume_sync.go
@@ -169,7 +169,6 @@ func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
     syncStatus.IdxFileSize = v.nm.IndexFileSize()
     syncStatus.CompactRevision = v.SuperBlock.CompactRevision
     syncStatus.Ttl = v.SuperBlock.Ttl.String()
-    syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
     return syncStatus
 }
 
diff --git a/go/storage/volume_version.go b/go/storage/volume_version.go
index 2e9f58aa2..8cd132c58 100644
--- a/go/storage/volume_version.go
+++ b/go/storage/volume_version.go
@@ -3,6 +3,7 @@ package storage
 type Version uint8
 
 const (
+    NoneVersion    = Version(0)
     Version1       = Version(1)
     Version2       = Version(2)
     CurrentVersion = Version2
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index f014c3527..6de3130b1 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -18,7 +18,6 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
     values := make(url.Values)
     values.Add("volume", vid.String())
     values.Add("collection", option.Collection)
-    values.Add("replication", option.ReplicaPlacement.String())
     values.Add("ttl", option.Ttl.String())
     jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
     if err != nil {
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 376b62405..3d7bb7e0e 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -10,10 +10,11 @@ import (
 type Collection struct {
     Name                     string
     volumeSizeLimit          uint64
+    rp                       *storage.ReplicaPlacement
     storageType2VolumeLayout *util.ConcurrentReadMap
 }
 
-func NewCollection(name string, volumeSizeLimit uint64) *Collection {
-    c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection {
+    c := &Collection{Name: name, rp: rp, volumeSizeLimit: volumeSizeLimit}
     c.storageType2VolumeLayout = util.NewConcurrentReadMap()
     return c
@@ -23,13 +24,13 @@ func (c *Collection) String() string {
     return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
 }
 
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
-    keyString := rp.String()
+func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
+    keyString := ""
     if ttl != nil {
         keyString += ttl.String()
     }
     vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
-        return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+        return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit)
     })
     return vl.(*VolumeLayout)
 }
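With the replication field dropped from AllocateVolume above, the assignment request carries only the volume id, collection and TTL. A small sketch of the resulting form; the values themselves are made up:

```go
package main

import (
	"fmt"
	"net/url"
)

// The form the master now posts to /admin/assign_volume: no "replication"
// field, only volume, collection and ttl.
func main() {
	values := make(url.Values)
	values.Add("volume", "7")
	values.Add("collection", "pictures")
	values.Add("ttl", "3d")
	fmt.Println("/admin/assign_volume?" + values.Encode())
}
```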
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), + string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + jwt) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() } } + size = ret return } @@ -61,18 +55,12 @@ func ReplicatedDelete(masterNode string, store *storage.Store, glog.V(0).Infoln("delete error:", err) return } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) - }) { - ret = 0 - } + //send to other replica locations + if r.FormValue("type") != "replicate" { + if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) + }) { + ret = 0 } } return diff --git a/go/topology/topology.go b/go/topology/topology.go index a2131ae3a..b0324d73a 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -28,12 +28,14 @@ type Topology struct { chanRecoveredDataNodes chan *DataNode chanFullVolumes chan storage.VolumeInfo + ReplicaPlacements *storage.ReplicaPlacements + configuration *Configuration RaftServer raft.Server } -func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { +func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -42,6 +44,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.ReplicaPlacements = rp t.Sequence = seq @@ -111,12 +114,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.Ttl).PickForWrite(count, option) if err != nil || dataNodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes available!") } @@ -124,10 +127,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) - 
diff --git a/go/topology/topology.go b/go/topology/topology.go
index a2131ae3a..b0324d73a 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -28,12 +28,14 @@ type Topology struct {
     chanRecoveredDataNodes chan *DataNode
     chanFullVolumes        chan storage.VolumeInfo
 
+    ReplicaPlacements *storage.ReplicaPlacements
+
     configuration *Configuration
 
     RaftServer raft.Server
 }
 
-func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
+func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
     t := &Topology{}
     t.id = NodeId(id)
     t.nodeType = "Topology"
@@ -42,6 +44,7 @@
     t.collectionMap = util.NewConcurrentReadMap()
     t.pulse = int64(pulse)
     t.volumeSizeLimit = volumeSizeLimit
+    t.ReplicaPlacements = rp
 
     t.Sequence = seq
 
@@ -111,12 +114,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
 }
 
 func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
-    vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+    vl := t.GetVolumeLayout(option.Collection, option.Ttl)
     return vl.GetActiveVolumeCount(option) > 0
 }
 
 func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
-    vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
+    vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.Ttl).PickForWrite(count, option)
     if err != nil || dataNodes.Length() == 0 {
         return "", 0, nil, errors.New("No writable volumes available!")
     }
@@ -124,10 +127,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
     return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil
 }
 
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout {
     return t.collectionMap.Get(collectionName, func() interface{} {
-        return NewCollection(collectionName, t.volumeSizeLimit)
-    }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
+        return NewCollection(collectionName, t.ReplicaPlacements.Get(collectionName), t.volumeSizeLimit)
+    }).(*Collection).GetOrCreateVolumeLayout(ttl)
 }
 
 func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
@@ -140,11 +143,11 @@ func (t *Topology) DeleteCollection(collectionName string) {
 }
 
 func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-    t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+    t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn)
 }
 
 func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
     glog.Infof("removing volume info:%+v", v)
-    t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
+    t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn)
 }
 
@@ -168,15 +171,13 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
         }
     }
 
-    // If volume options(replica placement, ttl or collection) have changed,
-    // we need update its volume layout.
-    needToDeleteVolumes := dn.UpdateVolumes(volumeInfos)
-    for _, v := range needToDeleteVolumes {
-        t.UnRegisterVolumeLayout(v, dn)
-    }
+    deletedVolumes := dn.UpdateVolumes(volumeInfos)
     for _, v := range volumeInfos {
         t.RegisterVolumeLayout(v, dn)
     }
+    for _, v := range deletedVolumes {
+        t.UnRegisterVolumeLayout(v, dn)
+    }
 }
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 5f5faf04e..6dfa07487 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -42,7 +42,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
     }()
 }
 func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
-    vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
+    vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.Ttl)
     if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
         return false
     }
@@ -56,7 +56,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {
     for _, v := range dn.volumes {
         glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
-        vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
+        vl := t.GetVolumeLayout(v.Collection, v.Ttl)
         vl.SetVolumeUnavailable(dn, v.Id)
     }
     dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -66,7 +66,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
     for _, v := range dn.volumes {
-        vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
+        vl := t.GetVolumeLayout(v.Collection, v.Ttl)
         if vl.isWritable(&v) {
             vl.SetVolumeAvailable(dn, v.Id)
         }
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index a25ba116b..3de1a771f 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -195,7 +195,6 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum
             Id:               vid,
             Size:             0,
             Collection:       option.Collection,
-            ReplicaPlacement: option.ReplicaPlacement,
             Ttl:              option.Ttl,
             Version:          storage.CurrentVersion,
         }
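With replica placement gone from the layout key, a layout is now looked up by collection and TTL alone, and the placement is resolved from the topology-level registry. A hypothetical lookup, assuming an existing *topology.Topology:

```go
package example

import (
	"github.com/chrislusf/seaweedfs/go/storage"
	"github.com/chrislusf/seaweedfs/go/topology"
)

// lookupLayout sketches the new resolution path; internally GetVolumeLayout
// falls back to t.ReplicaPlacements.Get("pictures") for the collection's
// placement. The collection name and TTL string are illustrative.
func lookupLayout(t *topology.Topology) *topology.VolumeLayout {
	ttl, _ := storage.ReadTTL("3d")
	return t.GetVolumeLayout("pictures", ttl)
}
```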
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
index 15abfcc73..df464e47e 100644
--- a/go/topology/volume_growth_test.go
+++ b/go/topology/volume_growth_test.go
@@ -80,6 +80,7 @@ func setup(topologyLayout string) *Topology {
 
     //need to connect all nodes first before server adding volumes
     topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
+        storage.NewReplicaPlacements("000"),
         sequence.NewMemorySequencer(), 32*1024, 5)
     if err != nil {
         panic("error: " + err.Error())
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 3c1dd9503..050f576ce 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -42,7 +42,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
         vl.vid2location[v.Id] = NewVolumeLocationList()
     }
     vl.vid2location[v.Id].Set(dn)
-    glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
+    glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
+    //TODO
     if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
         vl.AddToWritable(v.Id)
     } else {
@@ -53,7 +54,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
     vl.accessLock.Lock()
     defer vl.accessLock.Unlock()
-
+    //TODO only delete data node from locations?
     vl.removeFromWritable(v.Id)
     delete(vl.vid2location, v.Id)
 }
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
index d5eaf5e92..7166a4add 100644
--- a/go/topology/volume_location_list.go
+++ b/go/topology/volume_location_list.go
@@ -2,6 +2,8 @@ package topology
 
 import (
     "fmt"
+
+    "github.com/chrislusf/seaweedfs/go/storage"
 )
 
 type VolumeLocationList struct {
@@ -25,6 +27,11 @@ func (dnll *VolumeLocationList) Length() int {
     return len(dnll.list)
 }
 
+func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) {
+    //TODO CalcReplicaPlacement
+    return nil
+}
+
 func (dnll *VolumeLocationList) Set(loc *DataNode) {
     for i := 0; i < len(dnll.list); i++ {
         if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
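CalcReplicaPlacement is left as a TODO above. Purely as an illustration of the idea, not the committed implementation, a placement could be derived from how many distinct data centers, racks and servers already hold a copy of the volume; the counting itself is assumed to be done by the caller here:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

// placementFromCounts is a hypothetical helper: given the number of distinct
// data centers, racks and servers holding copies, build the xyz placement
// string and parse it with the existing constructor.
func placementFromCounts(dataCenters, racks, servers int) (*storage.ReplicaPlacement, error) {
	s := fmt.Sprintf("%d%d%d", dataCenters-1, racks-dataCenters, servers-racks)
	return storage.NewReplicaPlacementFromString(s)
}

func main() {
	// Three copies spread over 2 data centers, 2 racks, 3 servers -> "101".
	rp, err := placementFromCounts(2, 2, 3)
	fmt.Println(rp, err)
}
```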
diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go
index 41cce8b82..9e9e7f438 100644
--- a/go/util/concurrent_read_map.go
+++ b/go/util/concurrent_read_map.go
@@ -7,9 +7,8 @@ import (
 )
 
 // A mostly for read map, which can thread-safely
 // initialize the map entries.
 type ConcurrentReadMap struct {
-    rmutex sync.RWMutex
-    mutex  sync.Mutex
-    Items  map[string]interface{}
+    rwmutex sync.RWMutex
+    Items   map[string]interface{}
 }
 
 func NewConcurrentReadMap() *ConcurrentReadMap {
@@ -17,8 +16,8 @@ func NewConcurrentReadMap() *ConcurrentReadMap {
 }
 
 func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
-    m.mutex.Lock()
-    defer m.mutex.Unlock()
+    m.rwmutex.Lock()
+    defer m.rwmutex.Unlock()
     if value, ok := m.Items[key]; ok {
         return value
     }
@@ -28,11 +27,11 @@ func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}
 }
 
 func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
-    m.rmutex.RLock()
-    if value, ok := m.Items[key]; ok {
-        m.rmutex.RUnlock()
+    m.rwmutex.RLock()
+    value, ok := m.Items[key]
+    m.rwmutex.RUnlock()
+    if ok {
         return value
     }
-    m.rmutex.RUnlock()
     return m.initMapEntry(key, newEntry)
 }
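Usage of the consolidated ConcurrentReadMap stays the same: Get answers hits under the read lock and releases it before returning, and only a miss falls through to the write-locked initializer, which re-checks the key before calling newEntry. A small sketch:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/util"
)

// Even with concurrent callers, the newEntry func runs at most once per key
// because initMapEntry re-checks the map under the write lock.
func main() {
	m := util.NewConcurrentReadMap()
	v := m.Get("pictures", func() interface{} {
		return "initialized exactly once"
	})
	fmt.Println(v)
}
```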
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/replica/set", ms.proxyToLeader(ms.guard.WhiteList(ms.setReplicaHandler))) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 5738aaef8..89c373ec7 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -160,7 +160,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -188,23 +188,6 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr return volumeGrowOption, nil } -//only proxy to each volume server -func (ms *MasterServer) setReplicaHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - replicationValue := r.FormValue("replication") - if _, e := storage.NewReplicaPlacementFromString(replicationValue); e != nil { - writeJsonError(w, r, http.StatusBadRequest, e) - return - } - all, _ := strconv.ParseBool(r.FormValue("all")) - if !all && len(r.Form["volume"]) == 0 && len(r.Form["collection"]) == 0 { - writeJsonError(w, r, http.StatusBadRequest, errors.New("No available agrs found.")) - return - } - result := ms.batchSetVolumeOption("replication", replicationValue, r.Form["volume"], r.Form["collection"]) - writeJson(w, r, http.StatusOK, result) -} - func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, volumes, collections []string) (result map[string]interface{}) { forms := url.Values{} forms.Set("key", settingKey) diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 15dccfb24..9a304d895 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -26,7 +26,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { @@ -82,21 +82,6 @@ func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Re } return nil } - } else if key == "replication" { - replica, e := storage.NewReplicaPlacementFromString(r.FormValue(value)) - if e != nil { - writeJsonError(w, r, http.StatusBadRequest, e) - return - } - setter = func(v *storage.Volume) error { - if e := v.SetReplica(replica); e != nil { - errs = append(errs, VolumeOptError{ - Volume: v.Id.String(), - Err: e.Error(), - }) - } - return nil - } } else { writeJsonError(w, r, http.StatusBadRequest, errors.New("Unkonw setting: "+key)) return