You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

313 lines
7.9 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "io"
  7. "os"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  14. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. )
  16. func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
  17. v.dataFileAccessLock.RLock()
  18. defer v.dataFileAccessLock.RUnlock()
  19. var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
  20. if datSize, _, err := v.DataBackend.GetStat(); err == nil {
  21. syncStatus.TailOffset = uint64(datSize)
  22. }
  23. syncStatus.Collection = v.Collection
  24. syncStatus.IdxFileSize = v.nm.IndexFileSize()
  25. syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
  26. syncStatus.Ttl = v.SuperBlock.Ttl.String()
  27. syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
  28. return syncStatus
  29. }
// The volume syncs with a master volume in 2 steps:
// 1. The slave checks the master side to find the subscription checkpoint
// needed to set up the replication.
// 2. The slave receives the updates from the master.
/*
Assume the slave volume needs to follow the master volume.
The master volume could be compacted, and could be many files ahead of
the slave volume.
Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
0.1 If the slave's compact version is less than the master's, do a local compaction, and set
the local compact version to the same value as the master's.
0.2 If the slave's size is still bigger than the master's, discard the local copy and do a full copy.
Step 1:
The slave volume asks the master for updates since the last modification time t.
The master does a binary search in the volume (using .idx as an array, and checking the appendAtNs in the .dat file),
to find the first entry with appendAtNs > t.
Step 2:
The master sends the content bytes to the slave. The bytes are not chunked by needle.
Step 3:
The slave generates the needle map for the new bytes. (This may be optimized to incrementally
update the needle map when receiving new .dat bytes, but that does not seem necessary now.)
*/
  52. func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption) error {
  53. startFromOffset, _, _ := v.FileStat()
  54. appendAtNs, err := v.findLastAppendAtNs()
  55. if err != nil {
  56. return err
  57. }
  58. writeOffset := int64(startFromOffset)
  59. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  60. stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
  61. VolumeId: uint32(v.Id),
  62. SinceNs: appendAtNs,
  63. })
  64. if err != nil {
  65. return err
  66. }
  67. for {
  68. resp, recvErr := stream.Recv()
  69. if recvErr != nil {
  70. if recvErr == io.EOF {
  71. break
  72. } else {
  73. return recvErr
  74. }
  75. }
  76. n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset)
  77. if writeErr != nil {
  78. return writeErr
  79. }
  80. writeOffset += int64(n)
  81. }
  82. return nil
  83. })
  84. if err != nil {
  85. return err
  86. }
  87. // add to needle map
  88. return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
  89. }
  90. func (v *Volume) findLastAppendAtNs() (uint64, error) {
  91. offset, err := v.locateLastAppendEntry()
  92. if err != nil {
  93. return 0, err
  94. }
  95. if offset.IsZero() {
  96. return 0, nil
  97. }
  98. return v.readAppendAtNs(offset)
  99. }
  100. func (v *Volume) locateLastAppendEntry() (Offset, error) {
  101. indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
  102. if e != nil {
  103. return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e)
  104. }
  105. defer indexFile.Close()
  106. fi, err := indexFile.Stat()
  107. if err != nil {
  108. return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
  109. }
  110. fileSize := fi.Size()
  111. if fileSize%NeedleMapEntrySize != 0 {
  112. return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
  113. }
  114. if fileSize == 0 {
  115. return Offset{}, nil
  116. }
  117. bytes := make([]byte, NeedleMapEntrySize)
  118. n, e := indexFile.ReadAt(bytes, fileSize-NeedleMapEntrySize)
  119. if n != NeedleMapEntrySize {
  120. return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
  121. }
  122. _, offset, _ := idx.IdxFileEntry(bytes)
  123. return offset, nil
  124. }
  125. func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
  126. n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
  127. if err != nil {
  128. return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
  129. }
  130. _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
  131. if err != nil {
  132. return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
  133. }
  134. return n.AppendAtNs, nil
  135. }
  136. // on server side
  137. func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
  138. fileSize := int64(v.IndexFileSize())
  139. if fileSize%NeedleMapEntrySize != 0 {
  140. err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize)
  141. return
  142. }
  143. entryCount := fileSize / NeedleMapEntrySize
  144. l := int64(0)
  145. h := entryCount
  146. for l < h {
  147. m := (l + h) / 2
  148. if m == entryCount {
  149. return Offset{}, true, nil
  150. }
  151. // read the appendAtNs for entry m
  152. offset, err = v.readOffsetFromIndex(m)
  153. if err != nil {
  154. err = fmt.Errorf("read entry %d: %v", m, err)
  155. return
  156. }
  157. if offset.IsZero() {
  158. leftIndex, _, leftNs, leftErr := v.readLeftNs(m)
  159. if leftErr != nil {
  160. err = leftErr
  161. return
  162. }
  163. rightIndex, rightOffset, rightNs, rightErr := v.readRightNs(m)
  164. if rightErr != nil {
  165. err = rightErr
  166. return
  167. }
  168. if rightNs <= sinceNs {
  169. l = rightIndex
  170. if l == entryCount {
  171. return Offset{}, true, nil
  172. } else {
  173. continue
  174. }
  175. }
  176. if sinceNs < leftNs {
  177. h = leftIndex + 1
  178. continue
  179. }
  180. return rightOffset, false, nil
  181. }
  182. if offset.IsZero() {
  183. return Offset{}, true, nil
  184. }
  185. mNs, nsReadErr := v.readAppendAtNs(offset)
  186. if nsReadErr != nil {
  187. err = fmt.Errorf("read entry %d offset %d: %v", m, offset, nsReadErr)
  188. return
  189. }
  190. // move the boundary
  191. if mNs <= sinceNs {
  192. l = m + 1
  193. } else {
  194. h = m
  195. }
  196. }
  197. if l == entryCount {
  198. return Offset{}, true, nil
  199. }
  200. offset, err = v.readOffsetFromIndex(l)
  201. return offset, false, err
  202. }
  203. func (v *Volume) readRightNs(m int64) (index int64, offset Offset, ts uint64, err error) {
  204. index = m
  205. for offset.IsZero() {
  206. index++
  207. offset, err = v.readOffsetFromIndex(index)
  208. if err != nil {
  209. err = fmt.Errorf("read entry %d: %v", index, err)
  210. return
  211. }
  212. }
  213. if !offset.IsZero() {
  214. ts, err = v.readAppendAtNs(offset)
  215. }
  216. return
  217. }
  218. func (v *Volume) readLeftNs(m int64) (index int64, offset Offset, ts uint64, err error) {
  219. index = m
  220. for offset.IsZero() {
  221. index--
  222. offset, err = v.readOffsetFromIndex(index)
  223. if err != nil {
  224. err = fmt.Errorf("read entry %d: %v", index, err)
  225. return
  226. }
  227. }
  228. if !offset.IsZero() {
  229. ts, err = v.readAppendAtNs(offset)
  230. }
  231. return
  232. }
  233. // bytes is of size NeedleMapEntrySize
  234. func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
  235. v.dataFileAccessLock.RLock()
  236. defer v.dataFileAccessLock.RUnlock()
  237. if v.nm == nil {
  238. return Offset{}, io.EOF
  239. }
  240. _, offset, _, err := v.nm.ReadIndexEntry(m)
  241. return offset, err
  242. }
  243. // generate the volume idx
  244. type VolumeFileScanner4GenIdx struct {
  245. v *Volume
  246. }
  247. func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error {
  248. return nil
  249. }
  250. func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
  251. return false
  252. }
  253. func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  254. if n.Size > 0 && n.Size.IsValid() {
  255. return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
  256. }
  257. return scanner.v.nm.Delete(n.Id, ToOffset(offset))
  258. }