You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

316 lines
9.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/klauspost/reedsolomon"
  15. )
  16. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  17. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  18. for _, location := range s.Locations {
  19. location.ecVolumesLock.RLock()
  20. for _, ecShards := range location.ecVolumes {
  21. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  22. }
  23. location.ecVolumesLock.RUnlock()
  24. }
  25. return &master_pb.Heartbeat{
  26. EcShards: ecShardMessages,
  27. }
  28. }
  29. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  30. for _, location := range s.Locations {
  31. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  32. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  33. var shardBits erasure_coding.ShardBits
  34. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  35. Id: uint32(vid),
  36. Collection: collection,
  37. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  38. }
  39. return nil
  40. }
  41. }
  42. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  43. }
  44. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  45. ecShard, found := s.findEcShard(vid, shardId)
  46. if !found {
  47. return nil
  48. }
  49. var shardBits erasure_coding.ShardBits
  50. message := master_pb.VolumeEcShardInformationMessage{
  51. Id: uint32(vid),
  52. Collection: ecShard.Collection,
  53. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  54. }
  55. for _, location := range s.Locations {
  56. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  57. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  58. s.DeletedEcShardsChan <- message
  59. return nil
  60. }
  61. }
  62. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  63. }
  64. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  65. for _, location := range s.Locations {
  66. if v, found := location.FindEcShard(vid, shardId); found {
  67. return v, found
  68. }
  69. }
  70. return nil, false
  71. }
  72. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  73. for _, location := range s.Locations {
  74. if s, found := location.FindEcVolume(vid); found {
  75. return s, true
  76. }
  77. }
  78. return nil, false
  79. }
  80. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  81. for _, location := range s.Locations {
  82. if localEcVolume, found := location.FindEcVolume(vid); found {
  83. // TODO need to read the version
  84. version := needle.CurrentVersion
  85. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version)
  86. if err != nil {
  87. return 0, err
  88. }
  89. glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  90. if len(intervals) > 1 {
  91. glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  92. }
  93. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
  94. if err != nil {
  95. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  96. }
  97. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  98. if err != nil {
  99. return 0, fmt.Errorf("readbytes: %v", err)
  100. }
  101. return len(bytes), nil
  102. }
  103. }
  104. return 0, fmt.Errorf("ec shard %d not found", vid)
  105. }
  106. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
  107. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  108. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  109. }
  110. for i, interval := range intervals {
  111. if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil {
  112. return nil, e
  113. } else {
  114. if i == 0 {
  115. data = d
  116. } else {
  117. data = append(data, d...)
  118. }
  119. }
  120. }
  121. return
  122. }
  123. func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) {
  124. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  125. data = make([]byte, interval.Size)
  126. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  127. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  128. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  129. return
  130. }
  131. } else {
  132. ecVolume.ShardLocationsLock.RLock()
  133. sourceDataNodes, found := ecVolume.ShardLocations[shardId]
  134. ecVolume.ShardLocationsLock.RUnlock()
  135. if !found || len(sourceDataNodes) == 0 {
  136. return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId)
  137. }
  138. // try reading directly
  139. _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
  140. if err == nil {
  141. return
  142. }
  143. // try reading by recovering from other shards
  144. _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
  145. if err == nil {
  146. return
  147. }
  148. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  149. }
  150. return
  151. }
  152. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  153. if ecVolume.ShardLocationsRefreshTime.Add(10 * time.Minute).After(time.Now()) {
  154. // still fresh
  155. return nil
  156. }
  157. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  158. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  159. req := &master_pb.LookupEcVolumeRequest{
  160. VolumeId: uint32(ecVolume.VolumeId),
  161. }
  162. resp, err := masterClient.LookupEcVolume(ctx, req)
  163. if err != nil {
  164. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  165. }
  166. ecVolume.ShardLocationsLock.Lock()
  167. for _, shardIdLocations := range resp.ShardIdLocations {
  168. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  169. delete(ecVolume.ShardLocations, shardId)
  170. for _, loc := range shardIdLocations.Locations {
  171. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  172. }
  173. }
  174. ecVolume.ShardLocationsRefreshTime = time.Now()
  175. ecVolume.ShardLocationsLock.Unlock()
  176. return nil
  177. })
  178. return
  179. }
  180. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  181. for _, sourceDataNode := range sourceDataNodes {
  182. glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  183. n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
  184. if err == nil {
  185. return
  186. }
  187. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  188. }
  189. return
  190. }
  191. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  192. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  193. // copy data slice
  194. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  195. VolumeId: uint32(vid),
  196. ShardId: uint32(shardId),
  197. Offset: offset,
  198. Size: int64(len(buf)),
  199. })
  200. if err != nil {
  201. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  202. }
  203. for {
  204. resp, receiveErr := shardReadClient.Recv()
  205. if receiveErr == io.EOF {
  206. break
  207. }
  208. if receiveErr != nil {
  209. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  210. }
  211. copy(buf[n:n+len(resp.Data)], resp.Data)
  212. n += len(resp.Data)
  213. }
  214. return nil
  215. })
  216. if err != nil {
  217. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  218. }
  219. return
  220. }
  221. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  222. glog.V(1).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  223. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  224. if err != nil {
  225. return 0, fmt.Errorf("failed to create encoder: %v", err)
  226. }
  227. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  228. var wg sync.WaitGroup
  229. ecVolume.ShardLocationsLock.RLock()
  230. for shardId, locations := range ecVolume.ShardLocations {
  231. // skip currnent shard or empty shard
  232. if shardId == shardIdToRecover {
  233. continue
  234. }
  235. if len(locations) == 0 {
  236. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  237. continue
  238. }
  239. // read from remote locations
  240. wg.Add(1)
  241. go func(shardId erasure_coding.ShardId, locations []string) {
  242. defer wg.Done()
  243. data := make([]byte, len(buf))
  244. n, err = s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
  245. if err != nil {
  246. glog.V(3).Infof("readRemoteEcShardInterval %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  247. }
  248. if n == len(buf) {
  249. bufs[shardId] = data
  250. return
  251. }
  252. }(shardId, locations)
  253. }
  254. ecVolume.ShardLocationsLock.RUnlock()
  255. wg.Wait()
  256. if err = enc.ReconstructData(bufs); err != nil {
  257. return 0, err
  258. }
  259. glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  260. copy(buf, bufs[shardIdToRecover])
  261. return len(buf), nil
  262. }