You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

67 lines
1.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/types"
  6. "io"
  7. "os"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage"
  10. )
  11. func (vs *VolumeServer) VolumeFollow(req *volume_server_pb.VolumeFollowRequest, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {
  12. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  13. if v == nil {
  14. return fmt.Errorf("not found volume id %d", req.VolumeId)
  15. }
  16. stopOffset := v.Size()
  17. foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since)
  18. if err != nil {
  19. return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.Since, err)
  20. }
  21. if isLastOne {
  22. return nil
  23. }
  24. startOffset := int64(foundOffset) * int64(types.NeedlePaddingSize)
  25. buf := make([]byte, 1024*1024*2)
  26. return sendFileContent(v.DataFile(), buf, startOffset, stopOffset, stream)
  27. }
  28. func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
  29. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  30. if v == nil {
  31. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  32. }
  33. resp := v.GetVolumeSyncStatus()
  34. return resp, nil
  35. }
  36. func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {
  37. var blockSizeLimit = int64(len(buf))
  38. for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
  39. n, readErr := datFile.ReadAt(buf, startOffset+i)
  40. if readErr == nil || readErr == io.EOF {
  41. resp := &volume_server_pb.VolumeFollowResponse{}
  42. resp.FileContent = buf[:int64(n)]
  43. sendErr := stream.Send(resp)
  44. if sendErr != nil {
  45. return sendErr
  46. }
  47. } else {
  48. return readErr
  49. }
  50. }
  51. return nil
  52. }