You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

225 lines
7.3 KiB

  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "io"
  7. "os"
  8. "sort"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  14. )
  15. // The volume syncs with a master volume in 2 steps:
  16. // 1. The slave checks the master side to find the subscription checkpoint
  17. // needed to set up the replication.
  18. // 2. The slave receives the updates from the master.
  19. /*
  20. Assume the slave volume needs to follow the master volume.
  21. The master volume could be compacted, and could be many files ahead of
  22. slave volume.
  23. Step 1:
  24. The slave volume will ask the master volume for a snapshot
  25. of (existing file entries, last offset, number of compacted times).
  26. For each entry x in master existing file entries:
  27. if x does not exist locally:
  28. add x locally
  29. For each entry y in local slave existing file entries:
  30. if y does not exist on master:
  31. delete y locally
  32. Step 2:
  33. After this, use the last offset and number of compacted times to request
  34. the master volume to send a new file, and keep looping. If the number of
  35. compacted times is changed, go back to step 1 (very likely this can be
  36. optimized more later).
  37. */
  38. func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) {
  39. var lastCompactRevision uint16 = 0
  40. var compactRevision uint16 = 0
  41. var masterMap *needle.CompactMap
  42. for i := 0; i < 3; i++ {
  43. if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil {
  44. return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
  45. }
  46. if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
  47. if err = v.Compact(0); err != nil {
  48. return fmt.Errorf("Compact Volume before synchronizing %v", err)
  49. }
  50. if err = v.commitCompact(); err != nil {
  51. return fmt.Errorf("Commit Compact before synchronizing %v", err)
  52. }
  53. }
  54. lastCompactRevision = compactRevision
  55. if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil {
  56. return
  57. }
  58. }
  59. return
  60. }
  61. type ByOffset []needle.NeedleValue
  62. func (a ByOffset) Len() int { return len(a) }
  63. func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  64. func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
  65. // trySynchronizing sync with remote volume server incrementally by
  66. // make up the local and remote delta.
  67. func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error {
  68. slaveIdxFile, err := os.Open(v.nm.IndexFileName())
  69. if err != nil {
  70. return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
  71. }
  72. defer slaveIdxFile.Close()
  73. slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
  74. if err != nil {
  75. return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
  76. }
  77. var delta []needle.NeedleValue
  78. if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
  79. if needleValue.Key == NeedleIdEmpty {
  80. return nil
  81. }
  82. if _, ok := slaveMap.Get(needleValue.Key); ok {
  83. return nil // skip intersection
  84. }
  85. delta = append(delta, needleValue)
  86. return nil
  87. }); err != nil {
  88. return fmt.Errorf("Add master entry: %v", err)
  89. }
  90. if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
  91. if needleValue.Key == NeedleIdEmpty {
  92. return nil
  93. }
  94. if _, ok := masterMap.Get(needleValue.Key); ok {
  95. return nil // skip intersection
  96. }
  97. needleValue.Size = 0
  98. delta = append(delta, needleValue)
  99. return nil
  100. }); err != nil {
  101. return fmt.Errorf("Remove local entry: %v", err)
  102. }
  103. // simulate to same ordering of remote .dat file needle entries
  104. sort.Sort(ByOffset(delta))
  105. // make up the delta
  106. fetchCount := 0
  107. for _, needleValue := range delta {
  108. if needleValue.Size == 0 {
  109. // remove file entry from local
  110. v.removeNeedle(needleValue.Key)
  111. continue
  112. }
  113. // add master file entry to local data file
  114. if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil {
  115. glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
  116. return err
  117. }
  118. fetchCount++
  119. }
  120. glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
  121. return nil
  122. }
  123. func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
  124. m = needle.NewCompactMap()
  125. syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
  126. if err != nil {
  127. return m, 0, 0, err
  128. }
  129. total := 0
  130. err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
  131. // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
  132. if offset > 0 && size != TombstoneFileSize {
  133. m.Set(NeedleId(key), offset, size)
  134. } else {
  135. m.Delete(NeedleId(key))
  136. }
  137. total++
  138. })
  139. glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
  140. return m, syncStatus.TailOffset, uint16(syncStatus.CompactRevision), err
  141. }
  142. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  143. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  144. if stat, err := v.dataFile.Stat(); err == nil {
  145. syncStatus.TailOffset = uint64(stat.Size())
  146. }
  147. syncStatus.Collection = v.Collection
  148. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  149. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
  150. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  151. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  152. return syncStatus
  153. }
  154. func (v *Volume) IndexFileContent() ([]byte, error) {
  155. return v.nm.IndexFileContent()
  156. }
  157. // removeNeedle removes one needle by needle key
  158. func (v *Volume) removeNeedle(key NeedleId) {
  159. n := new(Needle)
  160. n.Id = key
  161. v.deleteNeedle(n)
  162. }
  163. // fetchNeedle fetches a remote volume needle by vid, id, offset
  164. // The compact revision is checked first in case the remote volume
  165. // is compacted and the offset is invalid any more.
  166. func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error {
  167. return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  168. stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
  169. VolumdId: uint32(v.Id),
  170. Revision: uint32(compactRevision),
  171. Offset: uint32(needleValue.Offset),
  172. Size: uint32(needleValue.Size),
  173. NeedleId: needleValue.Key.String(),
  174. })
  175. if err != nil {
  176. return err
  177. }
  178. var fileContent []byte
  179. for {
  180. resp, err := stream.Recv()
  181. if err == io.EOF {
  182. break
  183. }
  184. if err != nil {
  185. return fmt.Errorf("read needle %v: %v", needleValue.Key.String(), err)
  186. }
  187. fileContent = append(fileContent, resp.FileContent...)
  188. }
  189. offset, err := v.AppendBlob(fileContent)
  190. if err != nil {
  191. return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
  192. }
  193. // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
  194. v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
  195. return nil
  196. })
  197. }