You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
89 lines
2.2 KiB
89 lines
2.2 KiB
package operation
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"io"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
|
|
// find volume location, replication, ttl info
|
|
lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
|
|
if err != nil {
|
|
return fmt.Errorf("look up volume %d: %v", vid, err)
|
|
}
|
|
if len(lookup.Locations) == 0 {
|
|
return fmt.Errorf("unable to locate volume %d", vid)
|
|
}
|
|
|
|
volumeServer := lookup.Locations[0].ServerAddress()
|
|
|
|
return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
|
|
}
|
|
|
|
func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
|
|
return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
|
|
VolumeId: uint32(vid),
|
|
SinceNs: sinceNs,
|
|
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, recvErr := stream.Recv()
|
|
if recvErr != nil {
|
|
if recvErr == io.EOF {
|
|
break
|
|
} else {
|
|
return recvErr
|
|
}
|
|
}
|
|
|
|
needleHeader := resp.NeedleHeader
|
|
needleBody := resp.NeedleBody
|
|
|
|
if len(needleHeader) == 0 {
|
|
continue
|
|
}
|
|
|
|
for !resp.IsLastChunk {
|
|
resp, recvErr = stream.Recv()
|
|
if recvErr != nil {
|
|
if recvErr == io.EOF {
|
|
break
|
|
} else {
|
|
return recvErr
|
|
}
|
|
}
|
|
needleBody = append(needleBody, resp.NeedleBody...)
|
|
}
|
|
|
|
n := new(needle.Needle)
|
|
n.ParseNeedleHeader(needleHeader)
|
|
err = n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = fn(n)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
return nil
|
|
})
|
|
}
|