You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

379 lines
11 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sort"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. "google.golang.org/grpc"
  17. )
  18. func init() {
  19. commands = append(commands, &commandEcEncode{})
  20. }
  21. type commandEcEncode struct {
  22. }
  23. func (c *commandEcEncode) Name() string {
  24. return "ec.encode"
  25. }
  26. func (c *commandEcEncode) Help() string {
  27. return `apply erasure coding to a volume
  28. This command will:
  29. 1. freeze one volume
  30. 2. apply erasure coding to the volume
  31. 3. move the encoded shards to multiple volume servers
  32. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  33. to lose 4 volume servers.
  34. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  35. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  36. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  37. have 4 corrupted shard files.
  38. `
  39. }
  40. func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
  41. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  43. collection := encodeCommand.String("collection", "", "the collection name")
  44. if err = encodeCommand.Parse(args); err != nil {
  45. return nil
  46. }
  47. ctx := context.Background()
  48. vid := needle.VolumeId(*volumeId)
  49. // volumeId is provided
  50. if vid != 0 {
  51. return doEcEncode(ctx, commandEnv, *collection, vid)
  52. }
  53. // apply to all volumes in the collection
  54. volumeIds, err := collectVolumeByCollection(ctx, commandEnv, *collection)
  55. if err != nil {
  56. return err
  57. }
  58. for _, vid := range volumeIds {
  59. if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
  60. return err
  61. }
  62. }
  63. return nil
  64. }
  65. func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
  66. // find volume location
  67. locations := commandEnv.masterClient.GetLocations(uint32(vid))
  68. if len(locations) == 0 {
  69. return fmt.Errorf("volume %d not found", vid)
  70. }
  71. // generate ec shards
  72. err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
  73. if err != nil {
  74. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  75. }
  76. // balance the ec shards to current cluster
  77. err = balanceEcShards(ctx, commandEnv, vid, collection, locations)
  78. if err != nil {
  79. return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  80. }
  81. return nil
  82. }
  83. func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
  84. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  85. _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
  86. VolumeId: uint32(volumeId),
  87. Collection: collection,
  88. })
  89. return genErr
  90. })
  91. return err
  92. }
  93. func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
  94. allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
  95. if err != nil {
  96. return err
  97. }
  98. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  99. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  100. }
  101. allocatedDataNodes := allEcNodes
  102. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  103. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  104. }
  105. // calculate how many shards to allocate for these servers
  106. allocated := balancedEcDistribution(allocatedDataNodes)
  107. // ask the data nodes to copy from the source volume server
  108. copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
  109. if err != nil {
  110. return nil
  111. }
  112. // ask the source volume server to clean up copied ec shards
  113. err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
  114. if err != nil {
  115. return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
  116. }
  117. // ask the source volume server to delete the original volume
  118. for _, location := range existingLocations {
  119. err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
  120. if err != nil {
  121. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  122. }
  123. }
  124. return err
  125. }
  126. func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  127. targetServers []*EcNode, allocated []int,
  128. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
  129. // parallelize
  130. shardIdChan := make(chan []uint32, len(targetServers))
  131. var wg sync.WaitGroup
  132. startFromShardId := uint32(0)
  133. for i, server := range targetServers {
  134. if allocated[i] <= 0 {
  135. continue
  136. }
  137. wg.Add(1)
  138. go func(server *EcNode, startFromShardId uint32, shardCount int) {
  139. defer wg.Done()
  140. copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
  141. startFromShardId, shardCount, volumeId, collection, existingLocation)
  142. if copyErr != nil {
  143. err = copyErr
  144. } else {
  145. shardIdChan <- copiedShardIds
  146. server.freeEcSlot -= len(copiedShardIds)
  147. }
  148. }(server, startFromShardId, allocated[i])
  149. startFromShardId += uint32(allocated[i])
  150. }
  151. wg.Wait()
  152. close(shardIdChan)
  153. if err != nil {
  154. return nil, err
  155. }
  156. for shardIds := range shardIdChan {
  157. actuallyCopied = append(actuallyCopied, shardIds...)
  158. }
  159. return
  160. }
  161. func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  162. targetServer *EcNode, startFromShardId uint32, shardCount int,
  163. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
  164. var shardIdsToCopy []uint32
  165. for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
  166. fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id)
  167. shardIdsToCopy = append(shardIdsToCopy, shardId)
  168. }
  169. err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  170. if targetServer.info.Id != existingLocation.Url {
  171. _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
  172. VolumeId: uint32(volumeId),
  173. Collection: collection,
  174. ShardIds: shardIdsToCopy,
  175. SourceDataNode: existingLocation.Url,
  176. })
  177. if copyErr != nil {
  178. return copyErr
  179. }
  180. }
  181. _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
  182. VolumeId: uint32(volumeId),
  183. Collection: collection,
  184. ShardIds: shardIdsToCopy,
  185. })
  186. if mountErr != nil {
  187. return mountErr
  188. }
  189. if targetServer.info.Id != existingLocation.Url {
  190. copiedShardIds = shardIdsToCopy
  191. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
  192. }
  193. return nil
  194. })
  195. if err != nil {
  196. return
  197. }
  198. return
  199. }
  200. func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
  201. volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
  202. shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
  203. return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  204. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
  205. VolumeId: uint32(volumeId),
  206. ShardIds: toBeDeletedShardIds,
  207. ShouldDeleteEcx: shouldDeleteEcx,
  208. })
  209. return deleteErr
  210. })
  211. }
  212. func balancedEcDistribution(servers []*EcNode) (allocated []int) {
  213. freeSlots := make([]int, len(servers))
  214. allocated = make([]int, len(servers))
  215. for i, server := range servers {
  216. freeSlots[i] = countFreeShardSlots(server.info)
  217. }
  218. allocatedCount := 0
  219. for allocatedCount < erasure_coding.TotalShardsCount {
  220. for i, _ := range servers {
  221. if freeSlots[i]-allocated[i] > 0 {
  222. allocated[i] += 1
  223. allocatedCount += 1
  224. }
  225. if allocatedCount >= erasure_coding.TotalShardsCount {
  226. break
  227. }
  228. }
  229. }
  230. return allocated
  231. }
  232. func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
  233. for _, dc := range topo.DataCenterInfos {
  234. for _, rack := range dc.RackInfos {
  235. for _, dn := range rack.DataNodeInfos {
  236. fn(dn)
  237. }
  238. }
  239. }
  240. }
  241. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  242. for _, ecShardInfo := range ecShardInfos {
  243. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  244. count += shardBits.ShardIdCount()
  245. }
  246. return
  247. }
  248. func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
  249. return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
  250. }
  251. type EcNode struct {
  252. info *master_pb.DataNodeInfo
  253. freeEcSlot int
  254. }
  255. func sortEcNodes(ecNodes []*EcNode) {
  256. sort.Slice(ecNodes, func(i, j int) bool {
  257. return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
  258. })
  259. }
  260. func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  261. // list all possible locations
  262. var resp *master_pb.VolumeListResponse
  263. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  264. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  265. return err
  266. })
  267. if err != nil {
  268. return nil, 0, err
  269. }
  270. // find out all volume servers with one slot left.
  271. eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
  272. if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
  273. ecNodes = append(ecNodes, &EcNode{
  274. info: dn,
  275. freeEcSlot: int(freeEcSlots),
  276. })
  277. totalFreeEcSlots += freeEcSlots
  278. }
  279. })
  280. sortEcNodes(ecNodes)
  281. return
  282. }
  283. func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, selectedCollection string) (vids []needle.VolumeId, err error) {
  284. var resp *master_pb.VolumeListResponse
  285. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  286. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  287. return err
  288. })
  289. if err != nil {
  290. return
  291. }
  292. vidMap := make(map[uint32]bool)
  293. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  294. for _, r := range dc.RackInfos {
  295. for _, dn := range r.DataNodeInfos {
  296. for _, v := range dn.VolumeInfos {
  297. if v.Collection == selectedCollection {
  298. vidMap[v.Id] = true
  299. }
  300. }
  301. }
  302. }
  303. }
  304. for vid, _ := range vidMap {
  305. vids = append(vids, needle.VolumeId(vid))
  306. }
  307. return
  308. }