Browse Source

volume server set volume type and heartbeat to the master

pull/1794/head
Chris Lu 4 years ago
parent
commit
d156c74ec0
  1. 9
      weed/command/volume.go
  2. 14
      weed/pb/master.proto
  3. 1128
      weed/pb/master_pb/master.pb.go
  4. 8
      weed/server/master_grpc_server.go
  5. 2
      weed/server/master_grpc_server_volume.go
  6. 6
      weed/server/volume_server.go
  7. 4
      weed/storage/disk_location.go
  8. 31
      weed/storage/store.go
  9. 3
      weed/storage/volume.go
  10. 4
      weed/storage/volume_info.go
  11. 11
      weed/topology/collection.go
  12. 3
      weed/topology/data_center.go
  13. 21
      weed/topology/data_node.go
  14. 36
      weed/topology/node.go
  15. 6
      weed/topology/rack.go
  16. 19
      weed/topology/topology.go
  17. 8
      weed/topology/topology_event_handling.go
  18. 4
      weed/topology/topology_map.go
  19. 27
      weed/topology/topology_test.go

9
weed/command/volume.go

@ -49,6 +49,7 @@ type VolumeServerOptions struct {
rack *string rack *string
whiteList []string whiteList []string
indexType *string indexType *string
volumeType *string
fixJpgOrientation *bool fixJpgOrientation *bool
readRedirect *bool readRedirect *bool
cpuProfile *string cpuProfile *string
@ -76,6 +77,7 @@ func init() {
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.volumeType = cmdVolume.Flag.String("volumeType", "", "[hdd|ssd] choose between hard drive or solid state drive")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
@ -210,10 +212,15 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
masters := *v.masters masters := *v.masters
volumeType, err := storage.ToVolumeType(*v.volumeType)
if err != nil {
glog.Fatalf("failed to parse volume type: %v", err)
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl, *v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.minFreeSpacePercents, v.folders, v.folderMaxLimits, v.minFreeSpacePercents,
*v.idxFolder,
*v.idxFolder, volumeType,
volumeNeedleMapKind, volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,

14
weed/pb/master.proto

@ -44,7 +44,6 @@ message Heartbeat {
string ip = 1; string ip = 1;
uint32 port = 2; uint32 port = 2;
string public_url = 3; string public_url = 3;
uint32 max_volume_count = 4;
uint64 max_file_key = 5; uint64 max_file_key = 5;
string data_center = 6; string data_center = 6;
string rack = 7; string rack = 7;
@ -62,6 +61,9 @@ message Heartbeat {
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18; repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
bool has_no_ec_shards = 19; bool has_no_ec_shards = 19;
uint32 max_volume_count = 4;
uint32 max_ssd_volume_count = 20;
} }
message HeartbeatResponse { message HeartbeatResponse {
@ -87,6 +89,7 @@ message VolumeInformationMessage {
int64 modified_at_second = 12; int64 modified_at_second = 12;
string remote_storage_name = 13; string remote_storage_name = 13;
string remote_storage_key = 14; string remote_storage_key = 14;
string volume_type = 15;
} }
message VolumeShortInformationMessage { message VolumeShortInformationMessage {
@ -95,6 +98,7 @@ message VolumeShortInformationMessage {
uint32 replica_placement = 8; uint32 replica_placement = 8;
uint32 version = 9; uint32 version = 9;
uint32 ttl = 10; uint32 ttl = 10;
string volume_type = 15;
} }
message VolumeEcShardInformationMessage { message VolumeEcShardInformationMessage {
@ -218,6 +222,8 @@ message DataNodeInfo {
repeated VolumeInformationMessage volume_infos = 6; repeated VolumeInformationMessage volume_infos = 6;
repeated VolumeEcShardInformationMessage ec_shard_infos = 7; repeated VolumeEcShardInformationMessage ec_shard_infos = 7;
uint64 remote_volume_count = 8; uint64 remote_volume_count = 8;
uint64 max_ssd_volume_count = 9;
uint64 ssd_volume_count = 10;
} }
message RackInfo { message RackInfo {
string id = 1; string id = 1;
@ -227,6 +233,8 @@ message RackInfo {
uint64 active_volume_count = 5; uint64 active_volume_count = 5;
repeated DataNodeInfo data_node_infos = 6; repeated DataNodeInfo data_node_infos = 6;
uint64 remote_volume_count = 7; uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
} }
message DataCenterInfo { message DataCenterInfo {
string id = 1; string id = 1;
@ -236,6 +244,8 @@ message DataCenterInfo {
uint64 active_volume_count = 5; uint64 active_volume_count = 5;
repeated RackInfo rack_infos = 6; repeated RackInfo rack_infos = 6;
uint64 remote_volume_count = 7; uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
} }
message TopologyInfo { message TopologyInfo {
string id = 1; string id = 1;
@ -245,6 +255,8 @@ message TopologyInfo {
uint64 active_volume_count = 5; uint64 active_volume_count = 5;
repeated DataCenterInfo data_center_infos = 6; repeated DataCenterInfo data_center_infos = 6;
uint64 remote_volume_count = 7; uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
} }
message VolumeListRequest { message VolumeListRequest {
} }

1128
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

8
weed/server/master_grpc_server.go

@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName) dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName) rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int64(heartbeat.MaxVolumeCount))
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{ if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount() delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
dn.UpAdjustMaxVolumeCountDelta(delta) dn.UpAdjustMaxVolumeCountDelta(delta)
} }
if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) {
delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount()
dn.UpAdjustMaxSsdVolumeCountDelta(delta)
}
glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{ message := &master_pb.VolumeLocation{

2
weed/server/master_grpc_server_volume.go

@ -126,7 +126,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.VolumeType(req.VolumeType)) volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.VolumeType(req.VolumeType))
stats := volumeLayout.Stats() stats := volumeLayout.Stats()
totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{ resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize), TotalSize: uint64(totalSize),

6
weed/server/volume_server.go

@ -20,6 +20,7 @@ type VolumeServer struct {
pulseSeconds int pulseSeconds int
dataCenter string dataCenter string
rack string rack string
VolumeType storage.VolumeType
store *storage.Store store *storage.Store
guard *security.Guard guard *security.Guard
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
@ -38,7 +39,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string, port int, publicUrl string,
folders []string, maxCounts []int, minFreeSpacePercents []float32, folders []string, maxCounts []int, minFreeSpacePercents []float32,
idxFolder string,
idxFolder string, volumeType storage.VolumeType,
needleMapKind storage.NeedleMapType, needleMapKind storage.NeedleMapType,
masterNodes []string, pulseSeconds int, masterNodes []string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
@ -63,6 +64,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
rack: rack, rack: rack,
VolumeType: volumeType,
needleMapKind: needleMapKind, needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect, ReadRedirect: readRedirect,
@ -76,7 +78,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster() vs.checkWithMaster()
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, vs.VolumeType)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux) handleStaticResources(adminMux)

4
weed/storage/disk_location.go

@ -19,6 +19,7 @@ import (
type DiskLocation struct { type DiskLocation struct {
Directory string Directory string
IdxDirectory string IdxDirectory string
VolumeType VolumeType
MaxVolumeCount int MaxVolumeCount int
OriginalMaxVolumeCount int OriginalMaxVolumeCount int
MinFreeSpacePercent float32 MinFreeSpacePercent float32
@ -32,7 +33,7 @@ type DiskLocation struct {
isDiskSpaceLow bool isDiskSpaceLow bool
} }
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation {
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, volumeType VolumeType) *DiskLocation {
dir = util.ResolvePath(dir) dir = util.ResolvePath(dir)
if idxDir == "" { if idxDir == "" {
idxDir = dir idxDir = dir
@ -42,6 +43,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
location := &DiskLocation{ location := &DiskLocation{
Directory: dir, Directory: dir,
IdxDirectory: idxDir, IdxDirectory: idxDir,
VolumeType: volumeType,
MaxVolumeCount: maxVolumeCount, MaxVolumeCount: maxVolumeCount,
OriginalMaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount,
MinFreeSpacePercent: minFreeSpacePercent, MinFreeSpacePercent: minFreeSpacePercent,

31
weed/storage/store.go

@ -52,11 +52,11 @@ func (s *Store) String() (str string) {
return return
} }
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) {
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType, volumeType VolumeType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ { for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, volumeType)
location.loadExistingVolumes(needleMapKind) location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location) s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@ -203,12 +203,18 @@ func (s *Store) GetRack() string {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0 maxVolumeCount := 0
maxSsdVolumeCount := 0
var maxFileKey NeedleId var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64) collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations { for _, location := range s.Locations {
var deleteVids []needle.VolumeId var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
switch location.VolumeType {
case SsdType:
maxSsdVolumeCount = maxSsdVolumeCount + location.MaxVolumeCount
case HardDriveType:
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
}
location.volumesLock.RLock() location.volumesLock.RLock()
for _, v := range location.volumes { for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
@ -280,15 +286,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
return &master_pb.Heartbeat{ return &master_pb.Heartbeat{
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxSsdVolumeCount: uint32(maxSsdVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
} }
} }

3
weed/storage/volume.go

@ -47,7 +47,7 @@ type Volume struct {
volumeInfo *volume_server_pb.VolumeInfo volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation location *DiskLocation
lastIoError error
lastIoError error
} }
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
@ -262,6 +262,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume
Ttl: v.Ttl.ToUint32(), Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision), CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(), ModifiedAtSecond: modTime.Unix(),
VolumeType: string(v.location.VolumeType),
} }
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()

4
weed/storage/volume_info.go

@ -14,7 +14,7 @@ type VolumeInfo struct {
Size uint64 Size uint64
ReplicaPlacement *super_block.ReplicaPlacement ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL Ttl *needle.TTL
VolumeType VolumeType
VolumeType string
Collection string Collection string
Version needle.Version Version needle.Version
FileCount int FileCount int
@ -41,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond, ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName, RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey, RemoteStorageKey: m.RemoteStorageKey,
VolumeType: m.VolumeType,
} }
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil { if e != nil {
@ -91,6 +92,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond, ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName, RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey, RemoteStorageKey: vi.RemoteStorageKey,
VolumeType: vi.VolumeType,
} }
} }

11
weed/topology/collection.go

@ -44,6 +44,17 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t
return vl.(*VolumeLayout) return vl.(*VolumeLayout)
} }
func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeType storage.VolumeType) {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
if volumeType != storage.HardDriveType {
keyString += string(volumeType)
}
c.storageType2VolumeLayout.Delete(keyString)
}
func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() { for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil { if vl != nil {

3
weed/topology/data_center.go

@ -31,6 +31,7 @@ func (dc *DataCenter) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Id"] = dc.Id() m["Id"] = dc.Id()
m["Max"] = dc.GetMaxVolumeCount() m["Max"] = dc.GetMaxVolumeCount()
m["MaxSsd"] = dc.GetMaxSsdVolumeCount()
m["Free"] = dc.FreeSpace() m["Free"] = dc.FreeSpace()
var racks []interface{} var racks []interface{}
for _, c := range dc.Children() { for _, c := range dc.Children() {
@ -46,6 +47,8 @@ func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
Id: string(dc.Id()), Id: string(dc.Id()),
VolumeCount: uint64(dc.GetVolumeCount()), VolumeCount: uint64(dc.GetVolumeCount()),
MaxVolumeCount: uint64(dc.GetMaxVolumeCount()), MaxVolumeCount: uint64(dc.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(dc.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(dc.GetSsdVolumeCount()),
FreeVolumeCount: uint64(dc.FreeSpace()), FreeVolumeCount: uint64(dc.FreeSpace()),
ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()), ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()), RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()),

21
weed/topology/data_node.go

@ -50,7 +50,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
if oldV, ok := dn.volumes[v.Id]; !ok { if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
if v.VolumeType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(1)
} else {
dn.UpAdjustVolumeCountDelta(1)
}
if v.IsRemote() { if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(1) dn.UpAdjustRemoteVolumeCountDelta(1)
} }
@ -89,7 +93,11 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
glog.V(0).Infoln("Deleting volume id:", vid) glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid) delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v) deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
if v.VolumeType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(-1)
} else {
dn.UpAdjustVolumeCountDelta(-1)
}
if v.IsRemote() { if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1) dn.UpAdjustRemoteVolumeCountDelta(-1)
} }
@ -116,7 +124,11 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
for _, v := range deletedVolumes { for _, v := range deletedVolumes {
delete(dn.volumes, v.Id) delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
if v.VolumeType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(-1)
} else {
dn.UpAdjustVolumeCountDelta(-1)
}
if v.IsRemote() { if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1) dn.UpAdjustRemoteVolumeCountDelta(-1)
} }
@ -182,6 +194,7 @@ func (dn *DataNode) ToMap() interface{} {
ret["VolumeIds"] = dn.GetVolumeIds() ret["VolumeIds"] = dn.GetVolumeIds()
ret["EcShards"] = dn.GetEcShardCount() ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount() ret["Max"] = dn.GetMaxVolumeCount()
ret["MaxSsd"] = dn.GetMaxSsdVolumeCount()
ret["Free"] = dn.FreeSpace() ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl ret["PublicUrl"] = dn.PublicUrl
return ret return ret
@ -192,6 +205,8 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
Id: string(dn.Id()), Id: string(dn.Id()),
VolumeCount: uint64(dn.GetVolumeCount()), VolumeCount: uint64(dn.GetVolumeCount()),
MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(dn.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(dn.GetSsdVolumeCount()),
FreeVolumeCount: uint64(dn.FreeSpace()), FreeVolumeCount: uint64(dn.FreeSpace()),
ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),

36
weed/topology/node.go

@ -19,17 +19,21 @@ type Node interface {
FreeSpace() int64 FreeSpace() int64
ReserveOneVolume(r int64) (*DataNode, error) ReserveOneVolume(r int64) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64)
UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64)
UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
UpAdjustEcShardCountDelta(ecShardCountDelta int64) UpAdjustEcShardCountDelta(ecShardCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
UpAdjustMaxVolumeId(vid needle.VolumeId) UpAdjustMaxVolumeId(vid needle.VolumeId)
GetVolumeCount() int64 GetVolumeCount() int64
GetSsdVolumeCount() int64
GetEcShardCount() int64 GetEcShardCount() int64
GetActiveVolumeCount() int64 GetActiveVolumeCount() int64
GetRemoteVolumeCount() int64 GetRemoteVolumeCount() int64
GetMaxVolumeCount() int64 GetMaxVolumeCount() int64
GetMaxSsdVolumeCount() int64
GetMaxVolumeId() needle.VolumeId GetMaxVolumeId() needle.VolumeId
SetParent(Node) SetParent(Node)
LinkChildNode(node Node) LinkChildNode(node Node)
@ -47,9 +51,11 @@ type Node interface {
type NodeImpl struct { type NodeImpl struct {
volumeCount int64 volumeCount int64
remoteVolumeCount int64 remoteVolumeCount int64
ssdVolumeCount int64
activeVolumeCount int64 activeVolumeCount int64
ecShardCount int64 ecShardCount int64
maxVolumeCount int64 maxVolumeCount int64
maxSsdVolumeCount int64
id NodeId id NodeId
parent Node parent Node
sync.RWMutex // lock children sync.RWMutex // lock children
@ -143,7 +149,7 @@ func (n *NodeImpl) Id() NodeId {
return n.id return n.id
} }
func (n *NodeImpl) FreeSpace() int64 { func (n *NodeImpl) FreeSpace() int64 {
freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount
if n.ecShardCount > 0 { if n.ecShardCount > 0 {
freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1 freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
} }
@ -200,6 +206,15 @@ func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //ca
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative
if maxSsdVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
if volumeCountDelta == 0 { if volumeCountDelta == 0 {
return return
@ -218,6 +233,15 @@ func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta) n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative
if ssdVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
if ecShardCountDelta == 0 { if ecShardCountDelta == 0 {
return return
@ -250,6 +274,9 @@ func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
func (n *NodeImpl) GetVolumeCount() int64 { func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount return n.volumeCount
} }
func (n *NodeImpl) GetSsdVolumeCount() int64 {
return n.ssdVolumeCount
}
func (n *NodeImpl) GetEcShardCount() int64 { func (n *NodeImpl) GetEcShardCount() int64 {
return n.ecShardCount return n.ecShardCount
} }
@ -262,6 +289,9 @@ func (n *NodeImpl) GetActiveVolumeCount() int64 {
func (n *NodeImpl) GetMaxVolumeCount() int64 { func (n *NodeImpl) GetMaxVolumeCount() int64 {
return n.maxVolumeCount return n.maxVolumeCount
} }
func (n *NodeImpl) GetMaxSsdVolumeCount() int64 {
return n.maxSsdVolumeCount
}
func (n *NodeImpl) LinkChildNode(node Node) { func (n *NodeImpl) LinkChildNode(node Node) {
n.Lock() n.Lock()
@ -269,8 +299,10 @@ func (n *NodeImpl) LinkChildNode(node Node) {
if n.children[node.Id()] == nil { if n.children[node.Id()] == nil {
n.children[node.Id()] = node n.children[node.Id()] = node
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount()) n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(node.GetEcShardCount()) n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
@ -287,10 +319,12 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil) node.SetParent(nil)
delete(n.children, node.Id()) delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount()) n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(-node.GetEcShardCount()) n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id()) glog.V(0).Infoln(n, "removes", node.Id())
} }
} }

6
weed/topology/rack.go

@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
} }
return nil return nil
} }
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64, maxSsdVolumeCount int64) *DataNode {
for _, c := range r.Children() { for _, c := range r.Children() {
dn := c.(*DataNode) dn := c.(*DataNode)
if dn.MatchLocation(ip, port) { if dn.MatchLocation(ip, port) {
@ -41,6 +41,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn.Port = port dn.Port = port
dn.PublicUrl = publicUrl dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount dn.maxVolumeCount = maxVolumeCount
dn.maxSsdVolumeCount = maxSsdVolumeCount
dn.LastSeen = time.Now().Unix() dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn) r.LinkChildNode(dn)
return dn return dn
@ -50,6 +51,7 @@ func (r *Rack) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Id"] = r.Id() m["Id"] = r.Id()
m["Max"] = r.GetMaxVolumeCount() m["Max"] = r.GetMaxVolumeCount()
m["MaxSsd"] = r.GetMaxSsdVolumeCount()
m["Free"] = r.FreeSpace() m["Free"] = r.FreeSpace()
var dns []interface{} var dns []interface{}
for _, c := range r.Children() { for _, c := range r.Children() {
@ -65,6 +67,8 @@ func (r *Rack) ToRackInfo() *master_pb.RackInfo {
Id: string(r.Id()), Id: string(r.Id()),
VolumeCount: uint64(r.GetVolumeCount()), VolumeCount: uint64(r.GetVolumeCount()),
MaxVolumeCount: uint64(r.GetMaxVolumeCount()), MaxVolumeCount: uint64(r.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(r.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(r.GetSsdVolumeCount()),
FreeVolumeCount: uint64(r.FreeSpace()), FreeVolumeCount: uint64(r.FreeSpace()),
ActiveVolumeCount: uint64(r.GetActiveVolumeCount()), ActiveVolumeCount: uint64(r.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()), RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()),

19
weed/topology/topology.go

@ -176,17 +176,27 @@ func (t *Topology) DeleteCollection(collectionName string) {
t.collectionMap.Delete(collectionName) t.collectionMap.Delete(collectionName)
} }
func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeType storage.VolumeType) {
collection, found := t.FindCollection(collectionName)
if !found {
return
}
collection.DeleteVolumeLayout(rp, ttl, volumeType)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, v.VolumeType)
volumeType, _ := storage.ToVolumeType(v.VolumeType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, volumeType)
vl.RegisterVolume(&v, dn) vl.RegisterVolume(&v, dn)
vl.EnsureCorrectWritables(&v) vl.EnsureCorrectWritables(&v)
} }
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info: %+v", v) glog.Infof("removing volume info: %+v", v)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, v.VolumeType)
volumeType, _ := storage.ToVolumeType(v.VolumeType)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, volumeType)
volumeLayout.UnRegisterVolume(&v, dn) volumeLayout.UnRegisterVolume(&v, dn)
if volumeLayout.isEmpty() { if volumeLayout.isEmpty() {
t.DeleteCollection(v.Collection)
t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, volumeType)
} }
} }
@ -222,7 +232,8 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
t.UnRegisterVolumeLayout(v, dn) t.UnRegisterVolumeLayout(v, dn)
} }
for _, v := range changedVolumes { for _, v := range changedVolumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, v.VolumeType)
volumeType, _ := storage.ToVolumeType(v.VolumeType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, volumeType)
vl.EnsureCorrectWritables(&v) vl.EnsureCorrectWritables(&v)
} }
return return

