You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

291 lines
9.5 KiB

3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "io"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  17. )
  18. func init() {
  19. Commands = append(Commands, &commandEcDecode{})
  20. }
  // commandEcDecode implements the "ec.decode" shell command, which turns an
  // erasure coded volume back into a normal volume. It carries no state.
  type commandEcDecode struct{}

  // Name returns the name under which the command is invoked in the shell.
  func (c *commandEcDecode) Name() string {
  	return "ec.decode"
  }

  // Help returns the usage text of the command. The synopsis lists every flag
  // that Do defines, including -force.
  func (c *commandEcDecode) Help() string {
  	return "decode an erasure coded volume into a normal volume\n\n" +
  		"\tec.decode [-collection=\"\"] [-volumeId=<volume_id>] [-force]\n\n"
  }
  31. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  32. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  33. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  34. collection := encodeCommand.String("collection", "", "the collection name")
  35. forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
  36. if err = encodeCommand.Parse(args); err != nil {
  37. return nil
  38. }
  39. infoAboutSimulationMode(writer, *forceChanges, "-force")
  40. if err = commandEnv.confirmIsLocked(args); err != nil {
  41. return
  42. }
  43. vid := needle.VolumeId(*volumeId)
  44. // collect topology information
  45. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  46. if err != nil {
  47. return err
  48. }
  49. if !*forceChanges {
  50. var nodeCount int
  51. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  52. nodeCount++
  53. })
  54. if nodeCount < erasure_coding.ParityShardsCount {
  55. glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
  56. return nil
  57. }
  58. }
  59. // volumeId is provided
  60. if vid != 0 {
  61. return doEcDecode(commandEnv, topologyInfo, *collection, vid)
  62. }
  63. // apply to all volumes in the collection
  64. volumeIds := collectEcShardIds(topologyInfo, *collection)
  65. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  66. for _, vid := range volumeIds {
  67. if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
  68. return err
  69. }
  70. }
  71. return nil
  72. }
  73. func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  74. // find volume location
  75. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  76. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  77. // collect ec shards to the server with most space
  78. targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
  79. if err != nil {
  80. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  81. }
  82. // generate a normal volume
  83. err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
  84. if err != nil {
  85. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  86. }
  87. // delete the previous ec shards
  88. err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  89. if err != nil {
  90. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  91. }
  92. return nil
  93. }
  94. func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
  95. // mount volume
  96. if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  97. _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
  98. VolumeId: uint32(vid),
  99. })
  100. return mountErr
  101. }); err != nil {
  102. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  103. }
  104. // unmount ec shards
  105. for location, ecIndexBits := range nodeToEcIndexBits {
  106. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  107. err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  108. if err != nil {
  109. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  110. }
  111. }
  112. // delete ec shards
  113. for location, ecIndexBits := range nodeToEcIndexBits {
  114. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  115. err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  116. if err != nil {
  117. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  118. }
  119. }
  120. return nil
  121. }
  122. func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
  123. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  124. err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  125. _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
  126. VolumeId: uint32(vid),
  127. Collection: collection,
  128. })
  129. return genErr
  130. })
  131. return err
  132. }
  133. func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
  134. maxShardCount := 0
  135. var exisitngEcIndexBits erasure_coding.ShardBits
  136. for loc, ecIndexBits := range nodeToEcIndexBits {
  137. toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
  138. if toBeCopiedShardCount > maxShardCount {
  139. maxShardCount = toBeCopiedShardCount
  140. targetNodeLocation = loc
  141. exisitngEcIndexBits = ecIndexBits
  142. }
  143. }
  144. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  145. var copiedEcIndexBits erasure_coding.ShardBits
  146. for loc, ecIndexBits := range nodeToEcIndexBits {
  147. if loc == targetNodeLocation {
  148. continue
  149. }
  150. needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
  151. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  152. continue
  153. }
  154. err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  155. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  156. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  157. VolumeId: uint32(vid),
  158. Collection: collection,
  159. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  160. CopyEcxFile: false,
  161. CopyEcjFile: true,
  162. CopyVifFile: true,
  163. SourceDataNode: string(loc),
  164. })
  165. if copyErr != nil {
  166. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  167. }
  168. return nil
  169. })
  170. if err != nil {
  171. break
  172. }
  173. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  174. }
  175. nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
  176. return targetNodeLocation, err
  177. }
  178. func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
  179. var resp *master_pb.LookupVolumeResponse
  180. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  181. resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
  182. return err
  183. })
  184. if err != nil {
  185. return nil, err
  186. }
  187. return resp.VolumeIdLocations, nil
  188. }
  189. func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
  190. if delayBeforeCollecting > 0 {
  191. time.Sleep(delayBeforeCollecting)
  192. }
  193. var resp *master_pb.VolumeListResponse
  194. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  195. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  196. return err
  197. })
  198. if err != nil {
  199. return
  200. }
  201. return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
  202. }
  203. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
  204. vidMap := make(map[uint32]bool)
  205. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  206. if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
  207. for _, v := range diskInfo.EcShardInfos {
  208. if v.Collection == selectedCollection {
  209. vidMap[v.Id] = true
  210. }
  211. }
  212. }
  213. })
  214. for vid := range vidMap {
  215. vids = append(vids, needle.VolumeId(vid))
  216. }
  217. return
  218. }
  219. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
  220. nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
  221. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  222. if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
  223. for _, v := range diskInfo.EcShardInfos {
  224. if v.Id == uint32(vid) {
  225. nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
  226. }
  227. }
  228. }
  229. })
  230. return nodeToEcIndexBits
  231. }