You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
13 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
4 years ago
6 years ago
2 months ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "golang.org/x/exp/slices"
  7. "io"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/klauspost/reedsolomon"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/stats"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  19. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  20. )
  21. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  22. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  23. collectionEcShardSize := make(map[string]int64)
  24. for _, location := range s.Locations {
  25. location.ecVolumesLock.RLock()
  26. for _, ecShards := range location.ecVolumes {
  27. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
  28. for _, ecShard := range ecShards.Shards {
  29. collectionEcShardSize[ecShards.Collection] += ecShard.Size()
  30. }
  31. }
  32. location.ecVolumesLock.RUnlock()
  33. }
  34. for col, size := range collectionEcShardSize {
  35. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
  36. }
  37. return &master_pb.Heartbeat{
  38. EcShards: ecShardMessages,
  39. HasNoEcShards: len(ecShardMessages) == 0,
  40. }
  41. }
  42. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  43. for _, location := range s.Locations {
  44. if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil {
  45. glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
  46. var shardBits erasure_coding.ShardBits
  47. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  48. Id: uint32(vid),
  49. Collection: collection,
  50. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  51. DiskType: string(location.DiskType),
  52. ExpireAtSec: ecVolume.ExpireAtSec,
  53. }
  54. return nil
  55. } else if err == os.ErrNotExist {
  56. continue
  57. } else {
  58. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  59. }
  60. }
  61. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  62. }
  63. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  64. ecShard, found := s.findEcShard(vid, shardId)
  65. if !found {
  66. return nil
  67. }
  68. var shardBits erasure_coding.ShardBits
  69. message := master_pb.VolumeEcShardInformationMessage{
  70. Id: uint32(vid),
  71. Collection: ecShard.Collection,
  72. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  73. DiskType: string(ecShard.DiskType),
  74. }
  75. for _, location := range s.Locations {
  76. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  77. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  78. s.DeletedEcShardsChan <- message
  79. return nil
  80. }
  81. }
  82. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  83. }
  84. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
  85. for _, location := range s.Locations {
  86. if v, found := location.FindEcShard(vid, shardId); found {
  87. return v, found
  88. }
  89. }
  90. return nil, false
  91. }
  92. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  93. for _, location := range s.Locations {
  94. if s, found := location.FindEcVolume(vid); found {
  95. return s, true
  96. }
  97. }
  98. return nil, false
  99. }
  100. // shardFiles is a list of shard files, which is used to return the shard locations
  101. func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
  102. for _, location := range s.Locations {
  103. if s, foundShards := location.CollectEcShards(vid, shardFileNames); foundShards {
  104. ecVolume = s
  105. found = true
  106. }
  107. }
  108. return
  109. }
  110. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  111. for _, location := range s.Locations {
  112. location.DestroyEcVolume(vid)
  113. }
  114. }
  115. func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) {
  116. for _, location := range s.Locations {
  117. if localEcVolume, found := location.FindEcVolume(vid); found {
  118. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
  119. if err != nil {
  120. return 0, fmt.Errorf("locate in local ec volume: %v", err)
  121. }
  122. if size.IsDeleted() {
  123. return 0, ErrorDeleted
  124. }
  125. if onReadSizeFn != nil {
  126. onReadSizeFn(size)
  127. }
  128. glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
  129. if len(intervals) > 1 {
  130. glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  131. }
  132. bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
  133. if err != nil {
  134. return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
  135. }
  136. if isDeleted {
  137. return 0, ErrorDeleted
  138. }
  139. err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
  140. if err != nil {
  141. return 0, fmt.Errorf("readbytes: %v", err)
  142. }
  143. return len(bytes), nil
  144. }
  145. }
  146. return 0, fmt.Errorf("ec shard %d not found", vid)
  147. }
  148. func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  149. if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
  150. return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  151. }
  152. for i, interval := range intervals {
  153. if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
  154. return nil, isDeleted, e
  155. } else {
  156. if isDeleted {
  157. is_deleted = true
  158. }
  159. if i == 0 {
  160. data = d
  161. } else {
  162. data = append(data, d...)
  163. }
  164. }
  165. }
  166. return
  167. }
  168. func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  169. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  170. data = make([]byte, interval.Size)
  171. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  172. var readSize int
  173. if readSize, err = shard.ReadAt(data, actualOffset); err != nil {
  174. if readSize != int(interval.Size) {
  175. glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
  176. return
  177. }
  178. }
  179. } else {
  180. ecVolume.ShardLocationsLock.RLock()
  181. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  182. ecVolume.ShardLocationsLock.RUnlock()
  183. // try reading directly
  184. if hasShardIdLocation {
  185. _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
  186. if err == nil {
  187. return
  188. }
  189. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  190. }
  191. // try reading by recovering from other shards
  192. _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
  193. if err == nil {
  194. return
  195. }
  196. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  197. }
  198. return
  199. }
  200. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  201. // failed to access the source data nodes, clear it up
  202. ecVolume.ShardLocationsLock.Lock()
  203. delete(ecVolume.ShardLocations, shardId)
  204. ecVolume.ShardLocationsLock.Unlock()
  205. }
  206. func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
  207. shardCount := len(ecVolume.ShardLocations)
  208. if shardCount < erasure_coding.DataShardsCount &&
  209. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  210. shardCount == erasure_coding.TotalShardsCount &&
  211. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  212. shardCount >= erasure_coding.DataShardsCount &&
  213. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  214. // still fresh
  215. return nil
  216. }
  217. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  218. err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  219. req := &master_pb.LookupEcVolumeRequest{
  220. VolumeId: uint32(ecVolume.VolumeId),
  221. }
  222. resp, err := masterClient.LookupEcVolume(context.Background(), req)
  223. if err != nil {
  224. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  225. }
  226. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  227. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  228. }
  229. ecVolume.ShardLocationsLock.Lock()
  230. for _, shardIdLocations := range resp.ShardIdLocations {
  231. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  232. delete(ecVolume.ShardLocations, shardId)
  233. for _, loc := range shardIdLocations.Locations {
  234. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], pb.NewServerAddressFromLocation(loc))
  235. }
  236. }
  237. ecVolume.ShardLocationsRefreshTime = time.Now()
  238. ecVolume.ShardLocationsLock.Unlock()
  239. return nil
  240. })
  241. return
  242. }
  243. func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  244. if len(sourceDataNodes) == 0 {
  245. return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  246. }
  247. for _, sourceDataNode := range sourceDataNodes {
  248. glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  249. n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
  250. if err == nil {
  251. return
  252. }
  253. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  254. }
  255. return
  256. }
  257. func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  258. err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  259. // copy data slice
  260. shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
  261. VolumeId: uint32(vid),
  262. ShardId: uint32(shardId),
  263. Offset: offset,
  264. Size: int64(len(buf)),
  265. FileKey: uint64(needleId),
  266. })
  267. if err != nil {
  268. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  269. }
  270. for {
  271. resp, receiveErr := shardReadClient.Recv()
  272. if receiveErr == io.EOF {
  273. break
  274. }
  275. if receiveErr != nil {
  276. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
  277. }
  278. if resp.IsDeleted {
  279. is_deleted = true
  280. }
  281. copy(buf[n:n+len(resp.Data)], resp.Data)
  282. n += len(resp.Data)
  283. }
  284. return nil
  285. })
  286. if err != nil {
  287. return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  288. }
  289. return
  290. }
  291. func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  292. glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  293. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  294. if err != nil {
  295. return 0, false, fmt.Errorf("failed to create encoder: %v", err)
  296. }
  297. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  298. var wg sync.WaitGroup
  299. ecVolume.ShardLocationsLock.RLock()
  300. for shardId, locations := range ecVolume.ShardLocations {
  301. // skip current shard or empty shard
  302. if shardId == shardIdToRecover {
  303. continue
  304. }
  305. if len(locations) == 0 {
  306. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  307. continue
  308. }
  309. // read from remote locations
  310. wg.Add(1)
  311. go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
  312. defer wg.Done()
  313. data := make([]byte, len(buf))
  314. nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
  315. if readErr != nil {
  316. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  317. forgetShardId(ecVolume, shardId)
  318. }
  319. if isDeleted {
  320. is_deleted = true
  321. }
  322. if nRead == len(buf) {
  323. bufs[shardId] = data
  324. }
  325. }(shardId, locations)
  326. }
  327. ecVolume.ShardLocationsLock.RUnlock()
  328. wg.Wait()
  329. if err = enc.ReconstructData(bufs); err != nil {
  330. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  331. return 0, false, err
  332. }
  333. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  334. copy(buf, bufs[shardIdToRecover])
  335. return len(buf), is_deleted, nil
  336. }
  337. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  338. for _, location := range s.Locations {
  339. location.ecVolumesLock.RLock()
  340. for _, v := range location.ecVolumes {
  341. ecVolumes = append(ecVolumes, v)
  342. }
  343. location.ecVolumesLock.RUnlock()
  344. }
  345. slices.SortFunc(ecVolumes, func(a, b *erasure_coding.EcVolume) int {
  346. return int(b.VolumeId - a.VolumeId)
  347. })
  348. return ecVolumes
  349. }