You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
9.5 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sort"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. "google.golang.org/grpc"
  17. )
  18. func init() {
  19. commands = append(commands, &commandEcEncode{})
  20. }
  21. type commandEcEncode struct {
  22. }
  23. func (c *commandEcEncode) Name() string {
  24. return "ec.encode"
  25. }
  26. func (c *commandEcEncode) Help() string {
  27. return `apply erasure coding to a volume
  28. This command will:
  29. 1. freeze one volume
  30. 2. apply erasure coding to the volume
  31. 3. move the encoded shards to multiple volume servers
  32. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  33. to lose 4 volume servers.
  34. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  35. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  36. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  37. have 4 corrupted shard files.
  38. `
  39. }
  40. func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
  41. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  43. collection := encodeCommand.String("collection", "", "the collection name")
  44. if err = encodeCommand.Parse(args); err != nil {
  45. return nil
  46. }
  47. ctx := context.Background()
  48. // find volume location
  49. locations := commandEnv.masterClient.GetLocations(uint32(*volumeId))
  50. if len(locations) == 0 {
  51. return fmt.Errorf("volume %d not found", *volumeId)
  52. }
  53. // generate ec shards
  54. err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url)
  55. if err != nil {
  56. return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
  57. }
  58. // balance the ec shards to current cluster
  59. err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations)
  60. if err != nil {
  61. return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
  62. }
  63. return err
  64. }
  65. func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
  66. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  67. _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
  68. VolumeId: uint32(volumeId),
  69. Collection: collection,
  70. })
  71. return genErr
  72. })
  73. return err
  74. }
  75. func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
  76. // list all possible locations
  77. var resp *master_pb.VolumeListResponse
  78. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  79. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  80. return err
  81. })
  82. if err != nil {
  83. return err
  84. }
  85. // find out all volume servers with one volume slot left.
  86. var allDataNodes []*master_pb.DataNodeInfo
  87. var totalFreeEcSlots uint32
  88. eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
  89. if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
  90. allDataNodes = append(allDataNodes, dn)
  91. totalFreeEcSlots += freeEcSlots
  92. }
  93. })
  94. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  95. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  96. }
  97. sort.Slice(allDataNodes, func(i, j int) bool {
  98. return countFreeShardSlots(allDataNodes[j]) < countFreeShardSlots(allDataNodes[i])
  99. })
  100. if len(allDataNodes) > erasure_coding.TotalShardsCount {
  101. allDataNodes = allDataNodes[:erasure_coding.TotalShardsCount]
  102. }
  103. // calculate how many shards to allocate for these servers
  104. allocated := balancedEcDistribution(allDataNodes)
  105. // ask the data nodes to copy from the source volume server
  106. copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, collection, existingLocations[0])
  107. if err != nil {
  108. return nil
  109. }
  110. // ask the source volume server to clean up copied ec shards
  111. err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
  112. if err != nil {
  113. return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
  114. }
  115. // ask the source volume server to delete the original volume
  116. for _, location := range existingLocations {
  117. err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
  118. if err != nil {
  119. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  120. }
  121. }
  122. return err
  123. }
  124. func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  125. targetServers []*master_pb.DataNodeInfo, allocated []uint32,
  126. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
  127. // parallelize
  128. shardIdChan := make(chan []uint32, len(targetServers))
  129. var wg sync.WaitGroup
  130. startFromShardId := uint32(0)
  131. for i, server := range targetServers {
  132. if allocated[i] <= 0 {
  133. continue
  134. }
  135. wg.Add(1)
  136. go func(server *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32) {
  137. defer wg.Done()
  138. copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
  139. startFromShardId, shardCount, volumeId, collection, existingLocation)
  140. if copyErr != nil {
  141. err = copyErr
  142. } else {
  143. shardIdChan <- copiedShardIds
  144. }
  145. }(server, startFromShardId, allocated[i])
  146. startFromShardId += allocated[i]
  147. }
  148. wg.Wait()
  149. close(shardIdChan)
  150. if err != nil {
  151. return nil, err
  152. }
  153. for shardIds := range shardIdChan {
  154. actuallyCopied = append(actuallyCopied, shardIds...)
  155. }
  156. return
  157. }
  158. func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  159. targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32,
  160. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
  161. var shardIdsToCopy []uint32
  162. for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ {
  163. fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id)
  164. shardIdsToCopy = append(shardIdsToCopy, shardId)
  165. }
  166. err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  167. if targetServer.Id != existingLocation.Url {
  168. _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
  169. VolumeId: uint32(volumeId),
  170. Collection: collection,
  171. ShardIds: shardIdsToCopy,
  172. SourceDataNode: existingLocation.Url,
  173. })
  174. if copyErr != nil {
  175. return copyErr
  176. }
  177. }
  178. _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
  179. VolumeId: uint32(volumeId),
  180. Collection: collection,
  181. ShardIds: shardIdsToCopy,
  182. })
  183. if mountErr != nil {
  184. return mountErr
  185. }
  186. if targetServer.Id != existingLocation.Url {
  187. copiedShardIds = shardIdsToCopy
  188. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
  189. }
  190. return nil
  191. })
  192. if err != nil {
  193. return
  194. }
  195. return
  196. }
  197. func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
  198. volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
  199. shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
  200. return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  201. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
  202. VolumeId: uint32(volumeId),
  203. ShardIds: toBeDeletedShardIds,
  204. ShouldDeleteEcx: shouldDeleteEcx,
  205. })
  206. return deleteErr
  207. })
  208. }
  209. func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []uint32) {
  210. freeSlots := make([]uint32, len(servers))
  211. allocated = make([]uint32, len(servers))
  212. for i, server := range servers {
  213. freeSlots[i] = countFreeShardSlots(server)
  214. }
  215. allocatedCount := 0
  216. for allocatedCount < erasure_coding.TotalShardsCount {
  217. for i, _ := range servers {
  218. if freeSlots[i]-allocated[i] > 0 {
  219. allocated[i] += 1
  220. allocatedCount += 1
  221. }
  222. if allocatedCount >= erasure_coding.TotalShardsCount {
  223. break
  224. }
  225. }
  226. }
  227. return allocated
  228. }
  229. func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
  230. for _, dc := range topo.DataCenterInfos {
  231. for _, rack := range dc.RackInfos {
  232. for _, dn := range rack.DataNodeInfos {
  233. fn(dn)
  234. }
  235. }
  236. }
  237. }
  238. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count uint32) {
  239. for _, ecShardInfo := range ecShardInfos {
  240. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  241. count += uint32(shardBits.ShardIdCount())
  242. }
  243. return
  244. }
  245. func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count uint32) {
  246. return uint32(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
  247. }