package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
	"google.golang.org/grpc"
)

func init() {
	commands = append(commands, &commandEcEncode{})
}

type commandEcEncode struct {
}

func (c *commandEcEncode) Name() string {
	return "ec.encode"
}

func (c *commandEcEncode) Help() string {
	return `apply erasure coding to a volume

	This command will:
	1. freeze one volume
	2. apply erasure coding to the volume
	3. move the encoded shards to multiple volume servers

	The erasure coding is 10.4: each volume is split into 10 data shards plus 4 parity shards.
	So ideally you have more than 14 volume servers, and you can afford to lose 4 volume servers.

	If the number of volumes is not high, the worst case is that you only have 4 volume servers,
	and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.

	If you have fewer than 4 volume servers, erasure coding still lets you afford
	4 corrupted shard files.
`
}

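// Do encodes the volume given by -volumeId, or, when no volume id is specified,
// every volume in -collection that has seen no writes for the -quietFor period.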
func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {

	encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
	collection := encodeCommand.String("collection", "", "the collection name")
	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes with no writes for this period")
	if err = encodeCommand.Parse(args); err != nil {
		return nil
	}

	ctx := context.Background()
	vid := needle.VolumeId(*volumeId)

	// volumeId is provided
	if vid != 0 {
		return doEcEncode(ctx, commandEnv, *collection, vid)
	}

	// apply to all volumes in the collection
	volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
	if err != nil {
		return err
	}
	for _, vid := range volumeIds {
		if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
			return err
		}
	}

	return nil
}

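// doEcEncode generates ec shards for one volume on its first known location, then
// spreads the shards across the cluster and removes the original volume.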
func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
	// find volume location
	locations := commandEnv.masterClient.GetLocations(uint32(vid))
	if len(locations) == 0 {
		return fmt.Errorf("volume %d not found", vid)
	}

	// generate ec shards
	err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
	if err != nil {
		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
	}

	// balance the ec shards to current cluster
	err = balanceEcShards(ctx, commandEnv, vid, collection, locations)
	if err != nil {
		return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
	}

	return nil
}

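// generateEcShards asks the source volume server to generate the ec shard files
// for the volume locally via the VolumeEcShardsGenerate gRPC call.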
func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {

	err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
		})
		return genErr
	})

	return err
}

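// balanceEcShards spreads the freshly generated ec shards from the source volume
// server across the volume servers with the most free ec slots, removes the copied
// shards from the source, and finally deletes the original volume from all its locations.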
func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {

	allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
	if err != nil {
		return err
	}

	if totalFreeEcSlots < erasure_coding.TotalShardsCount {
		return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
	}
	allocatedDataNodes := allEcNodes
	if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
		allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
	}

	// calculate how many shards to allocate for these servers
	allocated := balancedEcDistribution(allocatedDataNodes)

	// ask the data nodes to copy from the source volume server
	copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
	if err != nil {
		return err
	}

	// ask the source volume server to clean up copied ec shards
	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
	if err != nil {
		return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
	}

	// ask the source volume server to delete the original volume
	for _, location := range existingLocations {
		err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
		if err != nil {
			return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
		}
	}

	return err
}

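// parallelCopyEcShardsFromSource assigns each target server a contiguous range of
// shard ids, copies the ranges concurrently, and returns the shard ids that were
// actually copied away from the source.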
func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
	targetServers []*EcNode, allocated []int,
	volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {

	// parallelize
	shardIdChan := make(chan []uint32, len(targetServers))
	var wg sync.WaitGroup
	startFromShardId := uint32(0)
	for i, server := range targetServers {
		if allocated[i] <= 0 {
			continue
		}

		wg.Add(1)
		go func(server *EcNode, startFromShardId uint32, shardCount int) {
			defer wg.Done()
			copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
				startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
			if copyErr != nil {
				err = copyErr
			} else {
				shardIdChan <- copiedShardIds
				server.freeEcSlot -= len(copiedShardIds)
			}
		}(server, startFromShardId, allocated[i])
		startFromShardId += uint32(allocated[i])
	}
	wg.Wait()
	close(shardIdChan)

	if err != nil {
		return nil, err
	}

	for shardIds := range shardIdChan {
		actuallyCopied = append(actuallyCopied, shardIds...)
	}

	return
}

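// oneServerCopyEcShardsFromSource copies one range of shard ids from the source
// volume server to a single target server and mounts them there. If the target is
// the source itself, the copy is skipped and the shards are only mounted.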
func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
	targetServer *EcNode, startFromShardId uint32, shardCount int,
	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {

	var shardIdsToCopy []uint32
	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
		fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
		shardIdsToCopy = append(shardIdsToCopy, shardId)
	}

	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		// copying is only needed when the target server is not the source itself
		if targetServer.info.Id != existingLocation {
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				SourceDataNode: existingLocation,
			})
			if copyErr != nil {
				return copyErr
			}
		}

		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return mountErr
		}

		if targetServer.info.Id != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d copied shards %+v from %s", targetServer.info.Id, volumeId, copiedShardIds, existingLocation)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}

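// sourceServerDeleteEcShards asks the source volume server to delete the shard
// files that were copied elsewhere; the ec index is only removed along with them
// when all TotalShardsCount shards have been moved away.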
func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {

	shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:        uint32(volumeId),
			ShardIds:        toBeDeletedShardIds,
			ShouldDeleteEcx: shouldDeleteEcx,
		})
		return deleteErr
	})
}

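// balancedEcDistribution decides how many of the TotalShardsCount shards each
// server should receive, handing out one shard per round to every server that
// still has a free slot so the distribution stays as even as possible.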
func balancedEcDistribution(servers []*EcNode) (allocated []int) {
	freeSlots := make([]int, len(servers))
	allocated = make([]int, len(servers))
	for i, server := range servers {
		freeSlots[i] = countFreeShardSlots(server.info)
	}
	allocatedCount := 0
	for allocatedCount < erasure_coding.TotalShardsCount {
		for i := range servers {
			if freeSlots[i]-allocated[i] > 0 {
				allocated[i] += 1
				allocatedCount += 1
			}
			if allocatedCount >= erasure_coding.TotalShardsCount {
				break
			}
		}
	}

	return allocated
}

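// eachDataNode walks the topology and calls fn for every data node in every rack
// of every data center.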
func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(dn)
			}
		}
	}
}

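// countShards sums up how many ec shards are described by the given shard
// information messages, using each message's shard id bitmap.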
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}

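// countFreeShardSlots estimates how many more ec shards a data node can accept:
// every free volume slot counts as 10 ec shard slots (a volume is split into 10
// data shards), minus the ec shards the node already holds.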
func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
}

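// EcNode pairs a data node with the number of ec shard slots it still has free.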
type EcNode struct {
	info       *master_pb.DataNodeInfo
	freeEcSlot int
}

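// sortEcNodes orders the nodes by free ec shard slots, most free first.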
func sortEcNodes(ecNodes []*EcNode) {
	sort.Slice(ecNodes, func(i, j int) bool {
		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
	})
}

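// collectEcNodes asks the master for the cluster topology and returns all data
// nodes that still have free ec shard slots, sorted by free slots in descending order.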
func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// list all possible locations
	var resp *master_pb.VolumeListResponse
	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return nil, 0, err
	}

	// find all volume servers with at least one free ec shard slot
	eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
			ecNodes = append(ecNodes, &EcNode{
				info:       dn,
				freeEcSlot: freeEcSlots,
			})
			totalFreeEcSlots += freeEcSlots
		}
	})

	sortEcNodes(ecNodes)

	return
}

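// collectVolumeIdsForEcEncode returns the ids of all volumes in the selected
// collection that have not been modified within the quiet period.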
func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {

	var resp *master_pb.VolumeListResponse
	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return
	}

	quietSeconds := int64(quietPeriod / time.Second)
	nowUnixSeconds := time.Now().Unix()

	vidMap := make(map[uint32]bool)
	for _, dc := range resp.TopologyInfo.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, v := range dn.VolumeInfos {
					if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
						vidMap[v.Id] = true
					}
				}
			}
		}
	}

	for vid := range vidMap {
		vids = append(vids, needle.VolumeId(vid))
	}

	return
}