You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

367 lines
12 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/klauspost/reedsolomon"
  15. )
  16. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  17. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  18. for _, location := range s.Locations {
  19. location.ecVolumesLock.RLock()
  20. for _, ecShards := range location.ecVolumes {
  21. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  22. }
  23. location.ecVolumesLock.RUnlock()
  24. }
  25. return &master_pb.Heartbeat{
  26. EcShards: ecShardMessages,
  27. }
  28. }
  29. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  30. for _, location := range s.Locations {
  31. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  32. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  33. var shardBits erasure_coding.ShardBits
  34. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  35. Id: uint32(vid),
  36. Collection: collection,
  37. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  38. }
  39. return nil
  40. } else {
  41. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  42. }
  43. }
  44. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  45. }
  46. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  47. ecShard, found := s.findEcShard(vid, shardId)
  48. if !found {
  49. return nil
  50. }
  51. var shardBits erasure_coding.ShardBits
  52. message := master_pb.VolumeEcShardInformationMessage{
  53. Id: uint32(vid),
  54. Collection: ecShard.Collection,
  55. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  56. }
  57. for _, location := range s.Locations {
  58. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  59. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  60. s.DeletedEcShardsChan <- message
  61. return nil
  62. }
  63. }
  64. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  65. }
  66. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  67. for _, location := range s.Locations {
  68. if v, found := location.FindEcShard(vid, shardId); found {
  69. return v, found
  70. }
  71. }
  72. return nil, false
  73. }
  74. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  75. for _, location := range s.Locations {
  76. if s, found := location.FindEcVolume(vid); found {
  77. return s, true
  78. }
  79. }
  80. return nil, false
  81. }
  82. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  83. for _, location := range s.Locations {
  84. location.DestroyEcVolume(vid)
  85. }
  86. }
  87. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  88. for _, location := range s.Locations {
  89. if localEcVolume, found := location.FindEcVolume(vid); found {
  90. // read the volume version
  91. for localEcVolume.Version == 0 {
  92. err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
  93. time.Sleep(1357 * time.Millisecond)
  94. glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
  95. }
  96. version := localEcVolume.Version
  97. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version)
  98. if err != nil {
  99. return 0, err
  100. }
  101. glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  102. if len(intervals) > 1 {
  103. glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  104. }
  105. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
  106. if err != nil {
  107. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  108. }
  109. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  110. if err != nil {
  111. return 0, fmt.Errorf("readbytes: %v", err)
  112. }
  113. return len(bytes), nil
  114. }
  115. }
  116. return 0, fmt.Errorf("ec shard %d not found", vid)
  117. }
  118. func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
  119. interval := erasure_coding.Interval{
  120. BlockIndex: 0,
  121. InnerBlockOffset: 0,
  122. Size: _SuperBlockSize,
  123. IsLargeBlock: true, // it could be large block, but ok in this place
  124. LargeBlockRowsCount: 0,
  125. }
  126. data, err := s.readEcShardIntervals(ctx, vid, ecVolume, []erasure_coding.Interval{interval})
  127. if err == nil {
  128. ecVolume.Version = needle.Version(data[0])
  129. }
  130. return
  131. }
  132. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
  133. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  134. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  135. }
  136. for i, interval := range intervals {
  137. if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil {
  138. return nil, e
  139. } else {
  140. if i == 0 {
  141. data = d
  142. } else {
  143. data = append(data, d...)
  144. }
  145. }
  146. }
  147. return
  148. }
  149. func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) {
  150. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  151. data = make([]byte, interval.Size)
  152. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  153. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  154. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  155. return
  156. }
  157. } else {
  158. ecVolume.ShardLocationsLock.RLock()
  159. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  160. ecVolume.ShardLocationsLock.RUnlock()
  161. // try reading directly
  162. if hasShardIdLocation {
  163. _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
  164. if err == nil {
  165. return
  166. }
  167. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  168. forgetShardId(ecVolume, shardId)
  169. }
  170. // try reading by recovering from other shards
  171. _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
  172. if err == nil {
  173. return
  174. }
  175. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  176. }
  177. return
  178. }
  179. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  180. // failed to access the source data nodes, clear it up
  181. ecVolume.ShardLocationsLock.Lock()
  182. delete(ecVolume.ShardLocations, shardId)
  183. ecVolume.ShardLocationsLock.Unlock()
  184. }
  185. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  186. shardCount := len(ecVolume.ShardLocations)
  187. if shardCount < erasure_coding.DataShardsCount &&
  188. ecVolume.ShardLocationsRefreshTime.Add(11 * time.Second).After(time.Now()) ||
  189. shardCount == erasure_coding.TotalShardsCount &&
  190. ecVolume.ShardLocationsRefreshTime.Add(37 * time.Minute).After(time.Now()) ||
  191. shardCount >= erasure_coding.DataShardsCount &&
  192. ecVolume.ShardLocationsRefreshTime.Add(7 * time.Minute).After(time.Now()) {
  193. // still fresh
  194. return nil
  195. }
  196. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  197. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  198. req := &master_pb.LookupEcVolumeRequest{
  199. VolumeId: uint32(ecVolume.VolumeId),
  200. }
  201. resp, err := masterClient.LookupEcVolume(ctx, req)
  202. if err != nil {
  203. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  204. }
  205. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  206. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  207. }
  208. ecVolume.ShardLocationsLock.Lock()
  209. for _, shardIdLocations := range resp.ShardIdLocations {
  210. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  211. delete(ecVolume.ShardLocations, shardId)
  212. for _, loc := range shardIdLocations.Locations {
  213. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  214. }
  215. }
  216. ecVolume.ShardLocationsRefreshTime = time.Now()
  217. ecVolume.ShardLocationsLock.Unlock()
  218. return nil
  219. })
  220. return
  221. }
  222. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  223. if len(sourceDataNodes) == 0 {
  224. return 0, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  225. }
  226. for _, sourceDataNode := range sourceDataNodes {
  227. glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  228. n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
  229. if err == nil {
  230. return
  231. }
  232. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  233. }
  234. return
  235. }
  236. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  237. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  238. // copy data slice
  239. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  240. VolumeId: uint32(vid),
  241. ShardId: uint32(shardId),
  242. Offset: offset,
  243. Size: int64(len(buf)),
  244. })
  245. if err != nil {
  246. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  247. }
  248. for {
  249. resp, receiveErr := shardReadClient.Recv()
  250. if receiveErr == io.EOF {
  251. break
  252. }
  253. if receiveErr != nil {
  254. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  255. }
  256. copy(buf[n:n+len(resp.Data)], resp.Data)
  257. n += len(resp.Data)
  258. }
  259. return nil
  260. })
  261. if err != nil {
  262. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  263. }
  264. return
  265. }
  266. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  267. glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  268. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  269. if err != nil {
  270. return 0, fmt.Errorf("failed to create encoder: %v", err)
  271. }
  272. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  273. var wg sync.WaitGroup
  274. ecVolume.ShardLocationsLock.RLock()
  275. for shardId, locations := range ecVolume.ShardLocations {
  276. // skip currnent shard or empty shard
  277. if shardId == shardIdToRecover {
  278. continue
  279. }
  280. if len(locations) == 0 {
  281. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  282. continue
  283. }
  284. // read from remote locations
  285. wg.Add(1)
  286. go func(shardId erasure_coding.ShardId, locations []string) {
  287. defer wg.Done()
  288. data := make([]byte, len(buf))
  289. nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
  290. if readErr != nil {
  291. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  292. forgetShardId(ecVolume, shardId)
  293. }
  294. if nRead == len(buf) {
  295. bufs[shardId] = data
  296. }
  297. }(shardId, locations)
  298. }
  299. ecVolume.ShardLocationsLock.RUnlock()
  300. wg.Wait()
  301. if err = enc.ReconstructData(bufs); err != nil {
  302. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  303. return 0, err
  304. }
  305. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  306. copy(buf, bufs[shardIdToRecover])
  307. return len(buf), nil
  308. }