You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

417 lines
14 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/klauspost/reedsolomon"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  17. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  18. "github.com/chrislusf/seaweedfs/weed/storage/types"
  19. )
  20. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  21. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  22. collectionEcShardSize := make(map[string]int64)
  23. for _, location := range s.Locations {
  24. location.ecVolumesLock.RLock()
  25. for _, ecShards := range location.ecVolumes {
  26. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  27. for _, ecShard := range ecShards.Shards {
  28. collectionEcShardSize[ecShards.Collection] += ecShard.Size()
  29. }
  30. }
  31. location.ecVolumesLock.RUnlock()
  32. }
  33. for col, size := range collectionEcShardSize {
  34. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
  35. }
  36. return &master_pb.Heartbeat{
  37. EcShards: ecShardMessages,
  38. HasNoEcShards: len(ecShardMessages) == 0,
  39. }
  40. }
  41. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  42. for _, location := range s.Locations {
  43. if err := location.LoadEcShard(collection, vid, shardId); err == nil {
  44. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  45. var shardBits erasure_coding.ShardBits
  46. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  47. Id: uint32(vid),
  48. Collection: collection,
  49. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  50. }
  51. return nil
  52. } else {
  53. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  54. }
  55. }
  56. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  57. }
  58. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  59. ecShard, found := s.findEcShard(vid, shardId)
  60. if !found {
  61. return nil
  62. }
  63. var shardBits erasure_coding.ShardBits
  64. message := master_pb.VolumeEcShardInformationMessage{
  65. Id: uint32(vid),
  66. Collection: ecShard.Collection,
  67. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  68. }
  69. for _, location := range s.Locations {
  70. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  71. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  72. s.DeletedEcShardsChan <- message
  73. return nil
  74. }
  75. }
  76. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  77. }
  78. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  79. for _, location := range s.Locations {
  80. if v, found := location.FindEcShard(vid, shardId); found {
  81. return v, found
  82. }
  83. }
  84. return nil, false
  85. }
  86. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  87. for _, location := range s.Locations {
  88. if s, found := location.FindEcVolume(vid); found {
  89. return s, true
  90. }
  91. }
  92. return nil, false
  93. }
  94. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  95. for _, location := range s.Locations {
  96. location.DestroyEcVolume(vid)
  97. }
  98. }
  99. func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
  100. for _, location := range s.Locations {
  101. if localEcVolume, found := location.FindEcVolume(vid); found {
  102. // read the volume version
  103. readCounter := 0
  104. for localEcVolume.Version == 0 {
  105. err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
  106. readCounter++
  107. if readCounter > 10 && err != nil {
  108. return 0, fmt.Errorf("fail to read ec volume %d: %v", vid, err)
  109. }
  110. time.Sleep(1357 * time.Millisecond)
  111. glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
  112. }
  113. version := localEcVolume.Version
  114. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version)
  115. if err != nil {
  116. return 0, fmt.Errorf("locate in local ec volume: %v", err)
  117. }
  118. if size == types.TombstoneFileSize {
  119. return 0, fmt.Errorf("entry %s is deleted", n.Id)
  120. }
  121. glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
  122. if len(intervals) > 1 {
  123. glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  124. }
  125. bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
  126. if err != nil {
  127. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  128. }
  129. if isDeleted {
  130. return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
  131. }
  132. err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
  133. if err != nil {
  134. return 0, fmt.Errorf("readbytes: %v", err)
  135. }
  136. return len(bytes), nil
  137. }
  138. }
  139. return 0, fmt.Errorf("ec shard %d not found", vid)
  140. }
  141. func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
  142. interval := erasure_coding.Interval{
  143. BlockIndex: 0,
  144. InnerBlockOffset: 0,
  145. Size: super_block.SuperBlockSize,
  146. IsLargeBlock: true, // it could be large block, but ok in this place
  147. LargeBlockRowsCount: 0,
  148. }
  149. data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
  150. if err == nil {
  151. ecVolume.Version = needle.Version(data[0])
  152. }
  153. return
  154. }
  155. func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  156. if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
  157. return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  158. }
  159. for i, interval := range intervals {
  160. if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
  161. return nil, isDeleted, e
  162. } else {
  163. if isDeleted {
  164. is_deleted = true
  165. }
  166. if i == 0 {
  167. data = d
  168. } else {
  169. data = append(data, d...)
  170. }
  171. }
  172. }
  173. return
  174. }
  175. func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  176. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  177. data = make([]byte, interval.Size)
  178. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  179. if _, err = shard.ReadAt(data, actualOffset); err != nil {
  180. glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
  181. return
  182. }
  183. } else {
  184. ecVolume.ShardLocationsLock.RLock()
  185. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  186. ecVolume.ShardLocationsLock.RUnlock()
  187. // try reading directly
  188. if hasShardIdLocation {
  189. _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
  190. if err == nil {
  191. return
  192. }
  193. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  194. forgetShardId(ecVolume, shardId)
  195. }
  196. // try reading by recovering from other shards
  197. _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
  198. if err == nil {
  199. return
  200. }
  201. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  202. }
  203. return
  204. }
  205. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  206. // failed to access the source data nodes, clear it up
  207. ecVolume.ShardLocationsLock.Lock()
  208. delete(ecVolume.ShardLocations, shardId)
  209. ecVolume.ShardLocationsLock.Unlock()
  210. }
  211. func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
  212. shardCount := len(ecVolume.ShardLocations)
  213. if shardCount < erasure_coding.DataShardsCount &&
  214. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  215. shardCount == erasure_coding.TotalShardsCount &&
  216. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  217. shardCount >= erasure_coding.DataShardsCount &&
  218. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  219. // still fresh
  220. return nil
  221. }
  222. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  223. err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  224. req := &master_pb.LookupEcVolumeRequest{
  225. VolumeId: uint32(ecVolume.VolumeId),
  226. }
  227. resp, err := masterClient.LookupEcVolume(ctx, req)
  228. if err != nil {
  229. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  230. }
  231. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  232. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  233. }
  234. ecVolume.ShardLocationsLock.Lock()
  235. for _, shardIdLocations := range resp.ShardIdLocations {
  236. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  237. delete(ecVolume.ShardLocations, shardId)
  238. for _, loc := range shardIdLocations.Locations {
  239. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url)
  240. }
  241. }
  242. ecVolume.ShardLocationsRefreshTime = time.Now()
  243. ecVolume.ShardLocationsLock.Unlock()
  244. return nil
  245. })
  246. return
  247. }
  248. func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  249. if len(sourceDataNodes) == 0 {
  250. return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  251. }
  252. for _, sourceDataNode := range sourceDataNodes {
  253. glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  254. n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
  255. if err == nil {
  256. return
  257. }
  258. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  259. }
  260. return
  261. }
  262. func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  263. err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  264. // copy data slice
  265. shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
  266. VolumeId: uint32(vid),
  267. ShardId: uint32(shardId),
  268. Offset: offset,
  269. Size: int64(len(buf)),
  270. FileKey: uint64(needleId),
  271. })
  272. if err != nil {
  273. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  274. }
  275. for {
  276. resp, receiveErr := shardReadClient.Recv()
  277. if receiveErr == io.EOF {
  278. break
  279. }
  280. if receiveErr != nil {
  281. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  282. }
  283. if resp.IsDeleted {
  284. is_deleted = true
  285. }
  286. copy(buf[n:n+len(resp.Data)], resp.Data)
  287. n += len(resp.Data)
  288. }
  289. return nil
  290. })
  291. if err != nil {
  292. return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  293. }
  294. return
  295. }
  296. func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  297. glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  298. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  299. if err != nil {
  300. return 0, false, fmt.Errorf("failed to create encoder: %v", err)
  301. }
  302. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  303. var wg sync.WaitGroup
  304. ecVolume.ShardLocationsLock.RLock()
  305. for shardId, locations := range ecVolume.ShardLocations {
  306. // skip currnent shard or empty shard
  307. if shardId == shardIdToRecover {
  308. continue
  309. }
  310. if len(locations) == 0 {
  311. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  312. continue
  313. }
  314. // read from remote locations
  315. wg.Add(1)
  316. go func(shardId erasure_coding.ShardId, locations []string) {
  317. defer wg.Done()
  318. data := make([]byte, len(buf))
  319. nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
  320. if readErr != nil {
  321. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  322. forgetShardId(ecVolume, shardId)
  323. }
  324. if isDeleted {
  325. is_deleted = true
  326. }
  327. if nRead == len(buf) {
  328. bufs[shardId] = data
  329. }
  330. }(shardId, locations)
  331. }
  332. ecVolume.ShardLocationsLock.RUnlock()
  333. wg.Wait()
  334. if err = enc.ReconstructData(bufs); err != nil {
  335. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  336. return 0, false, err
  337. }
  338. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  339. copy(buf, bufs[shardIdToRecover])
  340. return len(buf), is_deleted, nil
  341. }
  342. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  343. for _, location := range s.Locations {
  344. location.ecVolumesLock.RLock()
  345. for _, v := range location.ecVolumes {
  346. ecVolumes = append(ecVolumes, v)
  347. }
  348. location.ecVolumesLock.RUnlock()
  349. }
  350. sort.Slice(ecVolumes, func(i, j int) bool {
  351. return ecVolumes[i].VolumeId > ecVolumes[j].VolumeId
  352. })
  353. return ecVolumes
  354. }