You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

265 lines
8.3 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandEcDecode{})
  16. }
  17. type commandEcDecode struct {
  18. }
  19. func (c *commandEcDecode) Name() string {
  20. return "ec.decode"
  21. }
// Help returns the usage text of the ec.decode command: a one-line summary
// followed by the accepted flags (-collection and -volumeId).
func (c *commandEcDecode) Help() string {
	return `decode a erasure coded volume into a normal volume
ec.decode [-collection=""] [-volumeId=<volume_id>]
`
}
  27. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  28. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  29. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  30. collection := encodeCommand.String("collection", "", "the collection name")
  31. if err = encodeCommand.Parse(args); err != nil {
  32. return nil
  33. }
  34. ctx := context.Background()
  35. vid := needle.VolumeId(*volumeId)
  36. // collect topology information
  37. topologyInfo, err := collectTopologyInfo(ctx, commandEnv)
  38. if err != nil {
  39. return err
  40. }
  41. // volumeId is provided
  42. if vid != 0 {
  43. return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid)
  44. }
  45. // apply to all volumes in the collection
  46. volumeIds := collectEcShardIds(topologyInfo, *collection)
  47. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  48. for _, vid := range volumeIds {
  49. if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  55. func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  56. // find volume location
  57. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  58. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  59. // collect ec shards to the server with most space
  60. targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid)
  61. if err != nil {
  62. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  63. }
  64. // generate a normal volume
  65. err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
  66. if err != nil {
  67. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  68. }
  69. // delete the previous ec shards
  70. err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  71. if err != nil {
  72. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  73. }
  74. return nil
  75. }
  76. func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
  77. // mount volume
  78. if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  79. _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
  80. VolumeId: uint32(vid),
  81. })
  82. return mountErr
  83. }); err != nil {
  84. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  85. }
  86. // unmount ec shards
  87. for location, ecIndexBits := range nodeToEcIndexBits {
  88. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  89. err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  90. if err != nil {
  91. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  92. }
  93. }
  94. // delete ec shards
  95. for location, ecIndexBits := range nodeToEcIndexBits {
  96. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  97. err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  98. if err != nil {
  99. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  100. }
  101. }
  102. return nil
  103. }
  104. func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
  105. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  106. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  107. _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
  108. VolumeId: uint32(vid),
  109. Collection: collection,
  110. })
  111. return genErr
  112. })
  113. return err
  114. }
  115. func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
  116. maxShardCount := 0
  117. var exisitngEcIndexBits erasure_coding.ShardBits
  118. for loc, ecIndexBits := range nodeToEcIndexBits {
  119. toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
  120. if toBeCopiedShardCount > maxShardCount {
  121. maxShardCount = toBeCopiedShardCount
  122. targetNodeLocation = loc
  123. exisitngEcIndexBits = ecIndexBits
  124. }
  125. }
  126. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  127. var copiedEcIndexBits erasure_coding.ShardBits
  128. for loc, ecIndexBits := range nodeToEcIndexBits {
  129. if loc == targetNodeLocation {
  130. continue
  131. }
  132. needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
  133. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  134. continue
  135. }
  136. err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  137. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  138. _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
  139. VolumeId: uint32(vid),
  140. Collection: collection,
  141. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  142. CopyEcxFile: false,
  143. CopyEcjFile: true,
  144. CopyVifFile: true,
  145. SourceDataNode: loc,
  146. })
  147. if copyErr != nil {
  148. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  149. }
  150. return nil
  151. })
  152. if err != nil {
  153. break
  154. }
  155. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  156. }
  157. nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
  158. return targetNodeLocation, err
  159. }
  160. func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
  161. var resp *master_pb.VolumeListResponse
  162. err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  163. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  164. return err
  165. })
  166. if err != nil {
  167. return
  168. }
  169. return resp.TopologyInfo, nil
  170. }
  171. func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
  172. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  173. for _, v := range dn.EcShardInfos {
  174. if v.Collection == selectedCollection && v.Id == uint32(vid) {
  175. ecShardInfos = append(ecShardInfos, v)
  176. }
  177. }
  178. })
  179. return
  180. }
  181. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
  182. vidMap := make(map[uint32]bool)
  183. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  184. for _, v := range dn.EcShardInfos {
  185. if v.Collection == selectedCollection {
  186. vidMap[v.Id] = true
  187. }
  188. }
  189. })
  190. for vid := range vidMap {
  191. vids = append(vids, needle.VolumeId(vid))
  192. }
  193. return
  194. }
  195. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
  196. nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
  197. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  198. for _, v := range dn.EcShardInfos {
  199. if v.Id == uint32(vid) {
  200. nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
  201. }
  202. }
  203. })
  204. return nodeToEcIndexBits
  205. }