You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

77 lines
2.0 KiB

  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. )
  9. func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error {
  10. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  11. if v == nil {
  12. return fmt.Errorf("not found volume id %d", req.VolumeId)
  13. }
  14. lastTimestampNs := req.SinceNs
  15. drainingSeconds := req.DrainingSeconds
  16. ticker := time.NewTicker(time.Second)
  17. for {
  18. select {
  19. case <-ticker.C:
  20. lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
  21. if err != nil {
  22. return fmt.Errorf("streamFollow: %v", err)
  23. }
  24. if req.DrainingSeconds == 0 {
  25. continue
  26. }
  27. if lastProcessedTimestampNs == lastTimestampNs {
  28. drainingSeconds--
  29. if drainingSeconds <= 0 {
  30. return nil
  31. }
  32. glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds)
  33. } else {
  34. drainingSeconds = req.DrainingSeconds
  35. glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
  36. }
  37. }
  38. }
  39. }
  40. func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
  41. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
  42. if err != nil {
  43. return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
  44. }
  45. if isLastOne {
  46. return lastTimestampNs, nil
  47. }
  48. err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
  49. sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{
  50. NeedleHeader: needleHeader,
  51. NeedleBody: needleBody,
  52. })
  53. if sendErr != nil {
  54. return sendErr
  55. }
  56. lastProcessedTimestampNs = needleAppendAtNs
  57. return nil
  58. })
  59. return
  60. }