You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

216 lines
7.1 KiB

  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "net/url"
  7. "os"
  8. "sort"
  9. "strconv"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. // The volume syncs with a master volume in 2 steps:
  18. // 1. The slave checks the master side to find the subscription checkpoint
  19. // needed to set up the replication.
  20. // 2. The slave receives the updates from the master.
  21. /*
  22. Assume the slave volume needs to follow the master volume.
  23. The master volume could be compacted, and could be many files ahead of
  24. slave volume.
  25. Step 1:
  26. The slave volume will ask the master volume for a snapshot
  27. of (existing file entries, last offset, number of compacted times).
  28. For each entry x in master existing file entries:
  29. if x does not exist locally:
  30. add x locally
  31. For each entry y in local slave existing file entries:
  32. if y does not exist on master:
  33. delete y locally
  34. Step 2:
  35. After this, use the last offset and number of compacted times to request
  36. the master volume to send a new file, and keep looping. If the number of
  37. compacted times is changed, go back to step 1 (very likely this can be
  38. optimized more later).
  39. */
  40. func (v *Volume) Synchronize(volumeServer string) (err error) {
  41. var lastCompactRevision uint16 = 0
  42. var compactRevision uint16 = 0
  43. var masterMap *needle.CompactMap
  44. for i := 0; i < 3; i++ {
  45. if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
  46. return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
  47. }
  48. if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
  49. if err = v.Compact(0); err != nil {
  50. return fmt.Errorf("Compact Volume before synchronizing %v", err)
  51. }
  52. if err = v.commitCompact(); err != nil {
  53. return fmt.Errorf("Commit Compact before synchronizing %v", err)
  54. }
  55. }
  56. lastCompactRevision = compactRevision
  57. if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
  58. return
  59. }
  60. }
  61. return
  62. }
  63. type ByOffset []needle.NeedleValue
  64. func (a ByOffset) Len() int { return len(a) }
  65. func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  66. func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
  67. // trySynchronizing sync with remote volume server incrementally by
  68. // make up the local and remote delta.
  69. func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
  70. slaveIdxFile, err := os.Open(v.nm.IndexFileName())
  71. if err != nil {
  72. return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
  73. }
  74. defer slaveIdxFile.Close()
  75. slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
  76. if err != nil {
  77. return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
  78. }
  79. var delta []needle.NeedleValue
  80. if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
  81. if needleValue.Key == NeedleIdEmpty {
  82. return nil
  83. }
  84. if _, ok := slaveMap.Get(needleValue.Key); ok {
  85. return nil // skip intersection
  86. }
  87. delta = append(delta, needleValue)
  88. return nil
  89. }); err != nil {
  90. return fmt.Errorf("Add master entry: %v", err)
  91. }
  92. if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
  93. if needleValue.Key == NeedleIdEmpty {
  94. return nil
  95. }
  96. if _, ok := masterMap.Get(needleValue.Key); ok {
  97. return nil // skip intersection
  98. }
  99. needleValue.Size = 0
  100. delta = append(delta, needleValue)
  101. return nil
  102. }); err != nil {
  103. return fmt.Errorf("Remove local entry: %v", err)
  104. }
  105. // simulate to same ordering of remote .dat file needle entries
  106. sort.Sort(ByOffset(delta))
  107. // make up the delta
  108. fetchCount := 0
  109. volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
  110. for _, needleValue := range delta {
  111. if needleValue.Size == 0 {
  112. // remove file entry from local
  113. v.removeNeedle(needleValue.Key)
  114. continue
  115. }
  116. // add master file entry to local data file
  117. if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
  118. glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
  119. return err
  120. }
  121. fetchCount++
  122. }
  123. glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
  124. return nil
  125. }
  126. func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
  127. m = needle.NewCompactMap()
  128. syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
  129. if err != nil {
  130. return m, 0, 0, err
  131. }
  132. total := 0
  133. err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key NeedleId, offset Offset, size uint32) {
  134. // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
  135. if offset > 0 && size != TombstoneFileSize {
  136. m.Set(NeedleId(key), offset, size)
  137. } else {
  138. m.Delete(NeedleId(key))
  139. }
  140. total++
  141. })
  142. glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
  143. return m, syncStatus.TailOffset, uint16(syncStatus.CompactRevision), err
  144. }
  145. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  146. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  147. if stat, err := v.dataFile.Stat(); err == nil {
  148. syncStatus.TailOffset = uint64(stat.Size())
  149. }
  150. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  151. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
  152. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  153. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  154. return syncStatus
  155. }
  156. func (v *Volume) IndexFileContent() ([]byte, error) {
  157. return v.nm.IndexFileContent()
  158. }
  159. // removeNeedle removes one needle by needle key
  160. func (v *Volume) removeNeedle(key NeedleId) {
  161. n := new(Needle)
  162. n.Id = key
  163. v.deleteNeedle(n)
  164. }
  165. // fetchNeedle fetches a remote volume needle by vid, id, offset
  166. // The compact revision is checked first in case the remote volume
  167. // is compacted and the offset is invalid any more.
  168. func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
  169. needleValue needle.NeedleValue, compactRevision uint16) error {
  170. // add master file entry to local data file
  171. values := make(url.Values)
  172. values.Add("revision", strconv.Itoa(int(compactRevision)))
  173. values.Add("volume", v.Id.String())
  174. values.Add("id", needleValue.Key.String())
  175. values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
  176. values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
  177. glog.V(4).Infof("Fetch %+v", needleValue)
  178. return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
  179. b, err := ioutil.ReadAll(r)
  180. if err != nil {
  181. return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
  182. }
  183. offset, err := v.AppendBlob(b)
  184. if err != nil {
  185. return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
  186. }
  187. // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
  188. v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
  189. return nil
  190. })
  191. }