You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

87 lines
2.4 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. )
  10. func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
  11. v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
  12. if v == nil {
  13. return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
  14. }
  15. resp := v.GetVolumeSyncStatus()
  16. glog.V(2).Infof("volume sync status %d", req.VolumdId)
  17. return resp, nil
  18. }
  19. func (vs *VolumeServer) VolumeSyncIndex(ctx context.Context, req *volume_server_pb.VolumeSyncIndexRequest) (*volume_server_pb.VolumeSyncIndexResponse, error) {
  20. resp := &volume_server_pb.VolumeSyncIndexResponse{}
  21. v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
  22. if v == nil {
  23. return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
  24. }
  25. content, err := v.IndexFileContent()
  26. if err != nil {
  27. glog.Errorf("sync volume %d index: %v", req.VolumdId, err)
  28. } else {
  29. glog.V(2).Infof("sync volume %d index", req.VolumdId)
  30. }
  31. resp.IndexFileContent = content
  32. return resp, nil
  33. }
  34. func (vs *VolumeServer) VolumeSyncData(ctx context.Context, req *volume_server_pb.VolumeSyncDataRequest) (*volume_server_pb.VolumeSyncDataResponse, error) {
  35. resp := &volume_server_pb.VolumeSyncDataResponse{}
  36. v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
  37. if v == nil {
  38. return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
  39. }
  40. if uint32(v.SuperBlock.CompactRevision) != req.Revision {
  41. return nil, fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
  42. }
  43. content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
  44. if err != nil {
  45. return nil, fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
  46. }
  47. id, err := types.ParseNeedleId(req.NeedleId)
  48. if err != nil {
  49. return nil, fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
  50. }
  51. n := new(storage.Needle)
  52. n.ParseNeedleHeader(content)
  53. if id != n.Id {
  54. return nil, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
  55. }
  56. if err != nil {
  57. glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
  58. }
  59. resp.FileContent = content
  60. return resp, nil
  61. }