You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
3.5 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  12. func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
  13. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  14. if v == nil {
  15. return fmt.Errorf("not found volume id %d", req.VolumeId)
  16. }
  17. defer glog.V(1).Infof("tailing volume %d finished", v.Id)
  18. lastTimestampNs := req.SinceNs
  19. drainingSeconds := req.IdleTimeoutSeconds
  20. for {
  21. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  22. if err != nil {
  23. glog.Infof("sendNeedlesSince: %v", err)
  24. return fmt.Errorf("streamFollow: %v", err)
  25. }
  26. time.Sleep(2 * time.Second)
  27. if req.IdleTimeoutSeconds == 0 {
  28. lastTimestampNs = lastProcessedTimestampNs
  29. continue
  30. }
  31. if lastProcessedTimestampNs == lastTimestampNs {
  32. drainingSeconds--
  33. if drainingSeconds <= 0 {
  34. return nil
  35. }
  36. glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
  37. } else {
  38. lastTimestampNs = lastProcessedTimestampNs
  39. drainingSeconds = req.IdleTimeoutSeconds
  40. glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  41. }
  42. }
  43. }
  44. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  45. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  46. if err != nil {
  47. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  48. }
  49. // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
  50. if isLastOne {
  51. // need to heart beat to the client to ensure the connection health
  52. sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
  53. return lastTimestampNs, sendErr
  54. }
  55. err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
  56. blockSizeLimit := 1024 * 1024 * 2
  57. isLastChunk := false
  58. // need to send body by chunks
  59. for i := 0; i < len(needleBody); i += blockSizeLimit {
  60. stopOffset := i + blockSizeLimit
  61. if stopOffset >= len(needleBody) {
  62. isLastChunk = true
  63. stopOffset = len(needleBody)
  64. }
  65. sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
  66. NeedleHeader: needleHeader,
  67. NeedleBody: needleBody[i:stopOffset],
  68. IsLastChunk: isLastChunk,
  69. })
  70. if sendErr != nil {
  71. return sendErr
  72. }
  73. }
  74. lastProcessedTimestampNs = needleAppendAtNs
  75. return nil
  76. })
  77. return
  78. }
  79. func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
  80. resp := &volume_server_pb.VolumeTailReceiverResponse{}
  81. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  82. if v == nil {
  83. return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
  84. }
  85. defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
  86. return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
  87. _, _, err := vs.store.Write(v.Id, n)
  88. return err
  89. })
  90. }