hilimd committed 3 years ago (committed by GitHub)
81 changed files with 2127 additions and 1305 deletions
- 13 .github/workflows/binaries_dev.yml
- 8 backers.md
- 2 docker/Dockerfile.go_build
- 2 docker/Dockerfile.go_build_large
- 4 k8s/helm_charts2/Chart.yaml
- 5 k8s/helm_charts2/templates/filer-statefulset.yaml
- 2 k8s/helm_charts2/values.yaml
- 4 other/java/client/pom.xml
- 4 other/java/client/pom.xml.deploy
- 4 other/java/client/pom_debug.xml
- 6 other/java/client/src/main/java/seaweedfs/client/RemoteUtil.java
- 6 other/java/client/src/main/proto/filer.proto
- 4 other/java/examples/pom.xml
- 2 other/java/hdfs2/dependency-reduced-pom.xml
- 2 other/java/hdfs2/pom.xml
- 2 other/java/hdfs3/dependency-reduced-pom.xml
- 2 other/java/hdfs3/pom.xml
- 5 weed/Makefile
- 264 weed/cluster/cluster.go
- 47 weed/cluster/cluster_test.go
- 9 weed/command/filer.go
- 2 weed/command/filer_meta_tail.go
- 1 weed/command/server.go
- 7 weed/command/shell.go
- 4 weed/filer/filechunk_manifest.go
- 2 weed/filer/filechunks_read.go
- 8 weed/filer/filechunks_read_test.go
- 46 weed/filer/filer.go
- 63 weed/filer/filer_delete_entry.go
- 49 weed/filer/meta_aggregator.go
- 4 weed/filer/read_remote.go
- 3 weed/filesys/dir_rename.go
- 6 weed/filesys/file.go
- 8 weed/iamapi/iamapi_handlers.go
- 22 weed/iamapi/iamapi_management_handlers.go
- 2 weed/iamapi/iamapi_server.go
- 9 weed/messaging/broker/broker_grpc_server_discovery.go
- 6 weed/pb/filer.proto
- 119 weed/pb/filer_pb/filer.pb.go
- 30 weed/pb/master.proto
- 1533 weed/pb/master_pb/master.pb.go
- 5 weed/remote_storage/s3/s3_storage_client.go
- 20 weed/s3api/auth_credentials.go
- 47 weed/s3api/s3api_bucket_handlers.go
- 8 weed/s3api/s3api_handlers.go
- 30 weed/s3api/s3api_object_copy_handlers.go
- 32 weed/s3api/s3api_object_handlers.go
- 30 weed/s3api/s3api_object_handlers_postpolicy.go
- 40 weed/s3api/s3api_object_multipart_handlers.go
- 24 weed/s3api/s3api_object_tagging_handlers.go
- 24 weed/s3api/s3api_objects_list_handlers.go
- 4 weed/s3api/s3api_server.go
- 2 weed/s3api/s3api_status_handlers.go
- 24 weed/s3api/s3err/error_handler.go
- 3 weed/s3api/stats.go
- 4 weed/server/filer_grpc_server_remote.go
- 5 weed/server/filer_server.go
- 4 weed/server/filer_server_handlers_read.go
- 57 weed/server/master_grpc_server.go
- 21 weed/server/master_grpc_server_cluster.go
- 10 weed/server/master_server.go
- 2 weed/server/volume_grpc_copy.go
- 7 weed/server/volume_grpc_tier_upload.go
- 2 weed/server/volume_grpc_vacuum.go
- 55 weed/shell/command_cluster_ps.go
- 5 weed/shell/command_ec_encode.go
- 8 weed/shell/command_remote_cache.go
- 2 weed/shell/command_remote_meta_sync.go
- 2 weed/shell/command_remote_mount.go
- 2 weed/shell/command_remote_mount_buckets.go
- 2 weed/shell/command_remote_unmount.go
- 23 weed/shell/command_volume_fix_replication_test.go
- 27 weed/shell/shell_liner.go
- 2 weed/storage/backend/backend.go
- 4 weed/storage/backend/s3_backend/s3_backend.go
- 5 weed/storage/backend/s3_backend/s3_sessions.go
- 16 weed/storage/backend/s3_backend/s3_upload.go
- 2 weed/util/constants.go
- 45 weed/wdclient/masterclient.go
k8s/helm_charts2/Chart.yaml
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.75"
-version: "2.75"
+appVersion: "2.77"
+version: "2.77"
weed/cluster/cluster.go (new file)
@@ -0,0 +1,264 @@

package cluster

import (
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"math"
	"sync"
	"time"
)

const (
	MasterType = "master"
	FilerType  = "filer"
	BrokerType = "broker"
)

type ClusterNode struct {
	Address   pb.ServerAddress
	Version   string
	counter   int
	createdTs time.Time
}

type Leaders struct {
	leaders [3]pb.ServerAddress
}

type Cluster struct {
	filers       map[pb.ServerAddress]*ClusterNode
	filersLock   sync.RWMutex
	filerLeaders *Leaders
	brokers      map[pb.ServerAddress]*ClusterNode
	brokersLock  sync.RWMutex
}

func NewCluster() *Cluster {
	return &Cluster{
		filers:       make(map[pb.ServerAddress]*ClusterNode),
		filerLeaders: &Leaders{},
		brokers:      make(map[pb.ServerAddress]*ClusterNode),
	}
}

func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
	switch nodeType {
	case FilerType:
		cluster.filersLock.Lock()
		defer cluster.filersLock.Unlock()
		if existingNode, found := cluster.filers[address]; found {
			existingNode.counter++
			return nil
		}
		cluster.filers[address] = &ClusterNode{
			Address:   address,
			Version:   version,
			counter:   1,
			createdTs: time.Now(),
		}
		return cluster.ensureFilerLeaders(true, nodeType, address)
	case BrokerType:
		cluster.brokersLock.Lock()
		defer cluster.brokersLock.Unlock()
		if existingNode, found := cluster.brokers[address]; found {
			existingNode.counter++
			return nil
		}
		cluster.brokers[address] = &ClusterNode{
			Address:   address,
			Version:   version,
			counter:   1,
			createdTs: time.Now(),
		}
		return []*master_pb.KeepConnectedResponse{
			{
				ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
					NodeType: nodeType,
					Address:  string(address),
					IsAdd:    true,
				},
			},
		}
	case MasterType:
	}
	return nil
}

func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
	switch nodeType {
	case FilerType:
		cluster.filersLock.Lock()
		defer cluster.filersLock.Unlock()
		if existingNode, found := cluster.filers[address]; !found {
			return nil
		} else {
			existingNode.counter--
			if existingNode.counter <= 0 {
				delete(cluster.filers, address)
				return cluster.ensureFilerLeaders(false, nodeType, address)
			}
		}
	case BrokerType:
		cluster.brokersLock.Lock()
		defer cluster.brokersLock.Unlock()
		if existingNode, found := cluster.brokers[address]; !found {
			return nil
		} else {
			existingNode.counter--
			if existingNode.counter <= 0 {
				delete(cluster.brokers, address)
				return []*master_pb.KeepConnectedResponse{
					{
						ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
							NodeType: nodeType,
							Address:  string(address),
							IsAdd:    false,
						},
					},
				}
			}
		}
	case MasterType:
	}
	return nil
}

func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
	switch nodeType {
	case FilerType:
		cluster.filersLock.RLock()
		defer cluster.filersLock.RUnlock()
		for _, node := range cluster.filers {
			nodes = append(nodes, node)
		}
	case BrokerType:
		cluster.brokersLock.RLock()
		defer cluster.brokersLock.RUnlock()
		for _, node := range cluster.brokers {
			nodes = append(nodes, node)
		}
	case MasterType:
	}
	return
}

func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
	return cluster.filerLeaders.isOneLeader(address)
}

func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
	if isAdd {
		if cluster.filerLeaders.addLeaderIfVacant(address) {
			// has added the address as one leader
			result = append(result, &master_pb.KeepConnectedResponse{
				ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
					NodeType: nodeType,
					Address:  string(address),
					IsLeader: true,
					IsAdd:    true,
				},
			})
		} else {
			result = append(result, &master_pb.KeepConnectedResponse{
				ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
					NodeType: nodeType,
					Address:  string(address),
					IsLeader: false,
					IsAdd:    true,
				},
			})
		}
	} else {
		if cluster.filerLeaders.removeLeaderIfExists(address) {

			result = append(result, &master_pb.KeepConnectedResponse{
				ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
					NodeType: nodeType,
					Address:  string(address),
					IsLeader: true,
					IsAdd:    false,
				},
			})

			// pick the freshest one, since it is less likely to go away
			var shortestDuration int64 = math.MaxInt64
			now := time.Now()
			var candidateAddress pb.ServerAddress
			for _, node := range cluster.filers {
				if cluster.filerLeaders.isOneLeader(node.Address) {
					continue
				}
				duration := now.Sub(node.createdTs).Nanoseconds()
				if duration < shortestDuration {
					shortestDuration = duration
					candidateAddress = node.Address
				}
			}
			if candidateAddress != "" {
				cluster.filerLeaders.addLeaderIfVacant(candidateAddress)
				// added a new leader
				result = append(result, &master_pb.KeepConnectedResponse{
					ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
						NodeType: nodeType,
						Address:  string(candidateAddress),
						IsLeader: true,
						IsAdd:    true,
					},
				})
			}
		} else {
			result = append(result, &master_pb.KeepConnectedResponse{
				ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
					NodeType: nodeType,
					Address:  string(address),
					IsLeader: false,
					IsAdd:    false,
				},
			})
		}
	}
	return
}

func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
	if leaders.isOneLeader(address) {
		return
	}
	for i := 0; i < len(leaders.leaders); i++ {
		if leaders.leaders[i] == "" {
			leaders.leaders[i] = address
			hasChanged = true
			return
		}
	}
	return
}

func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
	if !leaders.isOneLeader(address) {
		return
	}
	for i := 0; i < len(leaders.leaders); i++ {
		if leaders.leaders[i] == address {
			leaders.leaders[i] = ""
			hasChanged = true
			return
		}
	}
	return
}

func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
	for i := 0; i < len(leaders.leaders); i++ {
		if leaders.leaders[i] == address {
			return true
		}
	}
	return false
}

func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
	for i := 0; i < len(leaders.leaders); i++ {
		if leaders.leaders[i] != "" {
			addresses = append(addresses, leaders.leaders[i])
		}
	}
	return
}
weed/cluster/cluster_test.go (new file)
@@ -0,0 +1,47 @@

package cluster

import (
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestClusterAddRemoveNodes(t *testing.T) {
	c := NewCluster()

	c.AddClusterNode("filer", pb.ServerAddress("111:1"), "23.45")
	c.AddClusterNode("filer", pb.ServerAddress("111:2"), "23.45")
	assert.Equal(t, []pb.ServerAddress{
		pb.ServerAddress("111:1"),
		pb.ServerAddress("111:2"),
	}, c.filerLeaders.GetLeaders())

	c.AddClusterNode("filer", pb.ServerAddress("111:3"), "23.45")
	c.AddClusterNode("filer", pb.ServerAddress("111:4"), "23.45")
	assert.Equal(t, []pb.ServerAddress{
		pb.ServerAddress("111:1"),
		pb.ServerAddress("111:2"),
		pb.ServerAddress("111:3"),
	}, c.filerLeaders.GetLeaders())

	c.AddClusterNode("filer", pb.ServerAddress("111:5"), "23.45")
	c.AddClusterNode("filer", pb.ServerAddress("111:6"), "23.45")
	c.RemoveClusterNode("filer", pb.ServerAddress("111:4"))
	assert.Equal(t, []pb.ServerAddress{
		pb.ServerAddress("111:1"),
		pb.ServerAddress("111:2"),
		pb.ServerAddress("111:3"),
	}, c.filerLeaders.GetLeaders())

	// remove oldest
	c.RemoveClusterNode("filer", pb.ServerAddress("111:1"))
	assert.Equal(t, []pb.ServerAddress{
		pb.ServerAddress("111:6"),
		pb.ServerAddress("111:2"),
		pb.ServerAddress("111:3"),
	}, c.filerLeaders.GetLeaders())

	// remove oldest
	c.RemoveClusterNode("filer", pb.ServerAddress("111:1"))

}
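The test above reaches into the package-private filerLeaders field; from outside the package the same leader behavior is visible only through the exported API. Below is a minimal sketch (not part of this commit; the filer addresses and the version string are made-up placeholders) showing the three leader slots filling up and the freshest non-leader being promoted when a leader is removed:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/cluster"
	"github.com/chrislusf/seaweedfs/weed/pb"
)

func main() {
	c := cluster.NewCluster()

	// Hypothetical filer addresses; the first three registrations fill the
	// three leader slots, the fourth joins as a plain follower.
	filers := []pb.ServerAddress{"f1:8888", "f2:8888", "f3:8888", "f4:8888"}
	for _, addr := range filers {
		c.AddClusterNode(cluster.FilerType, addr, "2.77")
	}

	for _, node := range c.ListClusterNode(cluster.FilerType) {
		fmt.Printf("%s leader=%v\n", node.Address, c.IsOneLeader(node.Address))
	}

	// Removing a leader vacates its slot and promotes the freshest
	// (most recently registered) non-leader filer, here f4.
	c.RemoveClusterNode(cluster.FilerType, "f1:8888")
	fmt.Println("f4 promoted:", c.IsOneLeader("f4:8888"))
}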
1533 weed/pb/master_pb/master.pb.go (file diff suppressed because it is too large)
weed/server/master_grpc_server_cluster.go (new file)
@@ -0,0 +1,21 @@

package weed_server

import (
	"context"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)

func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
	resp := &master_pb.ListClusterNodesResponse{}

	clusterNodes := ms.Cluster.ListClusterNode(req.ClientType)

	for _, node := range clusterNodes {
		resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
			Address:  string(node.Address),
			Version:  node.Version,
			IsLeader: ms.Cluster.IsOneLeader(node.Address),
		})
	}
	return resp, nil
}
weed/shell/command_cluster_ps.go (new file)
@@ -0,0 +1,55 @@

package shell

import (
	"context"
	"flag"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/cluster"
	"io"

	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)

func init() {
	Commands = append(Commands, &commandClusterPs{})
}

type commandClusterPs struct {
}

func (c *commandClusterPs) Name() string {
	return "cluster.ps"
}

func (c *commandClusterPs) Help() string {
	return `check current cluster process status

	cluster.ps

`
}

func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	if err = clusterPsCommand.Parse(args); err != nil {
		return nil
	}

	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.FilerType,
		})

		fmt.Fprintf(writer, "the cluster has %d filers\n", len(resp.ClusterNodes))
		for _, node := range resp.ClusterNodes {
			fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
		}
		return err
	})
	if err != nil {
		return
	}

	return nil
}
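For reference, given two registered filers the Fprintf format strings above would render roughly as follows inside weed shell; the addresses and version shown are hypothetical placeholders, not output from a real cluster:

> cluster.ps
the cluster has 2 filers
 * 192.168.2.3:8888 (2.77)
 * 192.168.2.4:8888 (2.77)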