You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

98 lines
2.8 KiB

6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. )
  10. func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
  11. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  12. if v == nil {
  13. return fmt.Errorf("not found volume id %d", req.VolumeId)
  14. }
  15. defer glog.V(1).Infof("tailing volume %d finished", v.Id)
  16. lastTimestampNs := req.SinceNs
  17. drainingSeconds := req.DrainingSeconds
  18. for {
  19. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  20. if err != nil {
  21. glog.Infof("sendNeedlesSince: %v", err)
  22. return fmt.Errorf("streamFollow: %v", err)
  23. }
  24. time.Sleep(2 * time.Second)
  25. if req.DrainingSeconds == 0 {
  26. lastTimestampNs = lastProcessedTimestampNs
  27. continue
  28. }
  29. if lastProcessedTimestampNs == lastTimestampNs {
  30. drainingSeconds--
  31. if drainingSeconds <= 0 {
  32. return nil
  33. }
  34. glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
  35. } else {
  36. lastTimestampNs = lastProcessedTimestampNs
  37. drainingSeconds = req.DrainingSeconds
  38. glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  39. }
  40. }
  41. }
  42. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  43. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  44. if err != nil {
  45. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  46. }
  47. // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
  48. if isLastOne {
  49. // need to heart beat to the client to ensure the connection health
  50. sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
  51. return lastTimestampNs, sendErr
  52. }
  53. err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
  54. blockSizeLimit := 1024 * 1024 * 2
  55. isLastChunk := false
  56. // need to send body by chunks
  57. for i := 0; i < len(needleBody); i += blockSizeLimit {
  58. stopOffset := i + blockSizeLimit
  59. if stopOffset >= len(needleBody) {
  60. isLastChunk = true
  61. stopOffset = len(needleBody)
  62. }
  63. sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
  64. NeedleHeader: needleHeader,
  65. NeedleBody: needleBody[i:stopOffset],
  66. IsLastChunk: isLastChunk,
  67. })
  68. if sendErr != nil {
  69. return sendErr
  70. }
  71. }
  72. lastProcessedTimestampNs = needleAppendAtNs
  73. return nil
  74. })
  75. return
  76. }