You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

393 lines
13 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  16. "github.com/klauspost/reedsolomon"
  17. )
  18. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  19. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  20. collectionEcShardSize := make(map[string]int64)
  21. for _, location := range s.Locations {
  22. location.ecVolumesLock.RLock()
  23. for _, ecShards := range location.ecVolumes {
  24. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  25. for _, ecShard := range ecShards.Shards {
  26. collectionEcShardSize[ecShards.Collection] += ecShard.Size()
  27. }
  28. }
  29. location.ecVolumesLock.RUnlock()
  30. }
  31. for col, size := range collectionEcShardSize {
  32. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
  33. }
  34. return &master_pb.Heartbeat{
  35. EcShards: ecShardMessages,
  36. HasNoEcShards: len(ecShardMessages) == 0,
  37. }
  38. }
  39. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  40. for _, location := range s.Locations {
  41. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  42. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  43. var shardBits erasure_coding.ShardBits
  44. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  45. Id: uint32(vid),
  46. Collection: collection,
  47. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  48. }
  49. return nil
  50. } else {
  51. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  52. }
  53. }
  54. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  55. }
  56. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  57. ecShard, found := s.findEcShard(vid, shardId)
  58. if !found {
  59. return nil
  60. }
  61. var shardBits erasure_coding.ShardBits
  62. message := master_pb.VolumeEcShardInformationMessage{
  63. Id: uint32(vid),
  64. Collection: ecShard.Collection,
  65. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  66. }
  67. for _, location := range s.Locations {
  68. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  69. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  70. s.DeletedEcShardsChan <- message
  71. return nil
  72. }
  73. }
  74. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  75. }
  76. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  77. for _, location := range s.Locations {
  78. if v, found := location.FindEcShard(vid, shardId); found {
  79. return v, found
  80. }
  81. }
  82. return nil, false
  83. }
  84. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  85. for _, location := range s.Locations {
  86. if s, found := location.FindEcVolume(vid); found {
  87. return s, true
  88. }
  89. }
  90. return nil, false
  91. }
  92. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  93. for _, location := range s.Locations {
  94. location.DestroyEcVolume(vid)
  95. }
  96. }
  97. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  98. for _, location := range s.Locations {
  99. if localEcVolume, found := location.FindEcVolume(vid); found {
  100. // read the volume version
  101. for localEcVolume.Version == 0 {
  102. err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
  103. time.Sleep(1357 * time.Millisecond)
  104. glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
  105. }
  106. version := localEcVolume.Version
  107. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version)
  108. if err != nil {
  109. return 0, fmt.Errorf("locate in local ec volume: %v", err)
  110. }
  111. glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  112. if len(intervals) > 1 {
  113. glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  114. }
  115. bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
  116. if err != nil {
  117. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  118. }
  119. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  120. if err != nil {
  121. return 0, fmt.Errorf("readbytes: %v", err)
  122. }
  123. return len(bytes), nil
  124. }
  125. }
  126. return 0, fmt.Errorf("ec shard %d not found", vid)
  127. }
  128. func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
  129. interval := erasure_coding.Interval{
  130. BlockIndex: 0,
  131. InnerBlockOffset: 0,
  132. Size: _SuperBlockSize,
  133. IsLargeBlock: true, // it could be large block, but ok in this place
  134. LargeBlockRowsCount: 0,
  135. }
  136. data, err := s.readEcShardIntervals(ctx, vid, ecVolume, []erasure_coding.Interval{interval})
  137. if err == nil {
  138. ecVolume.Version = needle.Version(data[0])
  139. }
  140. return
  141. }
  142. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
  143. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  144. return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  145. }
  146. for i, interval := range intervals {
  147. if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil {
  148. return nil, e
  149. } else {
  150. if i == 0 {
  151. data = d
  152. } else {
  153. data = append(data, d...)
  154. }
  155. }
  156. }
  157. return
  158. }
  159. func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) {
  160. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  161. data = make([]byte, interval.Size)
  162. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  163. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  164. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  165. return
  166. }
  167. } else {
  168. ecVolume.ShardLocationsLock.RLock()
  169. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  170. ecVolume.ShardLocationsLock.RUnlock()
  171. // try reading directly
  172. if hasShardIdLocation {
  173. _, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
  174. if err == nil {
  175. return
  176. }
  177. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  178. forgetShardId(ecVolume, shardId)
  179. }
  180. // try reading by recovering from other shards
  181. _, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
  182. if err == nil {
  183. return
  184. }
  185. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  186. }
  187. return
  188. }
  189. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  190. // failed to access the source data nodes, clear it up
  191. ecVolume.ShardLocationsLock.Lock()
  192. delete(ecVolume.ShardLocations, shardId)
  193. ecVolume.ShardLocationsLock.Unlock()
  194. }
  195. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  196. shardCount := len(ecVolume.ShardLocations)
  197. if shardCount < erasure_coding.DataShardsCount &&
  198. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  199. shardCount == erasure_coding.TotalShardsCount &&
  200. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  201. shardCount >= erasure_coding.DataShardsCount &&
  202. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  203. // still fresh
  204. return nil
  205. }
  206. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  207. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  208. req := &master_pb.LookupEcVolumeRequest{
  209. VolumeId: uint32(ecVolume.VolumeId),
  210. }
  211. resp, err := masterClient.LookupEcVolume(ctx, req)
  212. if err != nil {
  213. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  214. }
  215. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  216. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  217. }
  218. ecVolume.ShardLocationsLock.Lock()
  219. for _, shardIdLocations := range resp.ShardIdLocations {
  220. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  221. delete(ecVolume.ShardLocations, shardId)
  222. for _, loc := range shardIdLocations.Locations {
  223. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  224. }
  225. }
  226. ecVolume.ShardLocationsRefreshTime = time.Now()
  227. ecVolume.ShardLocationsLock.Unlock()
  228. return nil
  229. })
  230. return
  231. }
  232. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  233. if len(sourceDataNodes) == 0 {
  234. return 0, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  235. }
  236. for _, sourceDataNode := range sourceDataNodes {
  237. glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  238. n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
  239. if err == nil {
  240. return
  241. }
  242. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  243. }
  244. return
  245. }
  246. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  247. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  248. // copy data slice
  249. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  250. VolumeId: uint32(vid),
  251. ShardId: uint32(shardId),
  252. Offset: offset,
  253. Size: int64(len(buf)),
  254. })
  255. if err != nil {
  256. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  257. }
  258. for {
  259. resp, receiveErr := shardReadClient.Recv()
  260. if receiveErr == io.EOF {
  261. break
  262. }
  263. if receiveErr != nil {
  264. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  265. }
  266. copy(buf[n:n+len(resp.Data)], resp.Data)
  267. n += len(resp.Data)
  268. }
  269. return nil
  270. })
  271. if err != nil {
  272. return 0, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  273. }
  274. return
  275. }
  276. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
  277. glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  278. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  279. if err != nil {
  280. return 0, fmt.Errorf("failed to create encoder: %v", err)
  281. }
  282. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  283. var wg sync.WaitGroup
  284. ecVolume.ShardLocationsLock.RLock()
  285. for shardId, locations := range ecVolume.ShardLocations {
  286. // skip currnent shard or empty shard
  287. if shardId == shardIdToRecover {
  288. continue
  289. }
  290. if len(locations) == 0 {
  291. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  292. continue
  293. }
  294. // read from remote locations
  295. wg.Add(1)
  296. go func(shardId erasure_coding.ShardId, locations []string) {
  297. defer wg.Done()
  298. data := make([]byte, len(buf))
  299. nRead, readErr := s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
  300. if readErr != nil {
  301. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  302. forgetShardId(ecVolume, shardId)
  303. }
  304. if nRead == len(buf) {
  305. bufs[shardId] = data
  306. }
  307. }(shardId, locations)
  308. }
  309. ecVolume.ShardLocationsLock.RUnlock()
  310. wg.Wait()
  311. if err = enc.ReconstructData(bufs); err != nil {
  312. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  313. return 0, err
  314. }
  315. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  316. copy(buf, bufs[shardIdToRecover])
  317. return len(buf), nil
  318. }
  319. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  320. for _, location := range s.Locations {
  321. location.ecVolumesLock.RLock()
  322. for _, v := range location.ecVolumes {
  323. ecVolumes = append(ecVolumes, v)
  324. }
  325. location.ecVolumesLock.RUnlock()
  326. }
  327. sort.Slice(ecVolumes, func(i, j int) bool {
  328. return ecVolumes[i].VolumeId > ecVolumes[j].VolumeId
  329. })
  330. return ecVolumes
  331. }