You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

415 lines
14 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  16. "github.com/chrislusf/seaweedfs/weed/storage/types"
  17. "github.com/klauspost/reedsolomon"
  18. )
  19. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  20. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  21. collectionEcShardSize := make(map[string]int64)
  22. for _, location := range s.Locations {
  23. location.ecVolumesLock.RLock()
  24. for _, ecShards := range location.ecVolumes {
  25. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  26. for _, ecShard := range ecShards.Shards {
  27. collectionEcShardSize[ecShards.Collection] += ecShard.Size()
  28. }
  29. }
  30. location.ecVolumesLock.RUnlock()
  31. }
  32. for col, size := range collectionEcShardSize {
  33. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
  34. }
  35. return &master_pb.Heartbeat{
  36. EcShards: ecShardMessages,
  37. HasNoEcShards: len(ecShardMessages) == 0,
  38. }
  39. }
  40. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  41. for _, location := range s.Locations {
  42. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  43. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  44. var shardBits erasure_coding.ShardBits
  45. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  46. Id: uint32(vid),
  47. Collection: collection,
  48. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  49. }
  50. return nil
  51. } else {
  52. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  53. }
  54. }
  55. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  56. }
  57. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  58. ecShard, found := s.findEcShard(vid, shardId)
  59. if !found {
  60. return nil
  61. }
  62. var shardBits erasure_coding.ShardBits
  63. message := master_pb.VolumeEcShardInformationMessage{
  64. Id: uint32(vid),
  65. Collection: ecShard.Collection,
  66. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  67. }
  68. for _, location := range s.Locations {
  69. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  70. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  71. s.DeletedEcShardsChan <- message
  72. return nil
  73. }
  74. }
  75. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  76. }
  77. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  78. for _, location := range s.Locations {
  79. if v, found := location.FindEcShard(vid, shardId); found {
  80. return v, found
  81. }
  82. }
  83. return nil, false
  84. }
  85. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  86. for _, location := range s.Locations {
  87. if s, found := location.FindEcVolume(vid); found {
  88. return s, true
  89. }
  90. }
  91. return nil, false
  92. }
  93. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  94. for _, location := range s.Locations {
  95. location.DestroyEcVolume(vid)
  96. }
  97. }
  98. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  99. for _, location := range s.Locations {
  100. if localEcVolume, found := location.FindEcVolume(vid); found {
  101. // read the volume version
  102. readCounter := 0
  103. for localEcVolume.Version == 0 {
  104. err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
  105. readCounter++
  106. if readCounter > 10 && err != nil {
  107. return 0, fmt.Errorf("fail to read ec volume %d: %v", vid, err)
  108. }
  109. time.Sleep(1357 * time.Millisecond)
  110. glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
  111. }
  112. version := localEcVolume.Version
  113. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version)
  114. if err != nil {
  115. return 0, fmt.Errorf("locate in local ec volume: %v", err)
  116. }
  117. if size == types.TombstoneFileSize {
  118. return 0, fmt.Errorf("entry %s is deleted", n.Id)
  119. }
  120. glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  121. if len(intervals) > 1 {
  122. glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  123. }
  124. bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
  125. if err != nil {
  126. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  127. }
  128. if isDeleted {
  129. return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
  130. }
  131. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  132. if err != nil {
  133. return 0, fmt.Errorf("readbytes: %v", err)
  134. }
  135. return len(bytes), nil
  136. }
  137. }
  138. return 0, fmt.Errorf("ec shard %d not found", vid)
  139. }
  140. func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
  141. interval := erasure_coding.Interval{
  142. BlockIndex: 0,
  143. InnerBlockOffset: 0,
  144. Size: _SuperBlockSize,
  145. IsLargeBlock: true, // it could be large block, but ok in this place
  146. LargeBlockRowsCount: 0,
  147. }
  148. data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
  149. if err == nil {
  150. ecVolume.Version = needle.Version(data[0])
  151. }
  152. return
  153. }
  154. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  155. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  156. return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  157. }
  158. for i, interval := range intervals {
  159. if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
  160. return nil, isDeleted, e
  161. } else {
  162. if isDeleted {
  163. is_deleted = true
  164. }
  165. if i == 0 {
  166. data = d
  167. } else {
  168. data = append(data, d...)
  169. }
  170. }
  171. }
  172. return
  173. }
  174. func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  175. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  176. data = make([]byte, interval.Size)
  177. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  178. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  179. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  180. return
  181. }
  182. } else {
  183. ecVolume.ShardLocationsLock.RLock()
  184. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  185. ecVolume.ShardLocationsLock.RUnlock()
  186. // try reading directly
  187. if hasShardIdLocation {
  188. _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
  189. if err == nil {
  190. return
  191. }
  192. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  193. forgetShardId(ecVolume, shardId)
  194. }
  195. // try reading by recovering from other shards
  196. _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
  197. if err == nil {
  198. return
  199. }
  200. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  201. }
  202. return
  203. }
  204. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  205. // failed to access the source data nodes, clear it up
  206. ecVolume.ShardLocationsLock.Lock()
  207. delete(ecVolume.ShardLocations, shardId)
  208. ecVolume.ShardLocationsLock.Unlock()
  209. }
  210. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  211. shardCount := len(ecVolume.ShardLocations)
  212. if shardCount < erasure_coding.DataShardsCount &&
  213. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  214. shardCount == erasure_coding.TotalShardsCount &&
  215. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  216. shardCount >= erasure_coding.DataShardsCount &&
  217. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  218. // still fresh
  219. return nil
  220. }
  221. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  222. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  223. req := &master_pb.LookupEcVolumeRequest{
  224. VolumeId: uint32(ecVolume.VolumeId),
  225. }
  226. resp, err := masterClient.LookupEcVolume(ctx, req)
  227. if err != nil {
  228. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  229. }
  230. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  231. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  232. }
  233. ecVolume.ShardLocationsLock.Lock()
  234. for _, shardIdLocations := range resp.ShardIdLocations {
  235. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  236. delete(ecVolume.ShardLocations, shardId)
  237. for _, loc := range shardIdLocations.Locations {
  238. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  239. }
  240. }
  241. ecVolume.ShardLocationsRefreshTime = time.Now()
  242. ecVolume.ShardLocationsLock.Unlock()
  243. return nil
  244. })
  245. return
  246. }
  247. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  248. if len(sourceDataNodes) == 0 {
  249. return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  250. }
  251. for _, sourceDataNode := range sourceDataNodes {
  252. glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  253. n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
  254. if err == nil {
  255. return
  256. }
  257. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  258. }
  259. return
  260. }
  261. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  262. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  263. // copy data slice
  264. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  265. VolumeId: uint32(vid),
  266. ShardId: uint32(shardId),
  267. Offset: offset,
  268. Size: int64(len(buf)),
  269. FileKey: uint64(needleId),
  270. })
  271. if err != nil {
  272. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  273. }
  274. for {
  275. resp, receiveErr := shardReadClient.Recv()
  276. if receiveErr == io.EOF {
  277. break
  278. }
  279. if receiveErr != nil {
  280. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  281. }
  282. if resp.IsDeleted {
  283. is_deleted = true
  284. }
  285. copy(buf[n:n+len(resp.Data)], resp.Data)
  286. n += len(resp.Data)
  287. }
  288. return nil
  289. })
  290. if err != nil {
  291. return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  292. }
  293. return
  294. }
  295. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  296. glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  297. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  298. if err != nil {
  299. return 0, false, fmt.Errorf("failed to create encoder: %v", err)
  300. }
  301. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  302. var wg sync.WaitGroup
  303. ecVolume.ShardLocationsLock.RLock()
  304. for shardId, locations := range ecVolume.ShardLocations {
  305. // skip currnent shard or empty shard
  306. if shardId == shardIdToRecover {
  307. continue
  308. }
  309. if len(locations) == 0 {
  310. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  311. continue
  312. }
  313. // read from remote locations
  314. wg.Add(1)
  315. go func(shardId erasure_coding.ShardId, locations []string) {
  316. defer wg.Done()
  317. data := make([]byte, len(buf))
  318. nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
  319. if readErr != nil {
  320. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  321. forgetShardId(ecVolume, shardId)
  322. }
  323. if isDeleted {
  324. is_deleted = true
  325. }
  326. if nRead == len(buf) {
  327. bufs[shardId] = data
  328. }
  329. }(shardId, locations)
  330. }
  331. ecVolume.ShardLocationsLock.RUnlock()
  332. wg.Wait()
  333. if err = enc.ReconstructData(bufs); err != nil {
  334. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  335. return 0, false, err
  336. }
  337. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  338. copy(buf, bufs[shardIdToRecover])
  339. return len(buf), is_deleted, nil
  340. }
  341. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  342. for _, location := range s.Locations {
  343. location.ecVolumesLock.RLock()
  344. for _, v := range location.ecVolumes {
  345. ecVolumes = append(ecVolumes, v)
  346. }
  347. location.ecVolumesLock.RUnlock()
  348. }
  349. sort.Slice(ecVolumes, func(i, j int) bool {
  350. return ecVolumes[i].VolumeId > ecVolumes[j].VolumeId
  351. })
  352. return ecVolumes
  353. }