You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

97 lines
2.7 KiB

  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. )
  9. func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
  10. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  11. if v == nil {
  12. return fmt.Errorf("not found volume id %d", req.VolumeId)
  13. }
  14. defer glog.V(1).Infof("tailing volume %d finished", v.Id)
  15. lastTimestampNs := req.SinceNs
  16. drainingSeconds := req.DrainingSeconds
  17. for {
  18. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  19. if err != nil {
  20. glog.Infof("sendNeedlesSince: %v", err)
  21. return fmt.Errorf("streamFollow: %v", err)
  22. }
  23. time.Sleep(2 * time.Second)
  24. if req.DrainingSeconds == 0 {
  25. lastTimestampNs = lastProcessedTimestampNs
  26. continue
  27. }
  28. if lastProcessedTimestampNs == lastTimestampNs {
  29. drainingSeconds--
  30. if drainingSeconds <= 0 {
  31. return nil
  32. }
  33. glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
  34. } else {
  35. lastTimestampNs = lastProcessedTimestampNs
  36. drainingSeconds = req.DrainingSeconds
  37. glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  38. }
  39. }
  40. }
  41. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  42. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  43. if err != nil {
  44. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  45. }
  46. // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
  47. if isLastOne {
  48. // need to heart beat to the client to ensure the connection health
  49. sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
  50. return lastTimestampNs, sendErr
  51. }
  52. err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
  53. blockSizeLimit := 1024 * 1024 * 2
  54. isLastChunk := false
  55. // need to send body by chunks
  56. for i := 0; i < len(needleBody); i += blockSizeLimit {
  57. stopOffset := i + blockSizeLimit
  58. if stopOffset >= len(needleBody) {
  59. isLastChunk = true
  60. stopOffset = len(needleBody)
  61. }
  62. sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
  63. NeedleHeader: needleHeader,
  64. NeedleBody: needleBody[i:stopOffset],
  65. IsLastChunk: isLastChunk,
  66. })
  67. if sendErr != nil {
  68. return sendErr
  69. }
  70. }
  71. lastProcessedTimestampNs = needleAppendAtNs
  72. return nil
  73. })
  74. return
  75. }