You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

253 lines
6.5 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "google.golang.org/grpc"
  9. "io"
  10. "os"
  11. )
  12. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  13. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  14. if stat, err := v.dataFile.Stat(); err == nil {
  15. syncStatus.TailOffset = uint64(stat.Size())
  16. }
  17. syncStatus.Collection = v.Collection
  18. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  19. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
  20. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  21. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  22. return syncStatus
  23. }
  24. // The volume sync with a master volume via 2 steps:
  25. // 1. The slave checks master side to find subscription checkpoint
  26. // to setup the replication.
  27. // 2. The slave receives the updates from master
  28. /*
  29. Assume the slave volume needs to follow the master volume.
  30. The master volume could be compacted, and could be many files ahead of
  31. slave volume.
  32. Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
  33. 0.1 If slave compact version is less than the master, do a local compaction, and set
  34. local compact version the same as the master.
  35. 0.2 If the slave size is still bigger than the master, discard local copy and do a full copy.
  36. Step 1:
  37. The slave volume ask the master by the last modification time t.
  38. The master do a binary search in volume (use .idx as an array, and check the appendAtNs in .dat file),
  39. to find the first entry with appendAtNs > t.
  40. Step 2:
  41. The master send content bytes to the slave. The bytes are not chunked by needle.
  42. Step 3:
  43. The slave generate the needle map for the new bytes. (This may be optimized to incrementally
  44. update needle map when receiving new .dat bytes. But seems not necessary now.)
  45. */
  46. func (v *Volume) Follow(volumeServer string, grpcDialOption grpc.DialOption) error {
  47. ctx := context.Background()
  48. startFromOffset := v.Size()
  49. appendAtNs, err := v.findLastAppendAtNs()
  50. if err != nil {
  51. return err
  52. }
  53. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  54. stream, err := client.VolumeFollow(ctx, &volume_server_pb.VolumeFollowRequest{
  55. VolumeId: uint32(v.Id),
  56. Since: appendAtNs,
  57. })
  58. if err != nil {
  59. return err
  60. }
  61. v.dataFile.Seek(startFromOffset, io.SeekStart)
  62. for {
  63. resp, recvErr := stream.Recv()
  64. if recvErr != nil {
  65. if recvErr == io.EOF {
  66. break
  67. } else {
  68. return recvErr
  69. }
  70. }
  71. _, writeErr := v.dataFile.Write(resp.FileContent)
  72. if writeErr != nil {
  73. return writeErr
  74. }
  75. }
  76. return nil
  77. })
  78. if err != nil {
  79. return err
  80. }
  81. // add to needle map
  82. return ScanVolumeFileFrom(v.version, v.dataFile, startFromOffset, &VolumeFileScanner4GenIdx{v: v})
  83. }
  84. func (v *Volume) findLastAppendAtNs() (uint64, error) {
  85. offset, err := v.locateLastAppendEntry()
  86. if err != nil {
  87. return 0, err
  88. }
  89. if offset == 0 {
  90. return 0, nil
  91. }
  92. return v.readAppendAtNs(offset)
  93. }
  94. func (v *Volume) locateLastAppendEntry() (Offset, error) {
  95. indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  96. if e != nil {
  97. return 0, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
  98. }
  99. defer indexFile.Close()
  100. fi, err := indexFile.Stat()
  101. if err != nil {
  102. return 0, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
  103. }
  104. fileSize := fi.Size()
  105. if fileSize%NeedleEntrySize != 0 {
  106. return 0, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  107. }
  108. if fileSize == 0 {
  109. return 0, nil
  110. }
  111. bytes := make([]byte, NeedleEntrySize)
  112. n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize)
  113. if n != NeedleEntrySize {
  114. return 0, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
  115. }
  116. _, offset, _ := IdxFileEntry(bytes)
  117. return offset, nil
  118. }
  119. func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
  120. n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize)
  121. if err != nil {
  122. return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
  123. }
  124. err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize+int64(NeedleEntrySize), bodyLength)
  125. if err != nil {
  126. return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", int64(offset)*NeedlePaddingSize, bodyLength, err)
  127. }
  128. return n.AppendAtNs, nil
  129. }
  130. // on server side
  131. func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
  132. indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  133. if openErr != nil {
  134. err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
  135. return
  136. }
  137. defer indexFile.Close()
  138. fi, statErr := indexFile.Stat()
  139. if statErr != nil {
  140. err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
  141. return
  142. }
  143. fileSize := fi.Size()
  144. if fileSize%NeedleEntrySize != 0 {
  145. err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  146. return
  147. }
  148. bytes := make([]byte, NeedleEntrySize)
  149. entryCount := fileSize / NeedleEntrySize
  150. l := int64(0)
  151. h := entryCount
  152. for l < h {
  153. m := (l + h) / 2
  154. if m == entryCount {
  155. return 0, true, nil
  156. }
  157. // read the appendAtNs for entry m
  158. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
  159. if err != nil {
  160. return
  161. }
  162. mNs, nsReadErr := v.readAppendAtNs(offset)
  163. if nsReadErr != nil {
  164. err = nsReadErr
  165. return
  166. }
  167. // move the boundary
  168. if mNs <= sinceNs {
  169. l = m + 1
  170. } else {
  171. h = m
  172. }
  173. }
  174. if l == entryCount {
  175. return 0, true, nil
  176. }
  177. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
  178. return offset, false, err
  179. }
  180. // bytes is of size NeedleEntrySize
  181. func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
  182. if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF {
  183. return 0, readErr
  184. }
  185. _, offset, _ := IdxFileEntry(bytes)
  186. return offset, nil
  187. }
  188. // generate the volume idx
  189. type VolumeFileScanner4GenIdx struct {
  190. v *Volume
  191. }
  192. func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
  193. return nil
  194. }
  195. func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
  196. return false
  197. }
  198. func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *Needle, offset int64) error {
  199. if n.Size > 0 && n.Size != TombstoneFileSize {
  200. return scanner.v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size)
  201. }
  202. return scanner.v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize))
  203. }