You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
6.3 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  11. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  12. )
  13. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  14. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  15. for _, location := range s.Locations {
  16. location.ecVolumesLock.RLock()
  17. for _, ecShards := range location.ecVolumes {
  18. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  19. }
  20. location.ecVolumesLock.RUnlock()
  21. }
  22. return &master_pb.Heartbeat{
  23. EcShards: ecShardMessages,
  24. }
  25. }
  26. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  27. for _, location := range s.Locations {
  28. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  29. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  30. var shardBits erasure_coding.ShardBits
  31. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  32. Id: uint32(vid),
  33. Collection: collection,
  34. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  35. }
  36. return nil
  37. }
  38. }
  39. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  40. }
  41. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  42. ecShard, found := s.findEcShard(vid, shardId)
  43. if !found {
  44. return nil
  45. }
  46. var shardBits erasure_coding.ShardBits
  47. message := master_pb.VolumeEcShardInformationMessage{
  48. Id: uint32(vid),
  49. Collection: ecShard.Collection,
  50. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  51. }
  52. for _, location := range s.Locations {
  53. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  54. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  55. s.DeletedEcShardsChan <- message
  56. return nil
  57. }
  58. }
  59. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  60. }
  61. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  62. for _, location := range s.Locations {
  63. if v, found := location.FindEcShard(vid, shardId); found {
  64. return v, found
  65. }
  66. }
  67. return nil, false
  68. }
  69. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  70. for _, location := range s.Locations {
  71. if s, found := location.FindEcVolume(vid); found {
  72. return s, true
  73. }
  74. }
  75. return nil, false
  76. }
  77. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  78. for _, location := range s.Locations {
  79. if localEcVolume, found := location.FindEcVolume(vid); found {
  80. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n)
  81. if err != nil {
  82. return 0, err
  83. }
  84. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
  85. if err != nil {
  86. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  87. }
  88. version := needle.CurrentVersion
  89. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  90. if err != nil {
  91. return 0, fmt.Errorf("readbytes: %v", err)
  92. }
  93. return len(bytes), nil
  94. }
  95. }
  96. return 0, fmt.Errorf("ec shard %d not found", vid)
  97. }
  98. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
  99. shardLocations, err := s.cachedLookupEcShardLocations(ctx, vid)
  100. if err != nil {
  101. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterGrpcAddress, err)
  102. }
  103. for i, interval := range intervals {
  104. if d, e := s.readOneEcShardInterval(ctx, vid, ecVolume, shardLocations, interval); e != nil {
  105. return nil, e
  106. } else {
  107. if i == 0 {
  108. data = d
  109. } else {
  110. data = append(data, d...)
  111. }
  112. }
  113. }
  114. return
  115. }
  116. func (s *Store) readOneEcShardInterval(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, shardLocations map[erasure_coding.ShardId]string, interval erasure_coding.Interval) (data []byte, err error) {
  117. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  118. data = make([]byte, interval.Size)
  119. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  120. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  121. return
  122. }
  123. } else {
  124. sourceDataNode := shardLocations[shardId]
  125. _, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, data, actualOffset)
  126. if err != nil {
  127. glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNode, vid, shardId, err)
  128. }
  129. }
  130. return
  131. }
  132. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, vid needle.VolumeId) (shardLocations map[erasure_coding.ShardId]string, err error) {
  133. return
  134. }
  135. func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  136. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  137. // copy data slice
  138. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  139. VolumeId: uint32(vid),
  140. ShardId: uint32(shardId),
  141. Offset: offset,
  142. Size: int64(len(buf)),
  143. })
  144. if err != nil {
  145. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  146. }
  147. for {
  148. resp, receiveErr := shardReadClient.Recv()
  149. if receiveErr == io.EOF {
  150. break
  151. }
  152. if receiveErr != nil {
  153. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  154. }
  155. copy(buf[n:n+len(resp.Data)], resp.Data)
  156. n += len(resp.Data)
  157. }
  158. return nil
  159. })
  160. if err != nil {
  161. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  162. }
  163. return
  164. }
  165. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, string, vid needle.VolumeId, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  166. glog.V(1).Infof("recover ec shard %d.%d from other locations", vid, shardIdToRecover)
  167. // TODO add recovering
  168. return 0, fmt.Errorf("recover is not implemented yet")
  169. }