Chris Lu
5 years ago
26 changed files with 361 additions and 246 deletions
-
2weed/command/command.go
-
3weed/command/filer.go
-
20weed/command/filer_copy.go
-
3weed/command/master.go
-
21weed/command/mount.go
-
7weed/command/mount_std.go
-
111weed/command/msg_broker.go
-
107weed/command/queue.go
-
5weed/command/s3.go
-
2weed/command/scaffold.go
-
3weed/command/volume.go
-
3weed/command/webdav.go
-
4weed/filesys/wfs.go
-
8weed/operation/grpc_client.go
-
55weed/pb/grpc_client_server.go
-
4weed/replication/sink/filersink/fetch_write.go
-
3weed/replication/source/filer_source.go
-
4weed/s3api/s3api_handlers.go
-
23weed/server/msg_broker_grpc_server.go
-
121weed/server/msg_broker_server.go
-
49weed/server/queue_server.go
-
11weed/server/raft_server.go
-
5weed/server/volume_grpc_client_to_master.go
-
3weed/server/webdav_server.go
-
9weed/shell/command_fs_du.go
-
21weed/wdclient/masterclient.go
@ -0,0 +1,111 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"strconv" |
|||
"time" |
|||
|
|||
"google.golang.org/grpc/reflection" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
weed_server "github.com/chrislusf/seaweedfs/weed/server" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
var ( |
|||
messageBrokerStandaloneOptions QueueOptions |
|||
) |
|||
|
|||
type QueueOptions struct { |
|||
filer *string |
|||
port *int |
|||
tlsPrivateKey *string |
|||
tlsCertificate *string |
|||
defaultTtl *string |
|||
} |
|||
|
|||
func init() { |
|||
cmdMsgBroker.Run = runMsgBroker // break init cycle
|
|||
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") |
|||
messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") |
|||
messageBrokerStandaloneOptions.tlsPrivateKey = cmdMsgBroker.Flag.String("key.file", "", "path to the TLS private key file") |
|||
messageBrokerStandaloneOptions.tlsCertificate = cmdMsgBroker.Flag.String("cert.file", "", "path to the TLS certificate file") |
|||
messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") |
|||
} |
|||
|
|||
var cmdMsgBroker = &Command{ |
|||
UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]", |
|||
Short: "<WIP> start a message queue broker", |
|||
Long: `start a message queue broker |
|||
|
|||
The broker can accept gRPC calls to write or read messages. The messages are stored via filer. |
|||
The brokers are stateless. To scale up, just add more brokers. |
|||
|
|||
`, |
|||
} |
|||
|
|||
func runMsgBroker(cmd *Command, args []string) bool { |
|||
|
|||
util.LoadConfiguration("security", false) |
|||
|
|||
return messageBrokerStandaloneOptions.startQueueServer() |
|||
|
|||
} |
|||
|
|||
func (msgBrokerOpt *QueueOptions) startQueueServer() bool { |
|||
|
|||
filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) |
|||
if err != nil { |
|||
glog.Fatal(err) |
|||
return false |
|||
} |
|||
|
|||
filerQueuesPath := "/queues" |
|||
|
|||
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") |
|||
|
|||
for { |
|||
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|||
if err != nil { |
|||
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) |
|||
} |
|||
filerQueuesPath = resp.DirQueues |
|||
glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) |
|||
time.Sleep(time.Second) |
|||
} else { |
|||
glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) |
|||
break |
|||
} |
|||
} |
|||
|
|||
qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ |
|||
Filers: []string{*msgBrokerOpt.filer}, |
|||
DefaultReplication: "", |
|||
MaxMB: 0, |
|||
Port: *msgBrokerOpt.port, |
|||
}) |
|||
|
|||
// start grpc listener
|
|||
grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) |
|||
if err != nil { |
|||
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) |
|||
} |
|||
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) |
|||
queue_pb.RegisterSeaweedQueueServer(grpcS, qs) |
|||
reflection.Register(grpcS) |
|||
grpcS.Serve(grpcL) |
|||
|
|||
return true |
|||
|
|||
} |
@ -1,107 +0,0 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"strconv" |
|||
"time" |
|||
|
|||
"google.golang.org/grpc/reflection" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
weed_server "github.com/chrislusf/seaweedfs/weed/server" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
var ( |
|||
queueStandaloneOptions QueueOptions |
|||
) |
|||
|
|||
type QueueOptions struct { |
|||
filer *string |
|||
port *int |
|||
tlsPrivateKey *string |
|||
tlsCertificate *string |
|||
defaultTtl *string |
|||
} |
|||
|
|||
func init() { |
|||
cmdQueue.Run = runQueue // break init cycle
|
|||
queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address") |
|||
queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port") |
|||
queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file") |
|||
queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file") |
|||
queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") |
|||
} |
|||
|
|||
var cmdQueue = &Command{ |
|||
UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]", |
|||
Short: "start a queue gRPC server that is backed by a filer", |
|||
Long: `start a queue gRPC server that is backed by a filer. |
|||
|
|||
`, |
|||
} |
|||
|
|||
func runQueue(cmd *Command, args []string) bool { |
|||
|
|||
util.LoadConfiguration("security", false) |
|||
|
|||
return queueStandaloneOptions.startQueueServer() |
|||
|
|||
} |
|||
|
|||
func (queueopt *QueueOptions) startQueueServer() bool { |
|||
|
|||
filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer) |
|||
if err != nil { |
|||
glog.Fatal(err) |
|||
return false |
|||
} |
|||
|
|||
filerQueuesPath := "/queues" |
|||
|
|||
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") |
|||
|
|||
for { |
|||
err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|||
if err != nil { |
|||
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) |
|||
} |
|||
filerQueuesPath = resp.DirQueues |
|||
glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) |
|||
time.Sleep(time.Second) |
|||
} else { |
|||
glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) |
|||
break |
|||
} |
|||
} |
|||
|
|||
qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{ |
|||
Filers: []string{*queueopt.filer}, |
|||
DefaultReplication: "", |
|||
MaxMB: 0, |
|||
Port: *queueopt.port, |
|||
}) |
|||
|
|||
// start grpc listener
|
|||
grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0) |
|||
if err != nil { |
|||
glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err) |
|||
} |
|||
grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue")) |
|||
queue_pb.RegisterSeaweedQueueServer(grpcS, qs) |
|||
reflection.Register(grpcS) |
|||
go grpcS.Serve(grpcL) |
|||
|
|||
return true |
|||
|
|||
} |
@ -0,0 +1,23 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb" |
|||
) |
|||
|
|||
func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { |
|||
panic("implement me") |
|||
} |
@ -0,0 +1,121 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"time" |
|||
|
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type MessageBrokerOption struct { |
|||
Filers []string |
|||
DefaultReplication string |
|||
MaxMB int |
|||
Port int |
|||
} |
|||
|
|||
type MessageBroker struct { |
|||
option *MessageBrokerOption |
|||
grpcDialOption grpc.DialOption |
|||
} |
|||
|
|||
func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) { |
|||
|
|||
messageBroker = &MessageBroker{ |
|||
option: option, |
|||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"), |
|||
} |
|||
|
|||
go messageBroker.loopForEver() |
|||
|
|||
return messageBroker, nil |
|||
} |
|||
|
|||
func (broker *MessageBroker) loopForEver() { |
|||
|
|||
for { |
|||
broker.checkPeers() |
|||
time.Sleep(3 * time.Second) |
|||
} |
|||
|
|||
} |
|||
|
|||
func (broker *MessageBroker) checkPeers() { |
|||
|
|||
// contact a filer about masters
|
|||
var masters []string |
|||
for _, filer := range broker.option.Filers { |
|||
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
masters = append(masters, resp.Masters...) |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) |
|||
return |
|||
} |
|||
} |
|||
|
|||
// contact each masters for filers
|
|||
var filers []string |
|||
for _, master := range masters { |
|||
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { |
|||
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ |
|||
ClientType: "filer", |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
fmt.Printf("filers: %+v\n", resp.GrpcAddresses) |
|||
filers = append(filers, resp.GrpcAddresses...) |
|||
|
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
fmt.Printf("failed to list filers: %v\n", err) |
|||
return |
|||
} |
|||
} |
|||
|
|||
// contact each filer about brokers
|
|||
for _, filer := range filers { |
|||
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
masters = append(masters, resp.Masters...) |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) |
|||
return |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
|
|||
return pb.WithFilerClient(filer, broker.grpcDialOption, fn) |
|||
|
|||
} |
|||
|
|||
func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { |
|||
|
|||
return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { |
|||
return fn(client) |
|||
}) |
|||
|
|||
} |
@ -1,49 +0,0 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
|
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/queue_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type QueueServerOption struct { |
|||
Filers []string |
|||
DefaultReplication string |
|||
MaxMB int |
|||
Port int |
|||
} |
|||
|
|||
type QueueServer struct { |
|||
option *QueueServerOption |
|||
grpcDialOption grpc.DialOption |
|||
} |
|||
|
|||
func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) { |
|||
|
|||
qs = &QueueServer{ |
|||
option: option, |
|||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"), |
|||
} |
|||
|
|||
return qs, nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue