You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

245 lines
8.0 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  15. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  16. for _, location := range s.Locations {
  17. location.ecVolumesLock.RLock()
  18. for _, ecShards := range location.ecVolumes {
  19. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  20. }
  21. location.ecVolumesLock.RUnlock()
  22. }
  23. return &master_pb.Heartbeat{
  24. EcShards: ecShardMessages,
  25. }
  26. }
  27. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  28. for _, location := range s.Locations {
  29. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  30. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  31. var shardBits erasure_coding.ShardBits
  32. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  33. Id: uint32(vid),
  34. Collection: collection,
  35. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  36. }
  37. return nil
  38. }
  39. }
  40. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  41. }
  42. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  43. ecShard, found := s.findEcShard(vid, shardId)
  44. if !found {
  45. return nil
  46. }
  47. var shardBits erasure_coding.ShardBits
  48. message := master_pb.VolumeEcShardInformationMessage{
  49. Id: uint32(vid),
  50. Collection: ecShard.Collection,
  51. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  52. }
  53. for _, location := range s.Locations {
  54. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  55. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  56. s.DeletedEcShardsChan <- message
  57. return nil
  58. }
  59. }
  60. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  61. }
  62. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  63. for _, location := range s.Locations {
  64. if v, found := location.FindEcShard(vid, shardId); found {
  65. return v, found
  66. }
  67. }
  68. return nil, false
  69. }
  70. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  71. for _, location := range s.Locations {
  72. if s, found := location.FindEcVolume(vid); found {
  73. return s, true
  74. }
  75. }
  76. return nil, false
  77. }
  78. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  79. for _, location := range s.Locations {
  80. if localEcVolume, found := location.FindEcVolume(vid); found {
  81. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n)
  82. if err != nil {
  83. return 0, err
  84. }
  85. glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  86. // TODO need to read the version
  87. version := needle.CurrentVersion
  88. // TODO the interval size should be the actual size
  89. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, version, intervals)
  90. if err != nil {
  91. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  92. }
  93. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  94. if err != nil {
  95. return 0, fmt.Errorf("readbytes: %v", err)
  96. }
  97. return len(bytes), nil
  98. }
  99. }
  100. return 0, fmt.Errorf("ec shard %d not found", vid)
  101. }
  102. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, version needle.Version, intervals []erasure_coding.Interval) (data []byte, err error) {
  103. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  104. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  105. }
  106. for i, interval := range intervals {
  107. if d, e := s.readOneEcShardInterval(ctx, ecVolume, version, interval); e != nil {
  108. return nil, e
  109. } else {
  110. if i == 0 {
  111. data = d
  112. } else {
  113. data = append(data, d...)
  114. }
  115. }
  116. }
  117. return
  118. }
  119. func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, version needle.Version, interval erasure_coding.Interval) (data []byte, err error) {
  120. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  121. data = make([]byte, int(needle.GetActualSize(interval.Size, version)))
  122. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  123. glog.V(3).Infof("read local ec shard %d.%d", ecVolume.VolumeId, shardId)
  124. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  125. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  126. return
  127. }
  128. } else {
  129. ecVolume.ShardLocationsLock.RLock()
  130. sourceDataNodes, found := ecVolume.ShardLocations[shardId]
  131. ecVolume.ShardLocationsLock.RUnlock()
  132. if !found || len(sourceDataNodes) == 0 {
  133. return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId)
  134. }
  135. glog.V(3).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0])
  136. _, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNodes[0], ecVolume.VolumeId, shardId, data, actualOffset)
  137. if err != nil {
  138. glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNodes[0], ecVolume.VolumeId, shardId, err)
  139. }
  140. }
  141. return
  142. }
  143. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  144. if ecVolume.ShardLocationsRefreshTime.Add(10 * time.Minute).After(time.Now()) {
  145. // still fresh
  146. return nil
  147. }
  148. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  149. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  150. req := &master_pb.LookupEcVolumeRequest{
  151. VolumeId: uint32(ecVolume.VolumeId),
  152. }
  153. resp, err := masterClient.LookupEcVolume(ctx, req)
  154. if err != nil {
  155. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  156. }
  157. ecVolume.ShardLocationsLock.Lock()
  158. for _, shardIdLocations := range resp.ShardIdLocations {
  159. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  160. delete(ecVolume.ShardLocations, shardId)
  161. for _, loc := range shardIdLocations.Locations {
  162. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  163. }
  164. }
  165. ecVolume.ShardLocationsLock.Unlock()
  166. return nil
  167. })
  168. return
  169. }
  170. func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  171. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  172. // copy data slice
  173. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  174. VolumeId: uint32(vid),
  175. ShardId: uint32(shardId),
  176. Offset: offset,
  177. Size: int64(len(buf)),
  178. })
  179. if err != nil {
  180. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  181. }
  182. for {
  183. resp, receiveErr := shardReadClient.Recv()
  184. if receiveErr == io.EOF {
  185. break
  186. }
  187. if receiveErr != nil {
  188. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  189. }
  190. copy(buf[n:n+len(resp.Data)], resp.Data)
  191. n += len(resp.Data)
  192. }
  193. return nil
  194. })
  195. if err != nil {
  196. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  197. }
  198. return
  199. }
  200. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, string, vid needle.VolumeId, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  201. glog.V(1).Infof("recover ec shard %d.%d from other locations", vid, shardIdToRecover)
  202. // TODO add recovering
  203. return 0, fmt.Errorf("recover is not implemented yet")
  204. }