You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
6.6 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. "google.golang.org/grpc"
  12. )
  13. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  14. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  15. if stat, err := v.dataFile.Stat(); err == nil {
  16. syncStatus.TailOffset = uint64(stat.Size())
  17. }
  18. syncStatus.Collection = v.Collection
  19. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  20. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
  21. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  22. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  23. return syncStatus
  24. }
  25. // The volume syncs with a master volume in 2 steps:
  26. // 1. The slave checks the master side to find the subscription checkpoint
  27. // to set up the replication.
  28. // 2. The slave receives the updates from the master.
  29. /*
  30. Assume the slave volume needs to follow the master volume.
  31. The master volume could be compacted, and could be many files ahead of
  32. slave volume.
  33. Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
  34. 0.1 If the slave's compact version is less than the master's, do a local compaction, and set
  35. the local compact version to be the same as the master's.
  36. 0.2 If the slave size is still bigger than the master's, discard the local copy and do a full copy.
  37. Step 1:
  38. The slave volume asks the master for updates, passing its last modification time t.
  39. The master does a binary search in the volume (using .idx as an array, and checking the appendAtNs in the .dat file),
  40. to find the first entry with appendAtNs > t.
  41. Step 2:
  42. The master sends the content bytes to the slave. The bytes are not chunked by needle.
  43. Step 3:
  44. The slave generates the needle map for the new bytes. (This may be optimized to incrementally
  45. update the needle map when receiving new .dat bytes, but that does not seem necessary now.)
  46. */
  47. func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
  48. ctx := context.Background()
  49. startFromOffset, _, _ := v.FileStat()
  50. appendAtNs, err := v.findLastAppendAtNs()
  51. if err != nil {
  52. return err
  53. }
  54. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  55. stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
  56. VolumeId: uint32(v.Id),
  57. SinceNs: appendAtNs,
  58. })
  59. if err != nil {
  60. return err
  61. }
  62. v.dataFile.Seek(int64(startFromOffset), io.SeekStart)
  63. for {
  64. resp, recvErr := stream.Recv()
  65. if recvErr != nil {
  66. if recvErr == io.EOF {
  67. break
  68. } else {
  69. return recvErr
  70. }
  71. }
  72. _, writeErr := v.dataFile.Write(resp.FileContent)
  73. if writeErr != nil {
  74. return writeErr
  75. }
  76. }
  77. return nil
  78. })
  79. if err != nil {
  80. return err
  81. }
  82. // add to needle map
  83. return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
  84. }
  85. func (v *Volume) findLastAppendAtNs() (uint64, error) {
  86. offset, err := v.locateLastAppendEntry()
  87. if err != nil {
  88. return 0, err
  89. }
  90. if offset.IsZero() {
  91. return 0, nil
  92. }
  93. return v.readAppendAtNs(offset)
  94. }
  95. func (v *Volume) locateLastAppendEntry() (Offset, error) {
  96. indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  97. if e != nil {
  98. return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
  99. }
  100. defer indexFile.Close()
  101. fi, err := indexFile.Stat()
  102. if err != nil {
  103. return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
  104. }
  105. fileSize := fi.Size()
  106. if fileSize%NeedleMapEntrySize != 0 {
  107. return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  108. }
  109. if fileSize == 0 {
  110. return Offset{}, nil
  111. }
  112. bytes := make([]byte, NeedleMapEntrySize)
  113. n, e := indexFile.ReadAt(bytes, fileSize-NeedleMapEntrySize)
  114. if n != NeedleMapEntrySize {
  115. return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
  116. }
  117. _, offset, _ := IdxFileEntry(bytes)
  118. return offset, nil
  119. }
  120. func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
  121. n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset())
  122. if err != nil {
  123. return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
  124. }
  125. _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
  126. if err != nil {
  127. return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
  128. }
  129. return n.AppendAtNs, nil
  130. }
  131. // on server side
  132. func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
  133. indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  134. if openErr != nil {
  135. err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
  136. return
  137. }
  138. defer indexFile.Close()
  139. fi, statErr := indexFile.Stat()
  140. if statErr != nil {
  141. err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
  142. return
  143. }
  144. fileSize := fi.Size()
  145. if fileSize%NeedleMapEntrySize != 0 {
  146. err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  147. return
  148. }
  149. bytes := make([]byte, NeedleMapEntrySize)
  150. entryCount := fileSize / NeedleMapEntrySize
  151. l := int64(0)
  152. h := entryCount
  153. for l < h {
  154. m := (l + h) / 2
  155. if m == entryCount {
  156. return Offset{}, true, nil
  157. }
  158. // read the appendAtNs for entry m
  159. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
  160. if err != nil {
  161. return
  162. }
  163. mNs, nsReadErr := v.readAppendAtNs(offset)
  164. if nsReadErr != nil {
  165. err = nsReadErr
  166. return
  167. }
  168. // move the boundary
  169. if mNs <= sinceNs {
  170. l = m + 1
  171. } else {
  172. h = m
  173. }
  174. }
  175. if l == entryCount {
  176. return Offset{}, true, nil
  177. }
  178. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
  179. return offset, false, err
  180. }
  181. // bytes is of size NeedleMapEntrySize
  182. func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
  183. if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
  184. return Offset{}, readErr
  185. }
  186. _, offset, _ := IdxFileEntry(bytes)
  187. return offset, nil
  188. }
  189. // generate the volume idx
  190. type VolumeFileScanner4GenIdx struct {
  191. v *Volume
  192. }
  193. func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
  194. return nil
  195. }
  196. func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
  197. return false
  198. }
  199. func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error {
  200. if n.Size > 0 && n.Size != TombstoneFileSize {
  201. return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
  202. }
  203. return scanner.v.nm.Delete(n.Id, ToOffset(offset))
  204. }