You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							299 lines
						
					
					
						
							7.4 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							299 lines
						
					
					
						
							7.4 KiB
						
					
					
				| package topology | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/needle" | |
| 	"github.com/seaweedfs/seaweedfs/weed/storage/types" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"sync/atomic" | |
| ) | |
| 
 | |
| type DataNode struct { | |
| 	NodeImpl | |
| 	Ip            string | |
| 	Port          int | |
| 	GrpcPort      int | |
| 	PublicUrl     string | |
| 	LastSeen      int64 // unix time in seconds | |
| 	Counter       int   // in race condition, the previous dataNode was not dead | |
| 	IsTerminating bool | |
| } | |
| 
 | |
| func NewDataNode(id string) *DataNode { | |
| 	dn := &DataNode{} | |
| 	dn.id = NodeId(id) | |
| 	dn.nodeType = "DataNode" | |
| 	dn.diskUsages = newDiskUsages() | |
| 	dn.children = make(map[NodeId]Node) | |
| 	dn.NodeImpl.value = dn | |
| 	return dn | |
| } | |
| 
 | |
| func (dn *DataNode) String() string { | |
| 	dn.RLock() | |
| 	defer dn.RUnlock() | |
| 	return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl) | |
| } | |
| 
 | |
| func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { | |
| 	dn.Lock() | |
| 	defer dn.Unlock() | |
| 	return dn.doAddOrUpdateVolume(v) | |
| } | |
| 
 | |
| func (dn *DataNode) getOrCreateDisk(diskType string) *Disk { | |
| 	c, found := dn.children[NodeId(diskType)] | |
| 	if !found { | |
| 		c = NewDisk(diskType) | |
| 		dn.doLinkChildNode(c) | |
| 	} | |
| 	disk := c.(*Disk) | |
| 	return disk | |
| } | |
| 
 | |
| func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) { | |
| 	disk := dn.getOrCreateDisk(v.DiskType) | |
| 	return disk.AddOrUpdateVolume(v) | |
| } | |
| 
 | |
| // UpdateVolumes detects new/deleted/changed volumes on a volume server | |
| // used in master to notify master clients of these changes. | |
| func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changedVolumes []storage.VolumeInfo) { | |
| 
 | |
| 	actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) | |
| 	for _, v := range actualVolumes { | |
| 		actualVolumeMap[v.Id] = v | |
| 	} | |
| 
 | |
| 	dn.Lock() | |
| 	defer dn.Unlock() | |
| 
 | |
| 	existingVolumes := dn.getVolumes() | |
| 
 | |
| 	for _, v := range existingVolumes { | |
| 		vid := v.Id | |
| 		if _, ok := actualVolumeMap[vid]; !ok { | |
| 			glog.V(0).Infoln("Deleting volume id:", vid) | |
| 			disk := dn.getOrCreateDisk(v.DiskType) | |
| 			delete(disk.volumes, vid) | |
| 			deletedVolumes = append(deletedVolumes, v) | |
| 
 | |
| 			deltaDiskUsages := newDiskUsages() | |
| 			deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) | |
| 			deltaDiskUsage.volumeCount = -1 | |
| 			if v.IsRemote() { | |
| 				deltaDiskUsage.remoteVolumeCount = -1 | |
| 			} | |
| 			if !v.ReadOnly { | |
| 				deltaDiskUsage.activeVolumeCount = -1 | |
| 			} | |
| 			disk.UpAdjustDiskUsageDelta(deltaDiskUsages) | |
| 		} | |
| 	} | |
| 	for _, v := range actualVolumes { | |
| 		isNew, isChanged := dn.doAddOrUpdateVolume(v) | |
| 		if isNew { | |
| 			newVolumes = append(newVolumes, v) | |
| 		} | |
| 		if isChanged { | |
| 			changedVolumes = append(changedVolumes, v) | |
| 		} | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) { | |
| 	dn.Lock() | |
| 	defer dn.Unlock() | |
| 
 | |
| 	for _, v := range deletedVolumes { | |
| 		disk := dn.getOrCreateDisk(v.DiskType) | |
| 		if _, found := disk.volumes[v.Id]; !found { | |
| 			continue | |
| 		} | |
| 		delete(disk.volumes, v.Id) | |
| 
 | |
| 		deltaDiskUsages := newDiskUsages() | |
| 		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) | |
| 		deltaDiskUsage.volumeCount = -1 | |
| 		if v.IsRemote() { | |
| 			deltaDiskUsage.remoteVolumeCount = -1 | |
| 		} | |
| 		if !v.ReadOnly { | |
| 			deltaDiskUsage.activeVolumeCount = -1 | |
| 		} | |
| 		disk.UpAdjustDiskUsageDelta(deltaDiskUsages) | |
| 	} | |
| 	for _, v := range newVolumes { | |
| 		dn.doAddOrUpdateVolume(v) | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { | |
| 	deltaDiskUsages := newDiskUsages() | |
| 	for diskType, maxVolumeCount := range maxVolumeCounts { | |
| 		if maxVolumeCount == 0 { | |
| 			// the volume server may have set the max to zero | |
| 			continue | |
| 		} | |
| 		dt := types.ToDiskType(diskType) | |
| 		currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) | |
| 		currentDiskUsageMaxVolumeCount := atomic.LoadInt64(¤tDiskUsage.maxVolumeCount) | |
| 		if currentDiskUsageMaxVolumeCount == int64(maxVolumeCount) { | |
| 			continue | |
| 		} | |
| 		disk := dn.getOrCreateDisk(dt.String()) | |
| 		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) | |
| 		deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount | |
| 		disk.UpAdjustDiskUsageDelta(deltaDiskUsages) | |
| 	} | |
| } | |
| 
 | |
