89 lines
2.2 KiB

package operation
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"io"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// TailVolume resolves the locations of volume vid with LookupVolumeId and
// tails the volume from the first location in the result.
//
// sinceNs, timeoutSeconds and fn are handed unchanged to
// TailVolumeFromSource, which sends timeoutSeconds as the idle timeout of the
// request and invokes fn for each needle it receives.
//
// An error is returned when the lookup fails (the cause is wrapped, so it can
// be inspected with errors.Is / errors.As), when the lookup reports no
// locations, or when TailVolumeFromSource fails.
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
	// Find where the volume lives; only the first reported location is used.
	lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
	if err != nil {
		return fmt.Errorf("look up volume %d: %w", vid, err)
	}
	if len(lookup.Locations) == 0 {
		return fmt.Errorf("unable to locate volume %d", vid)
	}

	volumeServer := lookup.Locations[0].ServerAddress()
	return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
}
// TailVolumeFromSource opens a VolumeTailSender stream on volumeServer for
// volume vid and calls fn once for every needle received on it.
//
// sinceNs and idleTimeoutSeconds are sent in the request as SinceNs and
// IdleTimeoutSeconds. A needle may be split over several messages: the body
// chunks are concatenated until a message with IsLastChunk set arrives.
// Messages that carry an empty needle header are skipped.
//
// It returns nil when the stream ends (io.EOF) between needles. It returns the
// underlying error when opening the stream, receiving a message, parsing the
// needle body, or fn fails. If the stream ends while a needle is still
// incomplete, it returns an error wrapping io.ErrUnexpectedEOF rather than
// passing a truncated body to the parser and to fn.
func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
	return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		// Cancel the stream's context whenever this callback returns,
		// including the early error returns below.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
			VolumeId:           uint32(vid),
			SinceNs:            sinceNs,
			IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
		})
		if err != nil {
			return err
		}

		for {
			resp, recvErr := stream.Recv()
			if recvErr == io.EOF {
				// Clean end of stream on a needle boundary.
				return nil
			}
			if recvErr != nil {
				return recvErr
			}

			needleHeader := resp.NeedleHeader
			needleBody := resp.NeedleBody
			if len(needleHeader) == 0 {
				// Nothing to parse in this message; wait for the next one.
				continue
			}

			// Collect the remaining body chunks of this needle.
			for !resp.IsLastChunk {
				resp, recvErr = stream.Recv()
				if recvErr == io.EOF {
					// The last chunk never arrived: the body is truncated and
					// must not be parsed or delivered.
					return fmt.Errorf("tail volume %d: stream ended in the middle of a needle: %w", vid, io.ErrUnexpectedEOF)
				}
				if recvErr != nil {
					return recvErr
				}
				needleBody = append(needleBody, resp.NeedleBody...)
			}

			n := new(needle.Needle)
			n.ParseNeedleHeader(needleHeader)
			if err := n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion); err != nil {
				return err
			}
			if err := fn(n); err != nil {
				return err
			}
		}
	})
}