You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

382 lines
12 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/klauspost/reedsolomon"
  16. )
  17. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  18. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  19. for _, location := range s.Locations {
  20. location.ecVolumesLock.RLock()
  21. for _, ecShards := range location.ecVolumes {
  22. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  23. }
  24. location.ecVolumesLock.RUnlock()
  25. }
  26. return &master_pb.Heartbeat{
  27. EcShards: ecShardMessages,
  28. }
  29. }
  30. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  31. for _, location := range s.Locations {
  32. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  33. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  34. var shardBits erasure_coding.ShardBits
  35. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  36. Id: uint32(vid),
  37. Collection: collection,
  38. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  39. }
  40. return nil
  41. } else {
  42. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  43. }
  44. }
  45. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  46. }
  47. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  48. ecShard, found := s.findEcShard(vid, shardId)
  49. if !found {
  50. return nil
  51. }
  52. var shardBits erasure_coding.ShardBits
  53. message := master_pb.VolumeEcShardInformationMessage{
  54. Id: uint32(vid),
  55. Collection: ecShard.Collection,
  56. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  57. }
  58. for _, location := range s.Locations {
  59. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  60. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  61. s.DeletedEcShardsChan <- message
  62. return nil
  63. }
  64. }
  65. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  66. }
  67. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  68. for _, location := range s.Locations {
  69. if v, found := location.FindEcShard(vid, shardId); found {
  70. return v, found
  71. }
  72. }
  73. return nil, false
  74. }
  75. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  76. for _, location := range s.Locations {
  77. if s, found := location.FindEcVolume(vid); found {
  78. return s, true
  79. }
  80. }
  81. return nil, false
  82. }
  83. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  84. for _, location := range s.Locations {
  85. location.DestroyEcVolume(vid)
  86. }
  87. }
  88. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  89. for _, location := range s.Locations {
  90. if localEcVolume, found := location.FindEcVolume(vid); found {
  91. // read the volume version
  92. for localEcVolume.Version == 0 {
  93. err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
  94. time.Sleep(1357 * time.Millisecond)
  95. glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
  96. }
  97. version := localEcVolume.Version
  98. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version)
  99. if err != nil {
  100. return 0, err
  101. }
  102. glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  103. if len(intervals) > 1 {
  104. glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  105. }
  106. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
  107. if err != nil {
  108. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  109. }
  110. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  111. if err != nil {
  112. return 0, fmt.Errorf("readbytes: %v", err)
  113. }
  114. return len(bytes), nil
  115. }
  116. }
  117. return 0, fmt.Errorf("ec shard %d not found", vid)
  118. }
  119. func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
  120. interval := erasure_coding.Interval{
  121. BlockIndex: 0,
  122. InnerBlockOffset: 0,
  123. Size: _SuperBlockSize,
  124. IsLargeBlock: true, // it could be large block, but ok in this place
  125. LargeBlockRowsCount: 0,
  126. }
  127. data, err := s.readEcShardIntervals(ctx, vid, ecVolume, []erasure_coding.Interval{interval})
  128. if err == nil {
  129. ecVolume.Version = needle.Version(data[0])
  130. }
  131. return
  132. }
  133. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
  134. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  135. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  136. }
  137. for i, interval := range intervals {
  138. if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil {
  139. return nil, e
  140. } else {
  141. if i == 0 {
  142. data = d
  143. } else {
  144. data = append(data, d...)
  145. }
  146. }
  147. }
  148. return
  149. }
  150. func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) {
  151. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  152. data = make([]byte, interval.Size)
  153. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  154. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  155. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  156. return
  157. }
  158. } else {
  159. ecVolume.ShardLocationsLock.RLock()
  160. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  161. ecVolume.ShardLocationsLock.RUnlock()
  162. // try reading directly
  163. if hasShardIdLocation {
  164. _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
  165. if err == nil {
  166. return
  167. }
  168. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  169. forgetShardId(ecVolume, shardId)
  170. }
  171. // try reading by recovering from other shards
  172. _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
  173. if err == nil {
  174. return
  175. }
  176. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  177. }
  178. return
  179. }
  180. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  181. // failed to access the source data nodes, clear it up
  182. ecVolume.ShardLocationsLock.Lock()
  183. delete(ecVolume.ShardLocations, shardId)
  184. ecVolume.ShardLocationsLock.Unlock()
  185. }
  186. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  187. shardCount := len(ecVolume.ShardLocations)
  188. if shardCount < erasure_coding.DataShardsCount &&
  189. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  190. shardCount == erasure_coding.TotalShardsCount &&
  191. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  192. shardCount >= erasure_coding.DataShardsCount &&
  193. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  194. // still fresh
  195. return nil
  196. }
  197. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  198. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  199. req := &master_pb.LookupEcVolumeRequest{
  200. VolumeId: uint32(ecVolume.VolumeId),
  201. }
  202. resp, err := masterClient.LookupEcVolume(ctx, req)
  203. if err != nil {
  204. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  205. }
  206. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  207. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  208. }
  209. ecVolume.ShardLocationsLock.Lock()
  210. for _, shardIdLocations := range resp.ShardIdLocations {
  211. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  212. delete(ecVolume.ShardLocations, shardId)
  213. for _, loc := range shardIdLocations.Locations {
  214. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  215. }
  216. }
  217. ecVolume.ShardLocationsRefreshTime = time.Now()
  218. ecVolume.ShardLocationsLock.Unlock()
  219. return nil
  220. })
  221. return
  222. }
  223. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  224. if len(sourceDataNodes) == 0 {
  225. return 0, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  226. }
  227. for _, sourceDataNode := range sourceDataNodes {
  228. glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  229. n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
  230. if err == nil {
  231. return
  232. }
  233. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  234. }
  235. return
  236. }
  237. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  238. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  239. // copy data slice
  240. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  241. VolumeId: uint32(vid),
  242. ShardId: uint32(shardId),
  243. Offset: offset,
  244. Size: int64(len(buf)),
  245. })
  246. if err != nil {
  247. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  248. }
  249. for {
  250. resp, receiveErr := shardReadClient.Recv()
  251. if receiveErr == io.EOF {
  252. break
  253. }
  254. if receiveErr != nil {
  255. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  256. }
  257. copy(buf[n:n+len(resp.Data)], resp.Data)
  258. n += len(resp.Data)
  259. }
  260. return nil
  261. })
  262. if err != nil {
  263. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  264. }
  265. return
  266. }
  267. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  268. glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  269. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  270. if err != nil {
  271. return 0, fmt.Errorf("failed to create encoder: %v", err)
  272. }
  273. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  274. var wg sync.WaitGroup
  275. ecVolume.ShardLocationsLock.RLock()
  276. for shardId, locations := range ecVolume.ShardLocations {
  277. // skip currnent shard or empty shard
  278. if shardId == shardIdToRecover {
  279. continue
  280. }
  281. if len(locations) == 0 {
  282. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  283. continue
  284. }
  285. // read from remote locations
  286. wg.Add(1)
  287. go func(shardId erasure_coding.ShardId, locations []string) {
  288. defer wg.Done()
  289. data := make([]byte, len(buf))
  290. nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
  291. if readErr != nil {
  292. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  293. forgetShardId(ecVolume, shardId)
  294. }
  295. if nRead == len(buf) {
  296. bufs[shardId] = data
  297. }
  298. }(shardId, locations)
  299. }
  300. ecVolume.ShardLocationsLock.RUnlock()
  301. wg.Wait()
  302. if err = enc.ReconstructData(bufs); err != nil {
  303. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  304. return 0, err
  305. }
  306. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  307. copy(buf, bufs[shardIdToRecover])
  308. return len(buf), nil
  309. }
  310. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  311. for _, location := range s.Locations {
  312. location.ecVolumesLock.RLock()
  313. for _, v := range location.ecVolumes {
  314. ecVolumes = append(ecVolumes, v)
  315. }
  316. location.ecVolumesLock.RUnlock()
  317. }
  318. sort.Slice(ecVolumes, func(i, j int) bool {
  319. return ecVolumes[i].VolumeId > ecVolumes[j].VolumeId
  320. })
  321. return ecVolumes
  322. }