You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
6.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. "google.golang.org/grpc"
  13. )
  14. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  15. v.dataFileAccessLock.Lock()
  16. defer v.dataFileAccessLock.Unlock()
  17. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  18. if datSize, _, err := v.DataBackend.GetStat(); err == nil {
  19. syncStatus.TailOffset = uint64(datSize)
  20. }
  21. syncStatus.Collection = v.Collection
  22. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  23. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
  24. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  25. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  26. return syncStatus
  27. }
  28. // The volume sync with a master volume via 2 steps:
  29. // 1. The slave checks master side to find subscription checkpoint
  30. // to setup the replication.
  31. // 2. The slave receives the updates from master
  32. /*
  33. Assume the slave volume needs to follow the master volume.
  34. The master volume could be compacted, and could be many files ahead of
  35. slave volume.
  36. Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
  37. 0.1 If slave compact version is less than the master, do a local compaction, and set
  38. local compact version the same as the master.
  39. 0.2 If the slave size is still bigger than the master, discard local copy and do a full copy.
  40. Step 1:
  41. The slave volume ask the master by the last modification time t.
  42. The master do a binary search in volume (use .idx as an array, and check the appendAtNs in .dat file),
  43. to find the first entry with appendAtNs > t.
  44. Step 2:
  45. The master send content bytes to the slave. The bytes are not chunked by needle.
  46. Step 3:
  47. The slave generate the needle map for the new bytes. (This may be optimized to incrementally
  48. update needle map when receiving new .dat bytes. But seems not necessary now.)
  49. */
  50. func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
  51. ctx := context.Background()
  52. startFromOffset, _, _ := v.FileStat()
  53. appendAtNs, err := v.findLastAppendAtNs()
  54. if err != nil {
  55. return err
  56. }
  57. writeOffset := int64(startFromOffset)
  58. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  59. stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
  60. VolumeId: uint32(v.Id),
  61. SinceNs: appendAtNs,
  62. })
  63. if err != nil {
  64. return err
  65. }
  66. for {
  67. resp, recvErr := stream.Recv()
  68. if recvErr != nil {
  69. if recvErr == io.EOF {
  70. break
  71. } else {
  72. return recvErr
  73. }
  74. }
  75. n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset)
  76. if writeErr != nil {
  77. return writeErr
  78. }
  79. writeOffset += int64(n)
  80. }
  81. return nil
  82. })
  83. if err != nil {
  84. return err
  85. }
  86. // add to needle map
  87. return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
  88. }
  89. func (v *Volume) findLastAppendAtNs() (uint64, error) {
  90. offset, err := v.locateLastAppendEntry()
  91. if err != nil {
  92. return 0, err
  93. }
  94. if offset.IsZero() {
  95. return 0, nil
  96. }
  97. return v.readAppendAtNs(offset)
  98. }
  99. func (v *Volume) locateLastAppendEntry() (Offset, error) {
  100. indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  101. if e != nil {
  102. return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
  103. }
  104. defer indexFile.Close()
  105. fi, err := indexFile.Stat()
  106. if err != nil {
  107. return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
  108. }
  109. fileSize := fi.Size()
  110. if fileSize%NeedleMapEntrySize != 0 {
  111. return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  112. }
  113. if fileSize == 0 {
  114. return Offset{}, nil
  115. }
  116. bytes := make([]byte, NeedleMapEntrySize)
  117. n, e := indexFile.ReadAt(bytes, fileSize-NeedleMapEntrySize)
  118. if n != NeedleMapEntrySize {
  119. return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
  120. }
  121. _, offset, _ := idx.IdxFileEntry(bytes)
  122. return offset, nil
  123. }
  124. func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
  125. n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
  126. if err != nil {
  127. return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
  128. }
  129. _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
  130. if err != nil {
  131. return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
  132. }
  133. return n.AppendAtNs, nil
  134. }
  135. // on server side
  136. func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
  137. indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  138. if openErr != nil {
  139. err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
  140. return
  141. }
  142. defer indexFile.Close()
  143. fi, statErr := indexFile.Stat()
  144. if statErr != nil {
  145. err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
  146. return
  147. }
  148. fileSize := fi.Size()
  149. if fileSize%NeedleMapEntrySize != 0 {
  150. err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  151. return
  152. }
  153. bytes := make([]byte, NeedleMapEntrySize)
  154. entryCount := fileSize / NeedleMapEntrySize
  155. l := int64(0)
  156. h := entryCount
  157. for l < h {
  158. m := (l + h) / 2
  159. if m == entryCount {
  160. return Offset{}, true, nil
  161. }
  162. // read the appendAtNs for entry m
  163. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
  164. if err != nil {
  165. return
  166. }
  167. mNs, nsReadErr := v.readAppendAtNs(offset)
  168. if nsReadErr != nil {
  169. err = nsReadErr
  170. return
  171. }
  172. // move the boundary
  173. if mNs <= sinceNs {
  174. l = m + 1
  175. } else {
  176. h = m
  177. }
  178. }
  179. if l == entryCount {
  180. return Offset{}, true, nil
  181. }
  182. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
  183. return offset, false, err
  184. }
  185. // bytes is of size NeedleMapEntrySize
  186. func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
  187. if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
  188. return Offset{}, readErr
  189. }
  190. _, offset, _ := idx.IdxFileEntry(bytes)
  191. return offset, nil
  192. }
  193. // generate the volume idx
  194. type VolumeFileScanner4GenIdx struct {
  195. v *Volume
  196. }
  197. func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
  198. return nil
  199. }
  200. func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
  201. return false
  202. }
  203. func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  204. if n.Size > 0 && n.Size != TombstoneFileSize {
  205. return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
  206. }
  207. return scanner.v.nm.Delete(n.Id, ToOffset(offset))
  208. }