You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
6.9 KiB

6 years ago
9 years ago
9 years ago
4 years ago
6 years ago
9 years ago
9 years ago
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. type DataNode struct {
  13. NodeImpl
  14. Ip string
  15. Port int
  16. GrpcPort int
  17. PublicUrl string
  18. LastSeen int64 // unix time in seconds
  19. Counter int // in race condition, the previous dataNode was not dead
  20. }
  21. func NewDataNode(id string) *DataNode {
  22. dn := &DataNode{}
  23. dn.id = NodeId(id)
  24. dn.nodeType = "DataNode"
  25. dn.diskUsages = newDiskUsages()
  26. dn.children = make(map[NodeId]Node)
  27. dn.NodeImpl.value = dn
  28. return dn
  29. }
  30. func (dn *DataNode) String() string {
  31. dn.RLock()
  32. defer dn.RUnlock()
  33. return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
  34. }
  35. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  36. dn.Lock()
  37. defer dn.Unlock()
  38. return dn.doAddOrUpdateVolume(v)
  39. }
  40. func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
  41. c, found := dn.children[NodeId(diskType)]
  42. if !found {
  43. c = NewDisk(diskType)
  44. dn.doLinkChildNode(c)
  45. }
  46. disk := c.(*Disk)
  47. return disk
  48. }
  49. func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  50. disk := dn.getOrCreateDisk(v.DiskType)
  51. return disk.AddOrUpdateVolume(v)
  52. }
  53. // UpdateVolumes detects new/deleted/changed volumes on a volume server
  54. // used in master to notify master clients of these changes.
  55. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
  56. actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
  57. for _, v := range actualVolumes {
  58. actualVolumeMap[v.Id] = v
  59. }
  60. dn.Lock()
  61. defer dn.Unlock()
  62. existingVolumes := dn.getVolumes()
  63. for _, v := range existingVolumes {
  64. vid := v.Id
  65. if _, ok := actualVolumeMap[vid]; !ok {
  66. glog.V(0).Infoln("Deleting volume id:", vid)
  67. disk := dn.getOrCreateDisk(v.DiskType)
  68. delete(disk.volumes, vid)
  69. deletedVolumes = append(deletedVolumes, v)
  70. deltaDiskUsages := newDiskUsages()
  71. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  72. deltaDiskUsage.volumeCount = -1
  73. if v.IsRemote() {
  74. deltaDiskUsage.remoteVolumeCount = -1
  75. }
  76. if !v.ReadOnly {
  77. deltaDiskUsage.activeVolumeCount = -1
  78. }
  79. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  80. }
  81. }
  82. for _, v := range actualVolumes {
  83. isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
  84. if isNew {
  85. newVolumes = append(newVolumes, v)
  86. }
  87. if isChangedRO {
  88. changeRO = append(changeRO, v)
  89. }
  90. }
  91. return
  92. }
  93. func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
  94. dn.Lock()
  95. defer dn.Unlock()
  96. for _, v := range deletedVolumes {
  97. disk := dn.getOrCreateDisk(v.DiskType)
  98. delete(disk.volumes, v.Id)
  99. deltaDiskUsages := newDiskUsages()
  100. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  101. deltaDiskUsage.volumeCount = -1
  102. if v.IsRemote() {
  103. deltaDiskUsage.remoteVolumeCount = -1
  104. }
  105. if !v.ReadOnly {
  106. deltaDiskUsage.activeVolumeCount = -1
  107. }
  108. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  109. }
  110. for _, v := range newVolumes {
  111. dn.doAddOrUpdateVolume(v)
  112. }
  113. return
  114. }
  115. func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
  116. deltaDiskUsages := newDiskUsages()
  117. for diskType, maxVolumeCount := range maxVolumeCounts {
  118. if maxVolumeCount == 0 {
  119. // the volume server may have set the max to zero
  120. continue
  121. }
  122. dt := types.ToDiskType(diskType)
  123. currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
  124. if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
  125. continue
  126. }
  127. disk := dn.getOrCreateDisk(dt.String())
  128. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
  129. deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
  130. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  131. }
  132. }
  133. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  134. dn.RLock()
  135. for _, c := range dn.children {
  136. disk := c.(*Disk)
  137. ret = append(ret, disk.GetVolumes()...)
  138. }
  139. dn.RUnlock()
  140. return ret
  141. }
  142. func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
  143. dn.RLock()
  144. defer dn.RUnlock()
  145. found := false
  146. for _, c := range dn.children {
  147. disk := c.(*Disk)
  148. vInfo, found = disk.volumes[id]
  149. if found {
  150. break
  151. }
  152. }
  153. if found {
  154. return vInfo, nil
  155. } else {
  156. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  157. }
  158. }
  159. func (dn *DataNode) GetDataCenter() *DataCenter {
  160. rack := dn.Parent()
  161. if rack == nil {
  162. return nil
  163. }
  164. dcNode := rack.Parent()
  165. if dcNode == nil {
  166. return nil
  167. }
  168. dcValue := dcNode.GetValue()
  169. return dcValue.(*DataCenter)
  170. }
  171. func (dn *DataNode) GetRack() *Rack {
  172. return dn.Parent().(*NodeImpl).value.(*Rack)
  173. }
  174. func (dn *DataNode) GetTopology() *Topology {
  175. p := dn.Parent()
  176. for p.Parent() != nil {
  177. p = p.Parent()
  178. }
  179. t := p.(*Topology)
  180. return t
  181. }
  182. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  183. return dn.Ip == ip && dn.Port == port
  184. }
  185. func (dn *DataNode) Url() string {
  186. return util.JoinHostPort(dn.Ip, dn.Port)
  187. }
  188. func (dn *DataNode) ServerAddress() pb.ServerAddress {
  189. return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
  190. }
  191. func (dn *DataNode) ToMap() interface{} {
  192. ret := make(map[string]interface{})
  193. ret["Url"] = dn.Url()
  194. ret["PublicUrl"] = dn.PublicUrl
  195. // aggregated volume info
  196. var volumeCount, ecShardCount, maxVolumeCount int64
  197. var volumeIds string
  198. for _, diskUsage := range dn.diskUsages.usages {
  199. volumeCount += diskUsage.volumeCount
  200. ecShardCount += diskUsage.ecShardCount
  201. maxVolumeCount += diskUsage.maxVolumeCount
  202. }
  203. for _, disk := range dn.Children() {
  204. d := disk.(*Disk)
  205. volumeIds += " " + d.GetVolumeIds()
  206. }
  207. ret["Volumes"] = volumeCount
  208. ret["EcShards"] = ecShardCount
  209. ret["Max"] = maxVolumeCount
  210. ret["VolumeIds"] = volumeIds
  211. return ret
  212. }
  213. func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
  214. m := &master_pb.DataNodeInfo{
  215. Id: string(dn.Id()),
  216. DiskInfos: make(map[string]*master_pb.DiskInfo),
  217. GrpcPort: uint32(dn.GrpcPort),
  218. }
  219. for _, c := range dn.Children() {
  220. disk := c.(*Disk)
  221. m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
  222. }
  223. return m
  224. }
  225. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  226. func (dn *DataNode) GetVolumeIds() string {
  227. dn.RLock()
  228. defer dn.RUnlock()
  229. existingVolumes := dn.getVolumes()
  230. ids := make([]int, 0, len(existingVolumes))
  231. for k := range existingVolumes {
  232. ids = append(ids, int(k))
  233. }
  234. return util.HumanReadableIntsMax(100, ids...)
  235. }
  236. func (dn *DataNode) getVolumes() []storage.VolumeInfo {
  237. var existingVolumes []storage.VolumeInfo
  238. for _, c := range dn.children {
  239. disk := c.(*Disk)
  240. existingVolumes = append(existingVolumes, disk.GetVolumes()...)
  241. }
  242. return existingVolumes
  243. }