You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
6.7 KiB

  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. // A volume syncs with a master volume in 2 steps:
  14. // 1. The slave checks the master side to find the subscription checkpoint
  15. // needed to set up the replication.
  16. // 2. The slave receives the updates from the master.
  17. /*
  18. Assume the slave volume needs to follow the master volume.
  19. The master volume could be compacted, and could be many files ahead of
  20. slave volume.
  21. Step 1:
  22. The slave volume will ask the master volume for a snapshot
  23. of (existing file entries, last offset, number of compacted times).
  24. For each entry x in master existing file entries:
  25. if x does not exist locally:
  26. add x locally
  27. For each entry y in local slave existing file entries:
  28. if y does not exist on master:
  29. delete y locally
  30. Step 2:
  31. After this, use the last offset and number of compacted times to request
  32. the master volume to send a new file, and keep looping. If the number of
  33. compacted times is changed, go back to step 1 (very likely this can be
  34. optimized more later).
  35. */
  36. func (v *Volume) Synchronize(volumeServer string) (err error) {
  37. var lastCompactRevision uint16 = 0
  38. var compactRevision uint16 = 0
  39. var masterMap *needle.CompactMap
  40. for i := 0; i < 3; i++ {
  41. if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
  42. return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
  43. }
  44. if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
  45. if err = v.Compact(0); err != nil {
  46. return fmt.Errorf("Compact Volume before synchronizing %v", err)
  47. }
  48. if err = v.commitCompact(); err != nil {
  49. return fmt.Errorf("Commit Compact before synchronizing %v", err)
  50. }
  51. }
  52. lastCompactRevision = compactRevision
  53. if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
  54. return
  55. }
  56. }
  57. return
  58. }
  59. type ByOffset []needle.NeedleValue
  60. func (a ByOffset) Len() int { return len(a) }
  61. func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  62. func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
  63. // trySynchronizing sync with remote volume server incrementally by
  64. // make up the local and remote delta.
  65. func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
  66. slaveIdxFile, err := os.Open(v.nm.IndexFileName())
  67. if err != nil {
  68. return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
  69. }
  70. defer slaveIdxFile.Close()
  71. slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
  72. if err != nil {
  73. return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
  74. }
  75. var delta []needle.NeedleValue
  76. if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
  77. if needleValue.Key == NeedleIdEmpty {
  78. return nil
  79. }
  80. if _, ok := slaveMap.Get(needleValue.Key); ok {
  81. return nil // skip intersection
  82. }
  83. delta = append(delta, needleValue)
  84. return nil
  85. }); err != nil {
  86. return fmt.Errorf("Add master entry: %v", err)
  87. }
  88. if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
  89. if needleValue.Key == NeedleIdEmpty {
  90. return nil
  91. }
  92. if _, ok := masterMap.Get(needleValue.Key); ok {
  93. return nil // skip intersection
  94. }
  95. needleValue.Size = 0
  96. delta = append(delta, needleValue)
  97. return nil
  98. }); err != nil {
  99. return fmt.Errorf("Remove local entry: %v", err)
  100. }
  101. // simulate to same ordering of remote .dat file needle entries
  102. sort.Sort(ByOffset(delta))
  103. // make up the delta
  104. fetchCount := 0
  105. for _, needleValue := range delta {
  106. if needleValue.Size == 0 {
  107. // remove file entry from local
  108. v.removeNeedle(needleValue.Key)
  109. continue
  110. }
  111. // add master file entry to local data file
  112. if err := v.fetchNeedle(volumeServer, needleValue, compactRevision); err != nil {
  113. glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
  114. return err
  115. }
  116. fetchCount++
  117. }
  118. glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
  119. return nil
  120. }
  121. func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
  122. m = needle.NewCompactMap()
  123. syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
  124. if err != nil {
  125. return m, 0, 0, err
  126. }
  127. total := 0
  128. err = operation.GetVolumeIdxEntries(volumeServer, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
  129. // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
  130. if offset > 0 && size != TombstoneFileSize {
  131. m.Set(NeedleId(key), offset, size)
  132. } else {
  133. m.Delete(NeedleId(key))
  134. }
  135. total++
  136. })
  137. glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
  138. return m, syncStatus.TailOffset, uint16(syncStatus.CompactRevision), err
  139. }
  140. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  141. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  142. if stat, err := v.dataFile.Stat(); err == nil {
  143. syncStatus.TailOffset = uint64(stat.Size())
  144. }
  145. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  146. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
  147. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  148. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  149. return syncStatus
  150. }
  151. func (v *Volume) IndexFileContent() ([]byte, error) {
  152. return v.nm.IndexFileContent()
  153. }
  154. // removeNeedle removes one needle by needle key
  155. func (v *Volume) removeNeedle(key NeedleId) {
  156. n := new(Needle)
  157. n.Id = key
  158. v.deleteNeedle(n)
  159. }
  160. // fetchNeedle fetches a remote volume needle by vid, id, offset
  161. // The compact revision is checked first in case the remote volume
  162. // is compacted and the offset is invalid any more.
  163. func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
  164. return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
  165. resp, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
  166. VolumdId: uint32(v.Id),
  167. Revision: uint32(compactRevision),
  168. Offset: uint32(needleValue.Offset),
  169. Size: uint32(needleValue.Size),
  170. NeedleId: needleValue.Key.String(),
  171. })
  172. if err != nil {
  173. return err
  174. }
  175. offset, err := v.AppendBlob(resp.FileContent)
  176. if err != nil {
  177. return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
  178. }
  179. // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
  180. v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
  181. return nil
  182. })
  183. }