You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
7.5 KiB

4 years ago
4 years ago
4 years ago
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/storage/types"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage"
  11. )
  12. type Disk struct {
  13. NodeImpl
  14. volumes map[needle.VolumeId]storage.VolumeInfo
  15. ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
  16. ecShardsLock sync.RWMutex
  17. }
  18. func NewDisk(diskType string) *Disk {
  19. s := &Disk{}
  20. s.id = NodeId(diskType)
  21. s.nodeType = "Disk"
  22. s.diskUsages = newDiskUsages()
  23. s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2)
  24. s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2)
  25. s.NodeImpl.value = s
  26. return s
  27. }
  28. type DiskUsages struct {
  29. sync.RWMutex
  30. usages map[types.DiskType]*DiskUsageCounts
  31. }
  32. func newDiskUsages() *DiskUsages {
  33. return &DiskUsages{
  34. usages: make(map[types.DiskType]*DiskUsageCounts),
  35. }
  36. }
  37. func (d *DiskUsages) negative() *DiskUsages {
  38. d.RLock()
  39. defer d.RUnlock()
  40. t := newDiskUsages()
  41. for diskType, b := range d.usages {
  42. a := t.getOrCreateDisk(diskType)
  43. a.volumeCount = -b.volumeCount
  44. a.remoteVolumeCount = -b.remoteVolumeCount
  45. a.activeVolumeCount = -b.activeVolumeCount
  46. a.ecShardCount = -b.ecShardCount
  47. a.maxVolumeCount = -b.maxVolumeCount
  48. }
  49. return t
  50. }
  51. func (d *DiskUsages) ToMap() interface{} {
  52. d.RLock()
  53. defer d.RUnlock()
  54. ret := make(map[string]interface{})
  55. for diskType, diskUsage := range d.usages {
  56. ret[diskType.String()] = diskUsage.ToMap()
  57. }
  58. return ret
  59. }
  60. func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
  61. ret := make(map[string]*master_pb.DiskInfo)
  62. for diskType, diskUsageCounts := range d.usages {
  63. m := &master_pb.DiskInfo{
  64. VolumeCount: uint64(diskUsageCounts.volumeCount),
  65. MaxVolumeCount: uint64(diskUsageCounts.maxVolumeCount),
  66. FreeVolumeCount: uint64(diskUsageCounts.maxVolumeCount - diskUsageCounts.volumeCount),
  67. ActiveVolumeCount: uint64(diskUsageCounts.activeVolumeCount),
  68. RemoteVolumeCount: uint64(diskUsageCounts.remoteVolumeCount),
  69. }
  70. ret[string(diskType)] = m
  71. }
  72. return ret
  73. }
  74. func (d *DiskUsages) FreeSpace() (freeSpace int64) {
  75. d.RLock()
  76. defer d.RUnlock()
  77. for _, diskUsage := range d.usages {
  78. freeSpace += diskUsage.FreeSpace()
  79. }
  80. return
  81. }
  82. func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) {
  83. d.RLock()
  84. defer d.RUnlock()
  85. for _, diskUsage := range d.usages {
  86. maxVolumeCount += diskUsage.maxVolumeCount
  87. }
  88. return
  89. }
  90. type DiskUsageCounts struct {
  91. volumeCount int64
  92. remoteVolumeCount int64
  93. activeVolumeCount int64
  94. ecShardCount int64
  95. maxVolumeCount int64
  96. }
  97. func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) {
  98. a.volumeCount += b.volumeCount
  99. a.remoteVolumeCount += b.remoteVolumeCount
  100. a.activeVolumeCount += b.activeVolumeCount
  101. a.ecShardCount += b.ecShardCount
  102. a.maxVolumeCount += b.maxVolumeCount
  103. }
  104. func (a *DiskUsageCounts) FreeSpace() int64 {
  105. freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount
  106. if a.ecShardCount > 0 {
  107. freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1
  108. }
  109. return freeVolumeSlotCount
  110. }
  111. func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts {
  112. return &DiskUsageCounts{
  113. volumeCount: a.volumeCount - b.volumeCount,
  114. remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount,
  115. activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount,
  116. ecShardCount: a.ecShardCount - b.ecShardCount,
  117. maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount,
  118. }
  119. }
  120. func (diskUsage *DiskUsageCounts) ToMap() interface{} {
  121. ret := make(map[string]interface{})
  122. ret["Volumes"] = diskUsage.volumeCount
  123. ret["EcShards"] = diskUsage.ecShardCount
  124. ret["Max"] = diskUsage.maxVolumeCount
  125. ret["Free"] = diskUsage.FreeSpace()
  126. return ret
  127. }
  128. func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  129. du.Lock()
  130. defer du.Unlock()
  131. t, found := du.usages[diskType]
  132. if found {
  133. return t
  134. }
  135. t = &DiskUsageCounts{}
  136. du.usages[diskType] = t
  137. return t
  138. }
  139. func (d *Disk) String() string {
  140. d.RLock()
  141. defer d.RUnlock()
  142. return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards)
  143. }
  144. func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  145. d.Lock()
  146. defer d.Unlock()
  147. return d.doAddOrUpdateVolume(v)
  148. }
  149. func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  150. deltaDiskUsages := newDiskUsages()
  151. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  152. if oldV, ok := d.volumes[v.Id]; !ok {
  153. d.volumes[v.Id] = v
  154. deltaDiskUsage.volumeCount = 1
  155. if v.IsRemote() {
  156. deltaDiskUsage.remoteVolumeCount = 1
  157. }
  158. if !v.ReadOnly {
  159. deltaDiskUsage.activeVolumeCount = 1
  160. }
  161. d.UpAdjustMaxVolumeId(v.Id)
  162. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  163. isNew = true
  164. } else {
  165. if oldV.IsRemote() != v.IsRemote() {
  166. if v.IsRemote() {
  167. deltaDiskUsage.remoteVolumeCount = 1
  168. }
  169. if oldV.IsRemote() {
  170. deltaDiskUsage.remoteVolumeCount = -1
  171. }
  172. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  173. }
  174. isChangedRO = d.volumes[v.Id].ReadOnly != v.ReadOnly
  175. d.volumes[v.Id] = v
  176. }
  177. return
  178. }
  179. func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) {
  180. d.RLock()
  181. for _, v := range d.volumes {
  182. ret = append(ret, v)
  183. }
  184. d.RUnlock()
  185. return ret
  186. }
  187. func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
  188. d.RLock()
  189. defer d.RUnlock()
  190. vInfo, ok := d.volumes[id]
  191. if ok {
  192. return vInfo, nil
  193. } else {
  194. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  195. }
  196. }
  197. func (d *Disk) GetDataCenter() *DataCenter {
  198. dn := d.Parent()
  199. rack := dn.Parent()
  200. dcNode := rack.Parent()
  201. dcValue := dcNode.GetValue()
  202. return dcValue.(*DataCenter)
  203. }
  204. func (d *Disk) GetRack() *Rack {
  205. return d.Parent().Parent().(*NodeImpl).value.(*Rack)
  206. }
  207. func (d *Disk) GetTopology() *Topology {
  208. p := d.Parent()
  209. for p.Parent() != nil {
  210. p = p.Parent()
  211. }
  212. t := p.(*Topology)
  213. return t
  214. }
  215. func (d *Disk) ToMap() interface{} {
  216. ret := make(map[string]interface{})
  217. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  218. ret["Volumes"] = diskUsage.volumeCount
  219. ret["VolumeIds"] = d.GetVolumeIds()
  220. ret["EcShards"] = diskUsage.ecShardCount
  221. ret["Max"] = diskUsage.maxVolumeCount
  222. ret["Free"] = d.FreeSpace()
  223. return ret
  224. }
  225. func (d *Disk) FreeSpace() int64 {
  226. t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  227. return t.FreeSpace()
  228. }
  229. func (d *Disk) ToDiskInfo() *master_pb.DiskInfo {
  230. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  231. m := &master_pb.DiskInfo{
  232. Type: string(d.Id()),
  233. VolumeCount: uint64(diskUsage.volumeCount),
  234. MaxVolumeCount: uint64(diskUsage.maxVolumeCount),
  235. FreeVolumeCount: uint64(diskUsage.maxVolumeCount - diskUsage.volumeCount),
  236. ActiveVolumeCount: uint64(diskUsage.activeVolumeCount),
  237. RemoteVolumeCount: uint64(diskUsage.remoteVolumeCount),
  238. }
  239. for _, v := range d.GetVolumes() {
  240. m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
  241. }
  242. for _, ecv := range d.GetEcShards() {
  243. m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
  244. }
  245. return m
  246. }
  247. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  248. func (d *Disk) GetVolumeIds() string {
  249. d.RLock()
  250. defer d.RUnlock()
  251. ids := make([]int, 0, len(d.volumes))
  252. for k := range d.volumes {
  253. ids = append(ids, int(k))
  254. }
  255. return util.HumanReadableIntsMax(100, ids...)
  256. }