You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
5.3 KiB

6 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "google.golang.org/grpc"
  9. "io"
  10. "os"
  11. )
  12. // The volume syncs with a master volume in 2 steps:
  13. // 1. The slave checks the master side to find the subscription checkpoint
  14. // to set up the replication.
  15. // 2. The slave receives the updates from the master.
  16. /*
  17. Assume the slave volume needs to follow the master volume.
  18. The master volume could be compacted, and could be many files ahead of
  19. slave volume.
  20. Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
  21. 0.1 If the slave's compaction version is less than the master's, do a local compaction and set
  22. the local compaction version to match the master's.
  23. 0.2 If the slave's size is still bigger than the master's, discard the local copy and do a full copy.
  24. Step 1:
  25. The slave volume asks the master for updates newer than its last modification time t.
  26. The master does a binary search in the volume (using .idx as an array and checking the appendAtNs in the .dat file)
  27. to find the first entry with appendAtNs > t.
  28. Step 2:
  29. The master sends content bytes to the slave. The bytes are not chunked by needle.
  30. Step 3:
  31. The slave generates the needle map for the new bytes. (This may be optimized to incrementally
  32. update the needle map when receiving new .dat bytes, but that does not seem necessary now.)
  33. */
  34. func (v *Volume) Follow(volumeServer string, grpcDialOption grpc.DialOption) (error) {
  35. ctx := context.Background()
  36. startFromOffset := v.Size()
  37. appendAtNs, err := v.findLastAppendAtNs()
  38. if err != nil {
  39. return err
  40. }
  41. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  42. stream, err := client.VolumeFollow(ctx, &volume_server_pb.VolumeFollowRequest{
  43. VolumeId: uint32(v.Id),
  44. Since: appendAtNs,
  45. })
  46. if err != nil {
  47. return err
  48. }
  49. v.dataFile.Seek(startFromOffset, io.SeekStart)
  50. for {
  51. resp, recvErr := stream.Recv()
  52. if recvErr != nil {
  53. if recvErr == io.EOF {
  54. break
  55. } else {
  56. return recvErr
  57. }
  58. }
  59. _, writeErr := v.dataFile.Write(resp.FileContent)
  60. if writeErr != nil {
  61. return writeErr
  62. }
  63. }
  64. return nil
  65. })
  66. if err != nil {
  67. return err
  68. }
  69. // TODO add to needle map
  70. return nil
  71. }
  72. func (v *Volume) findLastAppendAtNs() (uint64, error) {
  73. offset, err := v.locateLastAppendEntry()
  74. if err != nil {
  75. return 0, err
  76. }
  77. if offset == 0 {
  78. return 0, nil
  79. }
  80. return v.readAppendAtNs(offset)
  81. }
  82. func (v *Volume) locateLastAppendEntry() (Offset, error) {
  83. indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  84. if e != nil {
  85. return 0, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
  86. }
  87. defer indexFile.Close()
  88. fi, err := indexFile.Stat()
  89. if err != nil {
  90. return 0, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
  91. }
  92. fileSize := fi.Size()
  93. if fileSize%NeedleEntrySize != 0 {
  94. return 0, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  95. }
  96. if fileSize == 0 {
  97. return 0, nil
  98. }
  99. bytes := make([]byte, NeedleEntrySize)
  100. n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize)
  101. if n != NeedleEntrySize {
  102. return 0, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
  103. }
  104. _, offset, _ := IdxFileEntry(bytes)
  105. return offset, nil
  106. }
  107. func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
  108. n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize)
  109. if err != nil {
  110. return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
  111. }
  112. err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize, bodyLength)
  113. if err != nil {
  114. return 0, fmt.Errorf("ReadNeedleBody offset %d: %v", int64(offset)*NeedlePaddingSize, err)
  115. }
  116. return n.AppendAtNs, nil
  117. }
  118. // on server side
  119. func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
  120. indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
  121. if openErr != nil {
  122. err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
  123. return
  124. }
  125. defer indexFile.Close()
  126. fi, statErr := indexFile.Stat()
  127. if statErr != nil {
  128. err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
  129. return
  130. }
  131. fileSize := fi.Size()
  132. if fileSize%NeedleEntrySize != 0 {
  133. err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  134. return
  135. }
  136. bytes := make([]byte, NeedleEntrySize)
  137. entryCount := fileSize / NeedleEntrySize
  138. l := int64(0)
  139. h := entryCount
  140. for l < h {
  141. m := (l + h) / 2
  142. if m == entryCount {
  143. return 0, true, nil
  144. }
  145. // read the appendAtNs for entry m
  146. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
  147. if err != nil {
  148. return
  149. }
  150. mNs, nsReadErr := v.readAppendAtNs(offset)
  151. if nsReadErr != nil {
  152. err = nsReadErr
  153. return
  154. }
  155. // move the boundary
  156. if mNs <= sinceNs {
  157. l = m + 1
  158. } else {
  159. h = m
  160. }
  161. }
  162. if l == entryCount {
  163. return 0, true, nil
  164. }
  165. offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
  166. return offset, false, err
  167. }
  168. // bytes is of size NeedleEntrySize
  169. func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
  170. if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF {
  171. return 0, readErr
  172. }
  173. _, offset, _ := IdxFileEntry(bytes)
  174. return offset, nil
  175. }