Browse Source

volume stream read skips deleted content

pull/2348/head
Chris Lu 3 years ago
parent
commit
2e9372dcf7
  1. 34
      weed/server/volume_grpc_read_all.go
  2. 42
      weed/storage/volume_read_all.go

34
weed/server/volume_grpc_read_all.go

@ -5,7 +5,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
) )
func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) { func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
@ -24,9 +23,9 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se
return fmt.Errorf("not found volume id %d", vid) return fmt.Errorf("not found volume id %d", vid)
} }
scanner := &VolumeFileScanner4ReadAll{
stream: stream,
v: v,
scanner := &storage.VolumeFileScanner4ReadAll{
Stream: stream,
V: v,
} }
offset := int64(v.SuperBlock.BlockSize()) offset := int64(v.SuperBlock.BlockSize())
@ -35,30 +34,3 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se
return err return err
} }
type VolumeFileScanner4ReadAll struct {
stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
v *storage.Volume
}
func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
sendErr := scanner.stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
VolumeId: uint32(scanner.v.Id),
NeedleId: uint64(n.Id),
Cookie: uint32(n.Cookie),
NeedleBlob: n.Data,
})
if sendErr != nil {
return sendErr
}
return nil
}

42
weed/storage/volume_read_all.go

@ -0,0 +1,42 @@
package storage
import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
type VolumeFileScanner4ReadAll struct {
Stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
V *Volume
}
func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
nv, ok := scanner.V.nm.Get(n.Id)
if !ok {
return nil
}
if nv.Offset.ToActualOffset() != offset {
return nil
}
sendErr := scanner.Stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
VolumeId: uint32(scanner.V.Id),
NeedleId: uint64(n.Id),
Cookie: uint32(n.Cookie),
NeedleBlob: n.Data,
})
if sendErr != nil {
return sendErr
}
return nil
}
Loading…
Cancel
Save