You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

150 lines
4.3 KiB

  1. package storage
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. )
  9. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  10. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  11. for _, location := range s.Locations {
  12. location.ecShardsLock.RLock()
  13. for _, ecShards := range location.ecShards {
  14. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  15. }
  16. location.ecShardsLock.RUnlock()
  17. }
  18. return &master_pb.Heartbeat{
  19. EcShards: ecShardMessages,
  20. }
  21. }
  22. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  23. for _, location := range s.Locations {
  24. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  25. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  26. var shardBits erasure_coding.ShardBits
  27. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  28. Id: uint32(vid),
  29. Collection: collection,
  30. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  31. }
  32. return nil
  33. }
  34. }
  35. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  36. }
  37. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  38. ecShard, found := s.findEcShard(vid, shardId)
  39. if !found {
  40. return nil
  41. }
  42. var shardBits erasure_coding.ShardBits
  43. message := master_pb.VolumeEcShardInformationMessage{
  44. Id: uint32(vid),
  45. Collection: ecShard.Collection,
  46. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  47. }
  48. for _, location := range s.Locations {
  49. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  50. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  51. s.DeletedEcShardsChan <- message
  52. return nil
  53. }
  54. }
  55. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  56. }
  57. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  58. for _, location := range s.Locations {
  59. if v, found := location.FindEcShard(vid, shardId); found {
  60. return v, found
  61. }
  62. }
  63. return nil, false
  64. }
  65. func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
  66. for _, location := range s.Locations {
  67. if s, found := location.HasEcShard(vid); found {
  68. return s, true
  69. }
  70. }
  71. return nil, false
  72. }
  73. func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
  74. for _, location := range s.Locations {
  75. if localEcShards, found := location.HasEcShard(vid); found {
  76. offset, size, intervals, err := localEcShards.LocateEcShardNeedle(n)
  77. if err != nil {
  78. return 0, err
  79. }
  80. bytes, err := s.ReadEcShardIntervals(vid, localEcShards, intervals)
  81. if err != nil {
  82. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  83. }
  84. version := needle.CurrentVersion
  85. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  86. if err != nil {
  87. return 0, fmt.Errorf("readbytes: %v", err)
  88. }
  89. return len(bytes), nil
  90. }
  91. }
  92. return 0, fmt.Errorf("ec shard %d not found", vid)
  93. }
  94. func (s *Store) ReadEcShardIntervals(vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, intervals []erasure_coding.Interval) (data []byte, err error) {
  95. for i, interval := range intervals {
  96. if d, e := s.readOneEcShardInterval(vid, localEcShards, interval); e != nil {
  97. return nil, e
  98. } else {
  99. if i == 0 {
  100. data = d
  101. } else {
  102. data = append(data, d...)
  103. }
  104. }
  105. }
  106. return
  107. }
  108. func (s *Store) readOneEcShardInterval(vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, interval erasure_coding.Interval) (data []byte, err error) {
  109. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  110. data = make([]byte, interval.Size)
  111. if shard, found := localEcShards.FindEcVolumeShard(shardId); found {
  112. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  113. return
  114. }
  115. } else {
  116. s.readOneRemoteEcShardInterval(vid, shardId, data, actualOffset)
  117. }
  118. return
  119. }
  120. func (s *Store) readOneRemoteEcShardInterval(vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  121. return
  122. }