271 lines
7.0 KiB

4 years ago
4 years ago
4 years ago
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "github.com/seaweedfs/seaweedfs/weed/storage"
  12. )
  13. type Disk struct {
  14. NodeImpl
  15. volumes map[needle.VolumeId]storage.VolumeInfo
  16. ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
  17. ecShardsLock sync.RWMutex
  18. }
  19. func NewDisk(diskType string) *Disk {
  20. s := &Disk{}
  21. s.id = NodeId(diskType)
  22. s.nodeType = "Disk"
  23. s.diskUsages = newDiskUsages()
  24. s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2)
  25. s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2)
  26. s.NodeImpl.value = s
  27. return s
  28. }
  29. type DiskUsages struct {
  30. sync.RWMutex
  31. usages map[types.DiskType]*DiskUsageCounts
  32. }
  33. func newDiskUsages() *DiskUsages {
  34. return &DiskUsages{
  35. usages: make(map[types.DiskType]*DiskUsageCounts),
  36. }
  37. }
  38. func (d *DiskUsages) negative() *DiskUsages {
  39. d.RLock()
  40. defer d.RUnlock()
  41. t := newDiskUsages()
  42. for diskType, b := range d.usages {
  43. a := t.getOrCreateDisk(diskType)
  44. a.volumeCount = -b.volumeCount
  45. a.remoteVolumeCount = -b.remoteVolumeCount
  46. a.activeVolumeCount = -b.activeVolumeCount
  47. a.ecShardCount = -b.ecShardCount
  48. a.maxVolumeCount = -b.maxVolumeCount
  49. }
  50. return t
  51. }
  52. func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
  53. ret := make(map[string]*master_pb.DiskInfo)
  54. for diskType, diskUsageCounts := range d.usages {
  55. m := &master_pb.DiskInfo{
  56. VolumeCount: diskUsageCounts.volumeCount,
  57. MaxVolumeCount: diskUsageCounts.maxVolumeCount,
  58. FreeVolumeCount: diskUsageCounts.maxVolumeCount - diskUsageCounts.volumeCount,
  59. ActiveVolumeCount: diskUsageCounts.activeVolumeCount,
  60. RemoteVolumeCount: diskUsageCounts.remoteVolumeCount,
  61. }
  62. ret[string(diskType)] = m
  63. }
  64. return ret
  65. }
  66. func (d *DiskUsages) FreeSpace() (freeSpace int64) {
  67. d.RLock()
  68. defer d.RUnlock()
  69. for _, diskUsage := range d.usages {
  70. freeSpace += diskUsage.FreeSpace()
  71. }
  72. return
  73. }
  74. func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) {
  75. d.RLock()
  76. defer d.RUnlock()
  77. for _, diskUsage := range d.usages {
  78. maxVolumeCount += diskUsage.maxVolumeCount
  79. }
  80. return
  81. }
  82. type DiskUsageCounts struct {
  83. volumeCount int64
  84. remoteVolumeCount int64
  85. activeVolumeCount int64
  86. ecShardCount int64
  87. maxVolumeCount int64
  88. }
  89. func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) {
  90. atomic.AddInt64(&a.volumeCount, b.volumeCount)
  91. atomic.AddInt64(&a.remoteVolumeCount, b.remoteVolumeCount)
  92. atomic.AddInt64(&a.activeVolumeCount, b.activeVolumeCount)
  93. atomic.AddInt64(&a.ecShardCount, b.ecShardCount)
  94. atomic.AddInt64(&a.maxVolumeCount, b.maxVolumeCount)
  95. }
  96. func (a *DiskUsageCounts) FreeSpace() int64 {
  97. freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount
  98. if a.ecShardCount > 0 {
  99. freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1
  100. }
  101. return freeVolumeSlotCount
  102. }
  103. func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts {
  104. return &DiskUsageCounts{
  105. volumeCount: a.volumeCount - b.volumeCount,
  106. remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount,
  107. activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount,
  108. ecShardCount: a.ecShardCount - b.ecShardCount,
  109. maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount,
  110. }
  111. }
  112. func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  113. du.Lock()
  114. defer du.Unlock()
  115. t, found := du.usages[diskType]
  116. if found {
  117. return t
  118. }
  119. t = &DiskUsageCounts{}
  120. du.usages[diskType] = t
  121. return t
  122. }
  123. func (d *Disk) String() string {
  124. d.RLock()
  125. defer d.RUnlock()
  126. return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards)
  127. }
  128. func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
  129. d.Lock()
  130. defer d.Unlock()
  131. return d.doAddOrUpdateVolume(v)
  132. }
  133. func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
  134. deltaDiskUsages := newDiskUsages()
  135. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  136. if oldV, ok := d.volumes[v.Id]; !ok {
  137. d.volumes[v.Id] = v
  138. deltaDiskUsage.volumeCount = 1
  139. if v.IsRemote() {
  140. deltaDiskUsage.remoteVolumeCount = 1
  141. }
  142. if !v.ReadOnly {
  143. deltaDiskUsage.activeVolumeCount = 1
  144. }
  145. d.UpAdjustMaxVolumeId(v.Id)
  146. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  147. isNew = true
  148. } else {
  149. if oldV.IsRemote() != v.IsRemote() {
  150. if v.IsRemote() {
  151. deltaDiskUsage.remoteVolumeCount = 1
  152. }
  153. if oldV.IsRemote() {
  154. deltaDiskUsage.remoteVolumeCount = -1
  155. }
  156. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  157. }
  158. isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
  159. d.volumes[v.Id] = v
  160. }
  161. return
  162. }
  163. func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) {
  164. d.RLock()
  165. for _, v := range d.volumes {
  166. ret = append(ret, v)
  167. }
  168. d.RUnlock()
  169. return ret
  170. }
  171. func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
  172. d.RLock()
  173. defer d.RUnlock()
  174. vInfo, ok := d.volumes[id]
  175. if ok {
  176. return vInfo, nil
  177. } else {
  178. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  179. }
  180. }
  181. func (d *Disk) GetDataCenter() *DataCenter {
  182. dn := d.Parent()
  183. rack := dn.Parent()
  184. dcNode := rack.Parent()
  185. dcValue := dcNode.GetValue()
  186. return dcValue.(*DataCenter)
  187. }
  188. func (d *Disk) GetRack() *Rack {
  189. return d.Parent().Parent().(*NodeImpl).value.(*Rack)
  190. }
  191. func (d *Disk) GetTopology() *Topology {
  192. p := d.Parent()
  193. for p.Parent() != nil {
  194. p = p.Parent()
  195. }
  196. t := p.(*Topology)
  197. return t
  198. }
  199. func (d *Disk) ToMap() interface{} {
  200. ret := make(map[string]interface{})
  201. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  202. ret["Volumes"] = diskUsage.volumeCount
  203. ret["VolumeIds"] = d.GetVolumeIds()
  204. ret["EcShards"] = diskUsage.ecShardCount
  205. ret["Max"] = diskUsage.maxVolumeCount
  206. ret["Free"] = d.FreeSpace()
  207. return ret
  208. }
  209. func (d *Disk) FreeSpace() int64 {
  210. t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  211. return t.FreeSpace()
  212. }
  213. func (d *Disk) ToDiskInfo() *master_pb.DiskInfo {
  214. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  215. m := &master_pb.DiskInfo{
  216. Type: string(d.Id()),
  217. VolumeCount: diskUsage.volumeCount,
  218. MaxVolumeCount: diskUsage.maxVolumeCount,
  219. FreeVolumeCount: diskUsage.maxVolumeCount - diskUsage.volumeCount,
  220. ActiveVolumeCount: diskUsage.activeVolumeCount,
  221. RemoteVolumeCount: diskUsage.remoteVolumeCount,
  222. }
  223. for _, v := range d.GetVolumes() {
  224. m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
  225. }
  226. for _, ecv := range d.GetEcShards() {
  227. m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
  228. }
  229. return m
  230. }
  231. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  232. func (d *Disk) GetVolumeIds() string {
  233. d.RLock()
  234. defer d.RUnlock()
  235. ids := make([]int, 0, len(d.volumes))
  236. for k := range d.volumes {
  237. ids = append(ids, int(k))
  238. }
  239. return util.HumanReadableIntsMax(100, ids...)
  240. }