Browse Source

refactoring

pull/934/head
Chris Lu 6 years ago
parent
commit
33c92b819a
  1. 88
      unmaintained/volume_tailer/volume_tailer.go
  2. 78
      weed/storage/tail_volume.go.go

88
unmaintained/volume_tailer/volume_tailer.go

@ -1,26 +1,22 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt"
"log"
"time"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server" weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/spf13/viper" "github.com/spf13/viper"
"golang.org/x/tools/godoc/util" "golang.org/x/tools/godoc/util"
"google.golang.org/grpc"
"io"
"log"
) )
var ( var (
master = flag.String("master", "localhost:9333", "master server host and port") master = flag.String("master", "localhost:9333", "master server host and port")
volumeId = flag.Int("volumeId", -1, "a volume id") volumeId = flag.Int("volumeId", -1, "a volume id")
rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.")
timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds") timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
showTextFile = flag.Bool("showTextFile", false, "display textual file content") showTextFile = flag.Bool("showTextFile", false, "display textual file content")
) )
@ -33,7 +29,16 @@ func main() {
vid := storage.VolumeId(*volumeId) vid := storage.VolumeId(*volumeId)
err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) {
var sinceTimeNs int64
if *rewindDuration == 0 {
sinceTimeNs = time.Now().UnixNano()
} else if *rewindDuration == -1 {
sinceTimeNs = 0
} else if *rewindDuration > 0 {
sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
}
err := storage.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *storage.Needle) (err error) {
if n.Size == 0 { if n.Size == 0 {
println("-", n.String()) println("-", n.String())
return nil return nil
@ -63,70 +68,3 @@ func main() {
} }
} }
func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error {
// find volume location, replication, ttl info
lookup, err := operation.Lookup(master, vid.String())
if err != nil {
return fmt.Errorf("Error looking up volume %d: %v", vid, err)
}
if len(lookup.Locations) == 0 {
return fmt.Errorf("unable to locate volume %d", vid)
}
volumeServer := lookup.Locations[0].Url
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
VolumeId: uint32(vid),
SinceNs: 0,
DrainingSeconds: uint32(*timeoutSeconds),
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleHeader := resp.NeedleHeader
needleBody := resp.NeedleBody
if len(needleHeader) == 0 {
continue
}
for !resp.IsLastChunk {
resp, recvErr = stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleBody = append(needleBody, resp.NeedleBody...)
}
n := new(storage.Needle)
n.ParseNeedleHeader(needleHeader)
n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion)
err = fn(n)
if err != nil {
return err
}
}
return nil
})
}

78
weed/storage/tail_volume.go.go

@ -0,0 +1,78 @@
package storage
import (
"context"
"fmt"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
)
func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *Needle) error) error {
// find volume location, replication, ttl info
lookup, err := operation.Lookup(master, vid.String())
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
if len(lookup.Locations) == 0 {
return fmt.Errorf("unable to locate volume %d", vid)
}
volumeServer := lookup.Locations[0].Url
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
DrainingSeconds: uint32(timeoutSeconds),
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleHeader := resp.NeedleHeader
needleBody := resp.NeedleBody
if len(needleHeader) == 0 {
continue
}
for !resp.IsLastChunk {
resp, recvErr = stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
needleBody = append(needleBody, resp.NeedleBody...)
}
n := new(Needle)
n.ParseNeedleHeader(needleHeader)
n.ReadNeedleBodyBytes(needleBody, CurrentVersion)
err = fn(n)
if err != nil {
return err
}
}
return nil
})
}
Loading…
Cancel
Save