| func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { | |
| 	dn.RLock() | |
| 	for _, c := range dn.children { | |
| 		disk := c.(*Disk) | |
| 		ret = append(ret, disk.GetVolumes()...) | |
| 	} | |
| 	dn.RUnlock() | |
| 	return ret | |
| } | |
| 
 | |
| func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) { | |
| 	dn.RLock() | |
| 	defer dn.RUnlock() | |
| 	found := false | |
| 	for _, c := range dn.children { | |
| 		disk := c.(*Disk) | |
| 		vInfo, found = disk.volumes[id] | |
| 		if found { | |
| 			break | |
| 		} | |
| 	} | |
| 	if found { | |
| 		return vInfo, nil | |
| 	} else { | |
| 		return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") | |
| 	} | |
| } | |
| 
 | |
| func (dn *DataNode) GetDataCenter() *DataCenter { | |
| 	rack := dn.Parent() | |
| 	if rack == nil { | |
| 		return nil | |
| 	} | |
| 	dcNode := rack.Parent() | |
| 	if dcNode == nil { | |
| 		return nil | |
| 	} | |
| 	dcValue := dcNode.GetValue() | |
| 	return dcValue.(*DataCenter) | |
| } | |
| 
 | |
| func (dn *DataNode) GetDataCenterId() string { | |
| 	if dc := dn.GetDataCenter(); dc != nil { | |
| 		return string(dc.Id()) | |
| 	} | |
| 	return "" | |
| } | |
| 
 | |
| func (dn *DataNode) GetRack() *Rack { | |
| 	return dn.Parent().(*NodeImpl).value.(*Rack) | |
| } | |
| 
 | |
| func (dn *DataNode) GetTopology() *Topology { | |
| 	p := dn.Parent() | |
| 	for p.Parent() != nil { | |
| 		p = p.Parent() | |
| 	} | |
| 	t := p.(*Topology) | |
| 	return t | |
| } | |
| 
 | |
| func (dn *DataNode) MatchLocation(ip string, port int) bool { | |
| 	return dn.Ip == ip && dn.Port == port | |
| } | |
| 
 | |
| func (dn *DataNode) Url() string { | |
| 	return util.JoinHostPort(dn.Ip, dn.Port) | |
| } | |
| 
 | |
| func (dn *DataNode) ServerAddress() pb.ServerAddress { | |
| 	return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort) | |
| } | |
| 
 | |
| type DataNodeInfo struct { | |
| 	Url       string `json:"Url"` | |
| 	PublicUrl string `json:"PublicUrl"` | |
| 	Volumes   int64  `json:"Volumes"` | |
| 	EcShards  int64  `json:"EcShards"` | |
| 	Max       int64  `json:"Max"` | |
| 	VolumeIds string `json:"VolumeIds"` | |
| } | |
| 
 | |
| func (dn *DataNode) ToInfo() (info DataNodeInfo) { | |
| 	info.Url = dn.Url() | |
| 	info.PublicUrl = dn.PublicUrl | |
| 
 | |
| 	// aggregated volume info | |
| 	var volumeCount, ecShardCount, maxVolumeCount int64 | |
| 	var volumeIds string | |
| 	for _, diskUsage := range dn.diskUsages.usages { | |
| 		volumeCount += diskUsage.volumeCount | |
| 		ecShardCount += diskUsage.ecShardCount | |
| 		maxVolumeCount += diskUsage.maxVolumeCount | |
| 	} | |
| 
 | |
| 	for _, disk := range dn.Children() { | |
| 		d := disk.(*Disk) | |
| 		volumeIds += " " + d.GetVolumeIds() | |
| 	} | |
| 
 | |
| 	info.Volumes = volumeCount | |
| 	info.EcShards = ecShardCount | |
| 	info.Max = maxVolumeCount | |
| 	info.VolumeIds = volumeIds | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { | |
| 	m := &master_pb.DataNodeInfo{ | |
| 		Id:        string(dn.Id()), | |
| 		DiskInfos: make(map[string]*master_pb.DiskInfo), | |
| 		GrpcPort:  uint32(dn.GrpcPort), | |
| 	} | |
| 	for _, c := range dn.Children() { | |
| 		disk := c.(*Disk) | |
| 		m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo() | |
| 	} | |
| 	return m | |
| } | |
| 
 | |
| // GetVolumeIds returns the human readable volume ids limited to count of max 100. | |
| func (dn *DataNode) GetVolumeIds() string { | |
| 	dn.RLock() | |
| 	defer dn.RUnlock() | |
| 	existingVolumes := dn.getVolumes() | |
| 	ids := make([]int, 0, len(existingVolumes)) | |
| 
 | |
| 	for k := range existingVolumes { | |
| 		ids = append(ids, int(k)) | |
| 	} | |
| 
 | |
| 	return util.HumanReadableIntsMax(100, ids...) | |
| } | |
| 
 | |
| func (dn *DataNode) getVolumes() []storage.VolumeInfo { | |
| 	var existingVolumes []storage.VolumeInfo | |
| 	for _, c := range dn.children { | |
| 		disk := c.(*Disk) | |
| 		existingVolumes = append(existingVolumes, disk.GetVolumes()...) | |
| 	} | |
| 	return existingVolumes | |
| }
 |