8
weed/topology/topology_event_handling.go

@ -37,7 +37,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
}() }()
} }
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, volumeInfo.VolumeType)
volumeType, _ := storage.ToVolumeType(volumeInfo.VolumeType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, volumeType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) { if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false return false
} }
@ -55,13 +56,16 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) { func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() { for _, v := range dn.GetVolumes() {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id()) glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id())
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, v.VolumeType)
volumeType, _ := storage.ToVolumeType(v.VolumeType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, volumeType)
vl.SetVolumeUnavailable(dn, v.Id) vl.SetVolumeUnavailable(dn, v.Id)
} }
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustSsdVolumeCountDelta(-dn.GetSsdVolumeCount())
dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount()) dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.UpAdjustMaxSsdVolumeCountDelta(-dn.GetMaxSsdVolumeCount())
if dn.Parent() != nil { if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id()) dn.Parent().UnlinkChildNode(dn.Id())
} }

4
weed/topology/topology_map.go

@ -5,6 +5,7 @@ import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
func (t *Topology) ToMap() interface{} { func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount() m["Max"] = t.GetMaxVolumeCount()
m["MaxSsd"] = t.GetMaxSsdVolumeCount()
m["Free"] = t.FreeSpace() m["Free"] = t.FreeSpace()
var dcs []interface{} var dcs []interface{}
for _, c := range t.Children() { for _, c := range t.Children() {
@ -30,6 +31,7 @@ func (t *Topology) ToMap() interface{} {
func (t *Topology) ToVolumeMap() interface{} { func (t *Topology) ToVolumeMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount() m["Max"] = t.GetMaxVolumeCount()
m["MaxSsd"] = t.GetMaxSsdVolumeCount()
m["Free"] = t.FreeSpace() m["Free"] = t.FreeSpace()
dcs := make(map[NodeId]interface{}) dcs := make(map[NodeId]interface{})
for _, c := range t.Children() { for _, c := range t.Children() {
@ -83,9 +85,11 @@ func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo {
Id: string(t.Id()), Id: string(t.Id()),
VolumeCount: uint64(t.GetVolumeCount()), VolumeCount: uint64(t.GetVolumeCount()),
MaxVolumeCount: uint64(t.GetMaxVolumeCount()), MaxVolumeCount: uint64(t.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(t.GetMaxSsdVolumeCount()),
FreeVolumeCount: uint64(t.FreeSpace()), FreeVolumeCount: uint64(t.FreeSpace()),
ActiveVolumeCount: uint64(t.GetActiveVolumeCount()), ActiveVolumeCount: uint64(t.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()), RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()),
SsdVolumeCount: uint64(t.GetSsdVolumeCount()),
} }
for _, c := range t.Children() { for _, c := range t.Children() {
dc := c.(*DataCenter) dc := c.(*DataCenter)

27
weed/topology/topology_test.go

@ -27,7 +27,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
dc := topo.GetOrCreateDataCenter("dc1") dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1") rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
{ {
volumeCount := 7 volumeCount := 7
@ -48,10 +48,28 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
volumeMessages = append(volumeMessages, volumeMessage) volumeMessages = append(volumeMessages, volumeMessage)
} }
for k := 1; k <= volumeCount; k++ {
volumeMessage := &master_pb.VolumeInformationMessage{
Id: uint32(volumeCount + k),
Size: uint64(25432),
Collection: "",
FileCount: uint64(2343),
DeleteCount: uint64(345),
DeletedByteCount: 34524,
ReadOnly: false,
ReplicaPlacement: uint32(0),
Version: uint32(needle.CurrentVersion),
Ttl: 0,
VolumeType: "ssd",
}
volumeMessages = append(volumeMessages, volumeMessage)
}
topo.SyncDataNodeRegistration(volumeMessages, dn) topo.SyncDataNodeRegistration(volumeMessages, dn)
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount*2)
assert(t, "volumeCount", int(topo.volumeCount), volumeCount) assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
assert(t, "ssdVolumeCount", int(topo.ssdVolumeCount), volumeCount)
} }
{ {
@ -115,7 +133,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
nil, nil,
dn) dn)
for vid, _ := range layout.vid2location {
for vid := range layout.vid2location {
println("after add volume id", vid) println("after add volume id", vid)
} }
for _, vid := range layout.writables { for _, vid := range layout.writables {
@ -144,12 +162,13 @@ func TestAddRemoveVolume(t *testing.T) {
dc := topo.GetOrCreateDataCenter("dc1") dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1") rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
v := storage.VolumeInfo{ v := storage.VolumeInfo{
Id: needle.VolumeId(1), Id: needle.VolumeId(1),
Size: 100, Size: 100,
Collection: "xcollection", Collection: "xcollection",
VolumeType: "ssd",
FileCount: 123, FileCount: 123,
DeleteCount: 23, DeleteCount: 23,
DeletedByteCount: 45, DeletedByteCount: 45,

Loading…
Cancel
Save