Browse Source
Merge branch 'master' into gentle_vacuum
Merge branch 'master' into gentle_vacuum
# Conflicts: # weed/pb/messaging_pb/messaging.pb.go # weed/pb/messaging_pb/messaging_grpc.pb.go # weed/pb/s3_pb/s3.pb.go # weed/pb/volume_server_pb/volume_server.pb.go # weed/server/volume_grpc_vacuum.gopull/3390/head
633 changed files with 6719 additions and 7637 deletions
-
6.github/ISSUE_TEMPLATE.md
-
8.github/workflows/binaries_dev.yml
-
4.github/workflows/binaries_release0.yml
-
4.github/workflows/binaries_release1.yml
-
4.github/workflows/binaries_release2.yml
-
4.github/workflows/binaries_release3.yml
-
4.github/workflows/binaries_release4.yml
-
69README.md
-
12docker/Dockerfile.gccgo_build
-
12docker/Dockerfile.go_build
-
14docker/Dockerfile.rocksdb_large
-
2docker/Makefile
-
6docker/README.md
-
19go.mod
-
33go.sum
-
4k8s/helm_charts2/Chart.yaml
-
30k8s/helm_charts2/README.md
-
BINnote/SeaweedMQ_Architecture.png
-
8other/java/client/src/main/proto/filer.proto
-
2snap/snapcraft.yaml
-
8unmaintained/change_superblock/change_superblock.go
-
18unmaintained/diff_volume_servers/diff_volume_servers.go
-
12unmaintained/fix_dat/fix_dat.go
-
10unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
-
10unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
-
8unmaintained/repeated_vacuum/repeated_vacuum.go
-
10unmaintained/see_dat/see_dat.go
-
8unmaintained/see_idx/see_idx.go
-
8unmaintained/see_log_entry/see_log_entry.go
-
4unmaintained/see_meta/see_meta.go
-
10unmaintained/stream_read_volume/stream_read_volume.go
-
12unmaintained/volume_tailer/volume_tailer.go
-
290weed/cluster/cluster.go
-
24weed/cluster/cluster_test.go
-
34weed/cluster/master_client.go
-
2weed/command/autocomplete.go
-
14weed/command/backup.go
-
14weed/command/benchmark.go
-
6weed/command/command.go
-
8weed/command/compact.go
-
8weed/command/download.go
-
14weed/command/export.go
-
16weed/command/filer.go
-
10weed/command/filer_backup.go
-
12weed/command/filer_cat.go
-
18weed/command/filer_copy.go
-
12weed/command/filer_meta_backup.go
-
8weed/command/filer_meta_tail.go
-
4weed/command/filer_meta_tail_elastic.go
-
2weed/command/filer_meta_tail_non_elastic.go
-
14weed/command/filer_remote_gateway.go
-
18weed/command/filer_remote_gateway_buckets.go
-
12weed/command/filer_remote_sync.go
-
16weed/command/filer_remote_sync_dir.go
-
10weed/command/filer_replication.go
-
22weed/command/filer_sync.go
-
14weed/command/fix.go
-
12weed/command/iam.go
-
56weed/command/imports.go
-
25weed/command/master.go
-
12weed/command/master_follower.go
-
22weed/command/mount_std.go
-
94weed/command/mq_broker.go
-
109weed/command/msg_broker.go
-
18weed/command/s3.go
-
4weed/command/scaffold.go
-
42weed/command/server.go
-
8weed/command/shell.go
-
10weed/command/update.go
-
10weed/command/upload.go
-
2weed/command/version.go
-
22weed/command/volume.go
-
2weed/command/volume_test.go
-
12weed/command/webdav.go
-
8weed/filer/abstract_sql/abstract_sql_store.go
-
6weed/filer/abstract_sql/abstract_sql_store_kv.go
-
8weed/filer/arangodb/arangodb_store.go
-
4weed/filer/arangodb/arangodb_store_bucket.go
-
4weed/filer/arangodb/arangodb_store_kv.go
-
2weed/filer/arangodb/helpers.go
-
8weed/filer/cassandra/cassandra_store.go
-
2weed/filer/cassandra/cassandra_store_kv.go
-
4weed/filer/configuration.go
-
8weed/filer/elastic/v7/elastic_store.go
-
4weed/filer/elastic/v7/elastic_store_kv.go
-
4weed/filer/entry.go
-
2weed/filer/entry_codec.go
-
8weed/filer/etcd/etcd_store.go
-
2weed/filer/etcd/etcd_store_kv.go
-
2weed/filer/etcd/etcd_store_test.go
-
8weed/filer/filechunk_manifest.go
-
2weed/filer/filechunk_manifest_test.go
-
11weed/filer/filechunks.go
-
43weed/filer/filechunks2_test.go
-
2weed/filer/filechunks_read.go
-
2weed/filer/filechunks_read_test.go
-
2weed/filer/filechunks_test.go
-
41weed/filer/filer.go
-
10weed/filer/filer_conf.go
-
2weed/filer/filer_conf_test.go
@ -1,5 +1,5 @@ |
|||||
apiVersion: v1 |
apiVersion: v1 |
||||
description: SeaweedFS |
description: SeaweedFS |
||||
name: seaweedfs |
name: seaweedfs |
||||
appVersion: "3.18" |
|
||||
version: "3.18" |
|
||||
|
appVersion: "3.19" |
||||
|
version: "3.19" |
After Width: 1681 | Height: 961 | Size: 136 KiB |
@ -0,0 +1,34 @@ |
|||||
|
package cluster |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" |
||||
|
"google.golang.org/grpc" |
||||
|
) |
||||
|
|
||||
|
func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) { |
||||
|
|
||||
|
if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error { |
||||
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ |
||||
|
ClientType: clientType, |
||||
|
FilerGroup: filerGroup, |
||||
|
}) |
||||
|
|
||||
|
glog.V(0).Infof("the cluster has %d %s\n", len(resp.ClusterNodes), clientType) |
||||
|
for _, node := range resp.ClusterNodes { |
||||
|
existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ |
||||
|
NodeType: FilerType, |
||||
|
Address: node.Address, |
||||
|
IsLeader: node.IsLeader, |
||||
|
IsAdd: true, |
||||
|
CreatedAtNs: node.CreatedAtNs, |
||||
|
}) |
||||
|
} |
||||
|
return err |
||||
|
}); grpcErr != nil { |
||||
|
glog.V(0).Infof("connect to %s: %v", master, grpcErr) |
||||
|
} |
||||
|
return |
||||
|
} |
@ -0,0 +1,94 @@ |
|||||
|
package command |
||||
|
|
||||
|
import ( |
||||
|
"google.golang.org/grpc/reflection" |
||||
|
|
||||
|
"github.com/seaweedfs/seaweedfs/weed/util/grace" |
||||
|
|
||||
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/mq/broker" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/security" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/util" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
mqBrokerStandaloneOptions MessageQueueBrokerOptions |
||||
|
) |
||||
|
|
||||
|
type MessageQueueBrokerOptions struct { |
||||
|
masters map[string]pb.ServerAddress |
||||
|
mastersString *string |
||||
|
filerGroup *string |
||||
|
ip *string |
||||
|
port *int |
||||
|
dataCenter *string |
||||
|
rack *string |
||||
|
cpuprofile *string |
||||
|
memprofile *string |
||||
|
} |
||||
|
|
||||
|
func init() { |
||||
|
cmdMqBroker.Run = runMqBroker // break init cycle
|
||||
|
mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") |
||||
|
mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") |
||||
|
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") |
||||
|
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") |
||||
|
mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") |
||||
|
mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack") |
||||
|
mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") |
||||
|
mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") |
||||
|
} |
||||
|
|
||||
|
var cmdMqBroker = &Command{ |
||||
|
UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]", |
||||
|
Short: "<WIP> start a message queue broker", |
||||
|
Long: `start a message queue broker |
||||
|
|
||||
|
The broker can accept gRPC calls to write or read messages. The messages are stored via filer. |
||||
|
The brokers are stateless. To scale up, just add more brokers. |
||||
|
|
||||
|
`, |
||||
|
} |
||||
|
|
||||
|
func runMqBroker(cmd *Command, args []string) bool { |
||||
|
|
||||
|
util.LoadConfiguration("security", false) |
||||
|
|
||||
|
mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap() |
||||
|
|
||||
|
return mqBrokerStandaloneOptions.startQueueServer() |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { |
||||
|
|
||||
|
grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) |
||||
|
|
||||
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") |
||||
|
|
||||
|
qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ |
||||
|
Masters: mqBrokerOpt.masters, |
||||
|
FilerGroup: *mqBrokerOpt.filerGroup, |
||||
|
DataCenter: *mqBrokerOpt.dataCenter, |
||||
|
Rack: *mqBrokerOpt.rack, |
||||
|
DefaultReplication: "", |
||||
|
MaxMB: 0, |
||||
|
Ip: *mqBrokerOpt.ip, |
||||
|
Port: *mqBrokerOpt.port, |
||||
|
}, grpcDialOption) |
||||
|
|
||||
|
// start grpc listener
|
||||
|
grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) |
||||
|
if err != nil { |
||||
|
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) |
||||
|
} |
||||
|
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) |
||||
|
mq_pb.RegisterSeaweedMessagingServer(grpcS, qs) |
||||
|
reflection.Register(grpcS) |
||||
|
grpcS.Serve(grpcL) |
||||
|
|
||||
|
return true |
||||
|
|
||||
|
} |
@ -1,109 +0,0 @@ |
|||||
package command |
|
||||
|
|
||||
import ( |
|
||||
"context" |
|
||||
"fmt" |
|
||||
"time" |
|
||||
|
|
||||
"google.golang.org/grpc/reflection" |
|
||||
|
|
||||
"github.com/chrislusf/seaweedfs/weed/util/grace" |
|
||||
|
|
||||
"github.com/chrislusf/seaweedfs/weed/glog" |
|
||||
"github.com/chrislusf/seaweedfs/weed/messaging/broker" |
|
||||
"github.com/chrislusf/seaweedfs/weed/pb" |
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
||||
"github.com/chrislusf/seaweedfs/weed/security" |
|
||||
"github.com/chrislusf/seaweedfs/weed/util" |
|
||||
) |
|
||||
|
|
||||
var ( |
|
||||
messageBrokerStandaloneOptions MessageBrokerOptions |
|
||||
) |
|
||||
|
|
||||
type MessageBrokerOptions struct { |
|
||||
filer *string |
|
||||
ip *string |
|
||||
port *int |
|
||||
cpuprofile *string |
|
||||
memprofile *string |
|
||||
} |
|
||||
|
|
||||
func init() { |
|
||||
cmdMsgBroker.Run = runMsgBroker // break init cycle
|
|
||||
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") |
|
||||
messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") |
|
||||
messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") |
|
||||
messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") |
|
||||
messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") |
|
||||
} |
|
||||
|
|
||||
var cmdMsgBroker = &Command{ |
|
||||
UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]", |
|
||||
Short: "start a message queue broker", |
|
||||
Long: `start a message queue broker |
|
||||
|
|
||||
The broker can accept gRPC calls to write or read messages. The messages are stored via filer. |
|
||||
The brokers are stateless. To scale up, just add more brokers. |
|
||||
|
|
||||
`, |
|
||||
} |
|
||||
|
|
||||
func runMsgBroker(cmd *Command, args []string) bool { |
|
||||
|
|
||||
util.LoadConfiguration("security", false) |
|
||||
|
|
||||
return messageBrokerStandaloneOptions.startQueueServer() |
|
||||
|
|
||||
} |
|
||||
|
|
||||
func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { |
|
||||
|
|
||||
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) |
|
||||
|
|
||||
filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) |
|
||||
|
|
||||
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") |
|
||||
cipher := false |
|
||||
|
|
||||
for { |
|
||||
err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
||||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|
||||
if err != nil { |
|
||||
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) |
|
||||
} |
|
||||
cipher = resp.Cipher |
|
||||
return nil |
|
||||
}) |
|
||||
if err != nil { |
|
||||
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) |
|
||||
time.Sleep(time.Second) |
|
||||
} else { |
|
||||
glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) |
|
||||
break |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ |
|
||||
Filers: []pb.ServerAddress{filerAddress}, |
|
||||
DefaultReplication: "", |
|
||||
MaxMB: 0, |
|
||||
Ip: *msgBrokerOpt.ip, |
|
||||
Port: *msgBrokerOpt.port, |
|
||||
Cipher: cipher, |
|
||||
}, grpcDialOption) |
|
||||
|
|
||||
// start grpc listener
|
|
||||
grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) |
|
||||
if err != nil { |
|
||||
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) |
|
||||
} |
|
||||
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) |
|
||||
messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) |
|
||||
reflection.Register(grpcS) |
|
||||
grpcS.Serve(grpcL) |
|
||||
|
|
||||
return true |
|
||||
|
|
||||
} |
|
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue