Browse Source

cluster.ps add filer meta sync progress

pull/3942/merge
chrislu 2 years ago
parent
commit
7f49c59c14
  1. 12
      weed/filer/meta_aggregator.go
  2. 21
      weed/shell/command_cluster_ps.go

12
weed/filer/meta_aggregator.go

@ -243,10 +243,15 @@ const (
MetaOffsetPrefix = "Meta" MetaOffsetPrefix = "Meta"
) )
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
func GetPeerMetaOffsetKey(peerSignature int32) []byte {
key := []byte(MetaOffsetPrefix + "xxxx") key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
return key
}
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
key := GetPeerMetaOffsetKey(peerSignature)
value, err := f.Store.KvGet(context.Background(), key) value, err := f.Store.KvGet(context.Background(), key)
@ -263,8 +268,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) { func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
key := GetPeerMetaOffsetKey(peerSignature)
value := make([]byte, 8) value := make([]byte, 8)
util.Uint64toBytes(value, uint64(lastTsNs)) util.Uint64toBytes(value, uint64(lastTsNs))

21
weed/shell/command_cluster_ps.go

@ -5,10 +5,13 @@ import (
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io" "io"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
) )
@ -92,6 +95,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
} }
} }
filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32)
fmt.Fprintf(writer, "* filers %d\n", len(filerNodes)) fmt.Fprintf(writer, "* filers %d\n", len(filerNodes))
for _, node := range filerNodes { for _, node := range filerNodes {
fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version) fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
@ -108,12 +112,29 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup) fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup)
} }
fmt.Fprintf(writer, " signature: %d\n", resp.Signature) fmt.Fprintf(writer, " signature: %d\n", resp.Signature)
filerSignatures[node] = resp.Signature
} else { } else {
fmt.Fprintf(writer, " failed to connect: %v\n", err) fmt.Fprintf(writer, " failed to connect: %v\n", err)
} }
return err return err
}) })
} }
for _, node := range filerNodes {
pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address)
selfSignature := filerSignatures[node]
for peer, peerSignature := range filerSignatures {
if selfSignature == peerSignature {
continue
}
if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 {
lastTsNs := int64(util.BytesToUint64(resp.Value))
fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC())
}
}
return nil
})
}
// collect volume servers // collect volume servers
var volumeServers []pb.ServerAddress var volumeServers []pb.ServerAddress

Loading…
Cancel
Save