You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

264 lines
8.2 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandEcDecode{})
  16. }
  17. type commandEcDecode struct {
  18. }
  19. func (c *commandEcDecode) Name() string {
  20. return "ec.decode"
  21. }
  22. func (c *commandEcDecode) Help() string {
  23. return `decode a erasure coded volume into a normal volume
  24. ec.decode [-collection=""] [-volumeId=<volume_id>]
  25. `
  26. }
  27. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  28. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  29. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  30. collection := encodeCommand.String("collection", "", "the collection name")
  31. if err = encodeCommand.Parse(args); err != nil {
  32. return nil
  33. }
  34. vid := needle.VolumeId(*volumeId)
  35. // collect topology information
  36. topologyInfo, err := collectTopologyInfo(commandEnv)
  37. if err != nil {
  38. return err
  39. }
  40. // volumeId is provided
  41. if vid != 0 {
  42. return doEcDecode(commandEnv, topologyInfo, *collection, vid)
  43. }
  44. // apply to all volumes in the collection
  45. volumeIds := collectEcShardIds(topologyInfo, *collection)
  46. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  47. for _, vid := range volumeIds {
  48. if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
  49. return err
  50. }
  51. }
  52. return nil
  53. }
  54. func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  55. // find volume location
  56. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  57. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  58. // collect ec shards to the server with most space
  59. targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
  60. if err != nil {
  61. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  62. }
  63. // generate a normal volume
  64. err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
  65. if err != nil {
  66. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  67. }
  68. // delete the previous ec shards
  69. err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  70. if err != nil {
  71. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  72. }
  73. return nil
  74. }
  75. func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
  76. // mount volume
  77. if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  78. _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
  79. VolumeId: uint32(vid),
  80. })
  81. return mountErr
  82. }); err != nil {
  83. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  84. }
  85. // unmount ec shards
  86. for location, ecIndexBits := range nodeToEcIndexBits {
  87. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  88. err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  89. if err != nil {
  90. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  91. }
  92. }
  93. // delete ec shards
  94. for location, ecIndexBits := range nodeToEcIndexBits {
  95. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  96. err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  97. if err != nil {
  98. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  99. }
  100. }
  101. return nil
  102. }
  103. func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
  104. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  105. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  106. _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
  107. VolumeId: uint32(vid),
  108. Collection: collection,
  109. })
  110. return genErr
  111. })
  112. return err
  113. }
  114. func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
  115. maxShardCount := 0
  116. var exisitngEcIndexBits erasure_coding.ShardBits
  117. for loc, ecIndexBits := range nodeToEcIndexBits {
  118. toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
  119. if toBeCopiedShardCount > maxShardCount {
  120. maxShardCount = toBeCopiedShardCount
  121. targetNodeLocation = loc
  122. exisitngEcIndexBits = ecIndexBits
  123. }
  124. }
  125. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  126. var copiedEcIndexBits erasure_coding.ShardBits
  127. for loc, ecIndexBits := range nodeToEcIndexBits {
  128. if loc == targetNodeLocation {
  129. continue
  130. }
  131. needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
  132. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  133. continue
  134. }
  135. err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  136. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  137. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  138. VolumeId: uint32(vid),
  139. Collection: collection,
  140. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  141. CopyEcxFile: false,
  142. CopyEcjFile: true,
  143. CopyVifFile: true,
  144. SourceDataNode: loc,
  145. })
  146. if copyErr != nil {
  147. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  148. }
  149. return nil
  150. })
  151. if err != nil {
  152. break
  153. }
  154. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  155. }
  156. nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
  157. return targetNodeLocation, err
  158. }
  159. func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
  160. var resp *master_pb.VolumeListResponse
  161. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  162. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  163. return err
  164. })
  165. if err != nil {
  166. return
  167. }
  168. return resp.TopologyInfo, nil
  169. }
  170. func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
  171. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  172. for _, v := range dn.EcShardInfos {
  173. if v.Collection == selectedCollection && v.Id == uint32(vid) {
  174. ecShardInfos = append(ecShardInfos, v)
  175. }
  176. }
  177. })
  178. return
  179. }
  180. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
  181. vidMap := make(map[uint32]bool)
  182. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  183. for _, v := range dn.EcShardInfos {
  184. if v.Collection == selectedCollection {
  185. vidMap[v.Id] = true
  186. }
  187. }
  188. })
  189. for vid := range vidMap {
  190. vids = append(vids, needle.VolumeId(vid))
  191. }
  192. return
  193. }
  194. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
  195. nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
  196. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  197. for _, v := range dn.EcShardInfos {
  198. if v.Id == uint32(vid) {
  199. nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
  200. }
  201. }
  202. })
  203. return nodeToEcIndexBits
  204. }