You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

263 lines
8.3 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandEcDecode{})
  16. }
  // commandEcDecode implements the "ec.decode" shell command. The struct has
  // no fields; all inputs arrive through the arguments of Do.
  type commandEcDecode struct{}

  // Name returns the name the command is invoked by.
  func (c *commandEcDecode) Name() string {
  	const name = "ec.decode"
  	return name
  }

  // Help returns the usage text of the command: a one-line summary followed
  // by the command synopsis.
  func (c *commandEcDecode) Help() string {
  	const summary = "decode a erasure coded volume into a normal volume\n"
  	const synopsis = "ec.decode [-collection=\"\"] [-volumeId=<volume_id>]\n"
  	return summary + synopsis
  }
  27. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  28. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  29. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  30. collection := encodeCommand.String("collection", "", "the collection name")
  31. if err = encodeCommand.Parse(args); err != nil {
  32. return nil
  33. }
  34. ctx := context.Background()
  35. vid := needle.VolumeId(*volumeId)
  36. // collect topology information
  37. topologyInfo, err := collectTopologyInfoForEcDecode(ctx, commandEnv)
  38. if err != nil {
  39. return err
  40. }
  41. // volumeId is provided
  42. if vid != 0 {
  43. return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid)
  44. }
  45. // apply to all volumes in the collection
  46. volumeIds := collectEcShardIds(topologyInfo, *collection)
  47. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  48. for _, vid := range volumeIds {
  49. if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  55. func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  56. // find volume location
  57. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  58. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  59. // collect ec shards to the server with most space
  60. targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid)
  61. if err != nil {
  62. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  63. }
  64. // generate a normal volume
  65. err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
  66. if err != nil {
  67. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  68. }
  69. // delete the previous ec shards
  70. err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  71. if err != nil {
  72. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  73. }
  74. return nil
  75. }
  76. func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
  77. // mount volume
  78. if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  79. _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
  80. VolumeId: uint32(vid),
  81. })
  82. return mountErr
  83. }); err != nil {
  84. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  85. }
  86. // unmount ec shards
  87. for location, ecIndexBits := range nodeToEcIndexBits {
  88. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  89. err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  90. if err != nil {
  91. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  92. }
  93. }
  94. // delete ec shards
  95. for location, ecIndexBits := range nodeToEcIndexBits {
  96. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  97. err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  98. if err != nil {
  99. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  100. }
  101. }
  102. return nil
  103. }
  104. func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
  105. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  106. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  107. _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
  108. VolumeId: uint32(vid),
  109. Collection: collection,
  110. })
  111. return genErr
  112. })
  113. return err
  114. }
  115. func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
  116. maxShardCount := 0
  117. var exisitngEcIndexBits erasure_coding.ShardBits
  118. for loc, ecIndexBits := range nodeToEcIndexBits {
  119. if ecIndexBits.ShardIdCount() > maxShardCount {
  120. maxShardCount = ecIndexBits.ShardIdCount()
  121. targetNodeLocation = loc
  122. exisitngEcIndexBits = ecIndexBits
  123. }
  124. }
  125. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  126. var copiedEcIndexBits erasure_coding.ShardBits
  127. for loc, ecIndexBits := range nodeToEcIndexBits {
  128. if loc == targetNodeLocation {
  129. continue
  130. }
  131. needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
  132. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  133. continue
  134. }
  135. err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  136. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  137. _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
  138. VolumeId: uint32(vid),
  139. Collection: collection,
  140. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  141. CopyEcxFile: false,
  142. CopyEcjFile: true,
  143. SourceDataNode: loc,
  144. })
  145. if copyErr != nil {
  146. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  147. }
  148. return nil
  149. })
  150. if err != nil {
  151. break
  152. }
  153. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  154. }
  155. nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
  156. return targetNodeLocation, err
  157. }
  158. func collectTopologyInfoForEcDecode(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
  159. var resp *master_pb.VolumeListResponse
  160. err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  161. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  162. return err
  163. })
  164. if err != nil {
  165. return
  166. }
  167. return resp.TopologyInfo, nil
  168. }
  169. func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
  170. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  171. for _, v := range dn.EcShardInfos {
  172. if v.Collection == selectedCollection && v.Id == uint32(vid) {
  173. ecShardInfos = append(ecShardInfos, v)
  174. }
  175. }
  176. })
  177. return
  178. }
  179. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
  180. vidMap := make(map[uint32]bool)
  181. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  182. for _, v := range dn.EcShardInfos {
  183. if v.Collection == selectedCollection {
  184. vidMap[v.Id] = true
  185. }
  186. }
  187. })
  188. for vid := range vidMap {
  189. vids = append(vids, needle.VolumeId(vid))
  190. }
  191. return
  192. }
  193. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
  194. nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
  195. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  196. for _, v := range dn.EcShardInfos {
  197. if v.Id == uint32(vid) {
  198. nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
  199. }
  200. }
  201. })
  202. return nodeToEcIndexBits
  203. }