You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
9.6 KiB

5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
6 years ago
5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "io"
  8. "math/rand"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  17. "github.com/chrislusf/seaweedfs/weed/wdclient"
  18. )
  19. func init() {
  20. Commands = append(Commands, &commandEcEncode{})
  21. }
  22. type commandEcEncode struct {
  23. }
  24. func (c *commandEcEncode) Name() string {
  25. return "ec.encode"
  26. }
  27. func (c *commandEcEncode) Help() string {
  28. return `apply erasure coding to a volume
  29. ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h]
  30. ec.encode [-collection=""] [-volumeId=<volume_id>]
  31. This command will:
  32. 1. freeze one volume
  33. 2. apply erasure coding to the volume
  34. 3. move the encoded shards to multiple volume servers
  35. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  36. to lose 4 volume servers.
  37. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  38. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  39. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  40. have 4 corrupted shard files.
  41. `
  42. }
  43. func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  44. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  45. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  46. collection := encodeCommand.String("collection", "", "the collection name")
  47. fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  48. quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
  49. parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
  50. if err = encodeCommand.Parse(args); err != nil {
  51. return nil
  52. }
  53. if err = commandEnv.confirmIsLocked(args); err != nil {
  54. return
  55. }
  56. vid := needle.VolumeId(*volumeId)
  57. // volumeId is provided
  58. if vid != 0 {
  59. return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
  60. }
  61. // apply to all volumes in the collection
  62. volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
  63. if err != nil {
  64. return err
  65. }
  66. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  67. for _, vid := range volumeIds {
  68. if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
  69. return err
  70. }
  71. }
  72. return nil
  73. }
  74. func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
  75. // find volume location
  76. locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
  77. if !found {
  78. return fmt.Errorf("volume %d not found", vid)
  79. }
  80. // fmt.Printf("found ec %d shards on %v\n", vid, locations)
  81. // mark the volume as readonly
  82. err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
  83. if err != nil {
  84. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  85. }
  86. // generate ec shards
  87. err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
  88. if err != nil {
  89. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  90. }
  91. // balance the ec shards to current cluster
  92. err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
  93. if err != nil {
  94. return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
  95. }
  96. return nil
  97. }
  98. func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
  99. fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
  100. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  101. _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
  102. VolumeId: uint32(volumeId),
  103. Collection: collection,
  104. })
  105. return genErr
  106. })
  107. return err
  108. }
  109. func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
  110. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
  111. if err != nil {
  112. return err
  113. }
  114. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  115. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  116. }
  117. allocatedDataNodes := allEcNodes
  118. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  119. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  120. }
  121. // calculate how many shards to allocate for these servers
  122. allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
  123. // ask the data nodes to copy from the source volume server
  124. copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy)
  125. if err != nil {
  126. return err
  127. }
  128. // unmount the to be deleted shards
  129. err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
  130. if err != nil {
  131. return err
  132. }
  133. // ask the source volume server to clean up copied ec shards
  134. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
  135. if err != nil {
  136. return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
  137. }
  138. // ask the source volume server to delete the original volume
  139. for _, location := range existingLocations {
  140. fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
  141. err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress())
  142. if err != nil {
  143. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  144. }
  145. }
  146. return err
  147. }
  148. func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) {
  149. fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
  150. var wg sync.WaitGroup
  151. shardIdChan := make(chan []uint32, len(targetServers))
  152. copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
  153. defer wg.Done()
  154. copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
  155. allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress())
  156. if copyErr != nil {
  157. err = copyErr
  158. } else {
  159. shardIdChan <- copiedShardIds
  160. server.addEcVolumeShards(volumeId, collection, copiedShardIds)
  161. }
  162. }
  163. cleanupFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
  164. if err := unmountEcShards(grpcDialOption, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
  165. fmt.Printf("unmount aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
  166. }
  167. if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
  168. fmt.Printf("remove aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
  169. }
  170. }
  171. // maybe parallelize
  172. for i, server := range targetServers {
  173. if len(allocatedEcIds[i]) <= 0 {
  174. continue
  175. }
  176. wg.Add(1)
  177. if parallelCopy {
  178. go copyFunc(server, allocatedEcIds[i])
  179. } else {
  180. copyFunc(server, allocatedEcIds[i])
  181. }
  182. }
  183. wg.Wait()
  184. close(shardIdChan)
  185. if err != nil {
  186. for i, server := range targetServers {
  187. if len(allocatedEcIds[i]) <= 0 {
  188. continue
  189. }
  190. cleanupFunc(server, allocatedEcIds[i])
  191. }
  192. return nil, err
  193. }
  194. for shardIds := range shardIdChan {
  195. actuallyCopied = append(actuallyCopied, shardIds...)
  196. }
  197. return
  198. }
  199. func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
  200. allocated = make([][]uint32, len(servers))
  201. allocatedShardIdIndex := uint32(0)
  202. serverIndex := rand.Intn(len(servers))
  203. for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
  204. if servers[serverIndex].freeEcSlot > 0 {
  205. allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
  206. allocatedShardIdIndex++
  207. }
  208. serverIndex++
  209. if serverIndex >= len(servers) {
  210. serverIndex = 0
  211. }
  212. }
  213. return allocated
  214. }
  215. func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  216. // collect topology information
  217. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
  218. if err != nil {
  219. return
  220. }
  221. quietSeconds := int64(quietPeriod / time.Second)
  222. nowUnixSeconds := time.Now().Unix()
  223. fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
  224. vidMap := make(map[uint32]bool)
  225. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  226. for _, diskInfo := range dn.DiskInfos {
  227. for _, v := range diskInfo.VolumeInfos {
  228. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
  229. if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
  230. vidMap[v.Id] = true
  231. }
  232. }
  233. }
  234. }
  235. })
  236. for vid := range vidMap {
  237. vids = append(vids, needle.VolumeId(vid))
  238. }
  239. return
  240. }