You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
7.2 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "io"
  7. "os"
  8. "sort"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  14. )
  15. // A volume syncs with a master volume in 2 steps:
  16. // 1. The slave checks the master side to find the subscription checkpoint
  17. // needed to set up the replication.
  18. // 2. The slave receives the updates from the master.
  19. /*
  20. Assume the slave volume needs to follow the master volume.
  21. The master volume could be compacted, and could be many files ahead of
  22. slave volume.
  23. Step 0:
  24. If slave compact version is less than the master, do a local compaction.
  25. If the slave size is still less than the master, discard local copy and do a full copy.
  26. Step 1:
  27. The slave volume asks the master using its last modification time t.
  28. The master does a binary search in the volume (using .idx as an array, and checking the appendAtNs in the .dat file),
  29. to find the first entry with appendAtNs > t.
  30. Step 2:
  31. The master iterates over the following entries (including the first one) and sends them to the follower.
  32. */
  33. func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) {
  34. var lastCompactRevision uint16 = 0
  35. var compactRevision uint16 = 0
  36. var masterMap *needle.CompactMap
  37. for i := 0; i < 3; i++ {
  38. if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil {
  39. return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
  40. }
  41. if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
  42. if err = v.Compact(0); err != nil {
  43. return fmt.Errorf("Compact Volume before synchronizing %v", err)
  44. }
  45. if err = v.CommitCompact(); err != nil {
  46. return fmt.Errorf("Commit Compact before synchronizing %v", err)
  47. }
  48. }
  49. lastCompactRevision = compactRevision
  50. if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil {
  51. return
  52. }
  53. }
  54. return
  55. }
  56. type ByOffset []needle.NeedleValue
  57. func (a ByOffset) Len() int { return len(a) }
  58. func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  59. func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
  60. // trySynchronizing sync with remote volume server incrementally by
  61. // make up the local and remote delta.
  62. func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error {
  63. slaveIdxFile, err := os.Open(v.nm.IndexFileName())
  64. if err != nil {
  65. return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
  66. }
  67. defer slaveIdxFile.Close()
  68. slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
  69. if err != nil {
  70. return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
  71. }
  72. var delta []needle.NeedleValue
  73. if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
  74. if needleValue.Key == NeedleIdEmpty {
  75. return nil
  76. }
  77. if _, ok := slaveMap.Get(needleValue.Key); ok {
  78. return nil // skip intersection
  79. }
  80. delta = append(delta, needleValue)
  81. return nil
  82. }); err != nil {
  83. return fmt.Errorf("Add master entry: %v", err)
  84. }
  85. if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
  86. if needleValue.Key == NeedleIdEmpty {
  87. return nil
  88. }
  89. if _, ok := masterMap.Get(needleValue.Key); ok {
  90. return nil // skip intersection
  91. }
  92. needleValue.Size = 0
  93. delta = append(delta, needleValue)
  94. return nil
  95. }); err != nil {
  96. return fmt.Errorf("Remove local entry: %v", err)
  97. }
  98. // simulate to same ordering of remote .dat file needle entries
  99. sort.Sort(ByOffset(delta))
  100. // make up the delta
  101. fetchCount := 0
  102. for _, needleValue := range delta {
  103. if needleValue.Size == 0 {
  104. // remove file entry from local
  105. v.removeNeedle(needleValue.Key)
  106. continue
  107. }
  108. // add master file entry to local data file
  109. if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil {
  110. glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
  111. return err
  112. }
  113. fetchCount++
  114. }
  115. glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
  116. return nil
  117. }
  118. func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
  119. m = needle.NewCompactMap()
  120. syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
  121. if err != nil {
  122. return m, 0, 0, err
  123. }
  124. total := 0
  125. err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
  126. // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
  127. if offset > 0 && size != TombstoneFileSize {
  128. m.Set(NeedleId(key), offset, size)
  129. } else {
  130. m.Delete(NeedleId(key))
  131. }
  132. total++
  133. })
  134. glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
  135. return m, syncStatus.TailOffset, uint16(syncStatus.CompactRevision), err
  136. }
  137. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  138. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  139. if stat, err := v.dataFile.Stat(); err == nil {
  140. syncStatus.TailOffset = uint64(stat.Size())
  141. }
  142. syncStatus.Collection = v.Collection
  143. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  144. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
  145. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  146. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  147. return syncStatus
  148. }
  149. func (v *Volume) IndexFileContent() ([]byte, error) {
  150. return v.nm.IndexFileContent()
  151. }
  152. // removeNeedle removes one needle by needle key
  153. func (v *Volume) removeNeedle(key NeedleId) {
  154. n := new(Needle)
  155. n.Id = key
  156. v.deleteNeedle(n)
  157. }
  158. // fetchNeedle fetches a remote volume needle by vid, id, offset
  159. // The compact revision is checked first in case the remote volume
  160. // is compacted and the offset is invalid any more.
  161. func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error {
  162. return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  163. stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
  164. VolumeId: uint32(v.Id),
  165. Revision: uint32(compactRevision),
  166. Offset: uint32(needleValue.Offset),
  167. Size: uint32(needleValue.Size),
  168. NeedleId: needleValue.Key.String(),
  169. })
  170. if err != nil {
  171. return err
  172. }
  173. var fileContent []byte
  174. for {
  175. resp, err := stream.Recv()
  176. if err == io.EOF {
  177. break
  178. }
  179. if err != nil {
  180. return fmt.Errorf("read needle %v: %v", needleValue.Key.String(), err)
  181. }
  182. fileContent = append(fileContent, resp.FileContent...)
  183. }
  184. offset, err := v.AppendBlob(fileContent)
  185. if err != nil {
  186. return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
  187. }
  188. // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
  189. v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
  190. return nil
  191. })
  192. }