package topology

import (
	"encoding/json"
	"fmt"
	"math/rand/v2"
	"reflect"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/server/constants"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
/*
This package is created to resolve these replica placement issues:
1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
2. in times of tight storage, how to reduce the replica level
3. optimizing for hot data on faster disks and cold data on cheaper storage
4. volume allocation for each bucket
*/
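
// For reference, a replica placement string such as "110" (see
// super_block.ReplicaPlacement) encodes DiffDataCenterCount=1, DiffRackCount=1,
// SameRackCount=0: each logical volume then has GetCopyCount() = 1+1+0+1 = 3
// physical copies, one in another data center and one in another rack of the
// same data center.
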
type VolumeGrowRequest struct {
	Option *VolumeGrowOption
	Count  uint32
	Force  bool
	Reason string
}

// Equals reports whether two requests would grow the same volumes.
// Reason is not compared, so requests that differ only in Reason are equal.
func (vg *VolumeGrowRequest) Equals(req *VolumeGrowRequest) bool {
	return reflect.DeepEqual(vg.Option, req.Option) && vg.Count == req.Count && vg.Force == req.Force
}
type volumeGrowthStrategy struct {
	Copy1Count     uint32
	Copy2Count     uint32
	Copy3Count     uint32
	CopyOtherCount uint32
	Threshold      float64
}

var (
	VolumeGrowStrategy = volumeGrowthStrategy{
		Copy1Count:     7,
		Copy2Count:     6,
		Copy3Count:     3,
		CopyOtherCount: 1,
		Threshold:      0.9,
	}
)
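
// With these defaults, one growth round creates 7 logical volumes for a
// single-copy placement, 6 for two copies, 3 for three copies, and 1 for
// anything else (see findVolumeCount below). Threshold (0.9 by default) is
// read by the rest of the topology package as the fullness ratio for deciding
// when to grow more volumes; the value here is the shipped default, not a
// hard requirement.
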
type VolumeGrowOption struct {
	Collection         string                        `json:"collection,omitempty"`
	ReplicaPlacement   *super_block.ReplicaPlacement `json:"replication,omitempty"`
	Ttl                *needle.TTL                   `json:"ttl,omitempty"`
	DiskType           types.DiskType                `json:"disk,omitempty"`
	Preallocate        int64                         `json:"preallocate,omitempty"`
	DataCenter         string                        `json:"dataCenter,omitempty"`
	Rack               string                        `json:"rack,omitempty"`
	DataNode           string                        `json:"dataNode,omitempty"`
	MemoryMapMaxSizeMb uint32                        `json:"memoryMapMaxSizeMb,omitempty"`
}

type VolumeGrowth struct {
	accessLock sync.Mutex
}

func (o *VolumeGrowOption) String() string {
	blob, _ := json.Marshal(o)
	return string(blob)
}

func NewDefaultVolumeGrowth() *VolumeGrowth {
	return &VolumeGrowth{}
}
// one replication type may need rp.GetCopyCount() actual volumes per logical volume
// findVolumeCount returns, for the given copyCount, how many logical volumes to create in one growth round
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count uint32) {
	switch copyCount {
	case 1:
		count = VolumeGrowStrategy.Copy1Count
	case 2:
		count = VolumeGrowStrategy.Copy2Count
	case 3:
		count = VolumeGrowStrategy.Copy3Count
	default:
		count = VolumeGrowStrategy.CopyOtherCount
	}
	return
}
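
// For example, with the defaults above and replication "001" (copyCount == 2),
// findVolumeCount returns 6: one growth round then creates 6 logical volumes,
// i.e. 12 physical volume replicas across the cluster.
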
// AutomaticGrowByType grows targetCount logical volumes for the given option;
// a targetCount of 0 falls back to the strategy default for the placement's
// copy count. A partial result is still reported as success as long as it
// forms complete replica sets.
func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount uint32) (result []*master_pb.VolumeLocation, err error) {
	if targetCount == 0 {
		targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
	}
	result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
	if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
		return result, nil
	}
	return result, err
}
// GrowByCountAndType creates targetCount logical volumes one at a time under
// the grow lock, stopping at the first failure and returning whatever was
// created so far.
func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount uint32, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
	vg.accessLock.Lock()
	defer vg.accessLock.Unlock()

	for i := uint32(0); i < targetCount; i++ {
		if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
			result = append(result, res...)
		} else {
			glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
			return result, e
		}
	}
	return
}
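
// A minimal usage sketch (illustrative values only; assumes topo and
// grpcDialOption are already wired up, as the master server does):
//
//	rp, _ := super_block.NewReplicaPlacementFromString("001")
//	option := &VolumeGrowOption{
//		Collection:       "pics",
//		ReplicaPlacement: rp,
//		DiskType:         types.HardDriveType,
//	}
//	locations, err := NewDefaultVolumeGrowth().AutomaticGrowByType(option, grpcDialOption, topo, 0)
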
func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
	servers, e := vg.findEmptySlotsForOneVolume(topo, option)
	if e != nil {
		return nil, e
	}
	// after a leader change, volume servers need time to re-register via
	// heartbeats; wait until the leader has been stable for two pulse periods
	for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) {
		glog.V(0).Infof("wait for volume servers to join back")
		time.Sleep(constants.VolumePulseSeconds / 2)
	}
	vid, raftErr := topo.NextVolumeId()
	if raftErr != nil {
		return nil, raftErr
	}
	if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
		for _, server := range servers {
			result = append(result, &master_pb.VolumeLocation{
				Url:        server.Url(),
				PublicUrl:  server.PublicUrl,
				DataCenter: server.GetDataCenterId(),
				GrpcPort:   uint32(server.GrpcPort),
				NewVids:    []uint32{uint32(vid)},
			})
		}
	}
	return
}
// 1. find the main data node
//    1.1 collect all data nodes that have at least 1 free slot
//    1.2 collect all racks that have rp.SameRackCount+1 such data nodes
//    1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
// 2. find the rest of the data nodes
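//
// For illustration (an assumed example, not from the original comment): with
// replication "110" (DiffDataCenterCount=1, DiffRackCount=1, SameRackCount=0)
// this picks 2 data centers, 2 racks in the main data center, and 1 server in
// the main rack, then reserves one slot in the other rack and one in the other
// data center, returning 3 servers in total.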
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
	//find main data center and other data centers
	rp := option.ReplicaPlacement
	mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
		}
		if len(node.Children()) < rp.DiffRackCount+1 {
			return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
		}
		if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
			return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1)
		}
		possibleRacksCount := 0
		for _, rack := range node.Children() {
			possibleDataNodesCount := 0
			for _, n := range rack.Children() {
				if n.AvailableSpaceFor(option) >= 1 {
					possibleDataNodesCount++
				}
			}
			if possibleDataNodesCount >= rp.SameRackCount+1 {
				possibleRacksCount++
			}
		}
		if possibleRacksCount < rp.DiffRackCount+1 {
			return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
		}
		return nil
	})
	if dc_err != nil {
		return nil, dc_err
	}

	//find main rack and other racks
	mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error {
		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
		}
		if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) {
			return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1)
		}
		if len(node.Children()) < rp.SameRackCount+1 {
			// a bit faster way to test free racks
			return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
		}
		possibleDataNodesCount := 0
		for _, n := range node.Children() {
			if n.AvailableSpaceFor(option) >= 1 {
				possibleDataNodesCount++
			}
		}
		if possibleDataNodesCount < rp.SameRackCount+1 {
			return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
		}
		return nil
	})
	if rackErr != nil {
		return nil, rackErr
	}

	//find main server and other servers
	mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
		}
		if node.AvailableSpaceFor(option) < 1 {
			return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1)
		}
		return nil
	})
	if serverErr != nil {
		return nil, serverErr
	}

	servers = append(servers, mainServer.(*DataNode))
	for _, server := range otherServers {
		servers = append(servers, server.(*DataNode))
	}
	for _, rack := range otherRacks {
		r := rand.Int64N(rack.AvailableSpaceFor(option))
		if server, e := rack.ReserveOneVolume(r, option); e == nil {
			servers = append(servers, server)
		} else {
			return servers, e
		}
	}
	for _, datacenter := range otherDataCenters {
		r := rand.Int64N(datacenter.AvailableSpaceFor(option))
		if server, e := datacenter.ReserveOneVolume(r, option); e == nil {
			servers = append(servers, server)
		} else {
			return servers, e
		}
	}
	return
}
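
// grow allocates vid on every chosen server first, and registers the new
// replicas with the topology only if all allocations succeed; on any failure
// it attempts to delete the replicas that were already created, so a logical
// volume is either fully replicated or not kept at all.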
func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) (growErr error) {
	var createdVolumes []storage.VolumeInfo
	for _, server := range servers {
		if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
			createdVolumes = append(createdVolumes, storage.VolumeInfo{
				Id:               vid,
				Size:             0,
				Collection:       option.Collection,
				ReplicaPlacement: option.ReplicaPlacement,
				Ttl:              option.Ttl,
				Version:          needle.CurrentVersion,
				DiskType:         option.DiskType.String(),
				ModifiedAtSecond: time.Now().Unix(),
			})
			glog.V(0).Infof("Created Volume %d on %s", vid, server.NodeImpl.String())
		} else {
			glog.Warningf("Failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
			growErr = fmt.Errorf("failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
			break
		}
	}

	if growErr == nil {
		for i, vi := range createdVolumes {
			server := servers[i]
			server.AddOrUpdateVolume(vi)
			topo.RegisterVolumeLayout(vi, server)
			glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String())
		}
	} else {
		// cleaning up created volume replicas
		for i, vi := range createdVolumes {
			server := servers[i]
			if err := DeleteVolume(server, grpcDialOption, vi.Id); err != nil {
				glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String())
			}
		}
	}
	return growErr
}