You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

107 lines
3.1 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "google.golang.org/grpc/reflection"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. weed_server "github.com/chrislusf/seaweedfs/weed/server"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. var (
  16. queueStandaloneOptions QueueOptions
  17. )
  // QueueOptions collects the command-line settings of the queue server. Every
  // field is a pointer because the values are bound to command flags.
  type QueueOptions struct {
  	// filer is the filer server address.
  	filer *string
  	// port is the queue server gRPC listen port.
  	port *int
  	// tlsPrivateKey and tlsCertificate are the paths to the TLS private key
  	// file and the TLS certificate file.
  	tlsPrivateKey, tlsCertificate *string
  	// defaultTtl is the time to live, e.g. 1m, 1h, 1d, 1M, 1y.
  	defaultTtl *string
  }
  25. func init() {
  26. cmdQueue.Run = runQueue // break init cycle
  27. queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address")
  28. queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port")
  29. queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file")
  30. queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file")
  31. queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  32. }
  33. var cmdQueue = &Command{
  34. UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]",
  35. Short: "start a queue gRPC server that is backed by a filer",
  36. Long: `start a queue gRPC server that is backed by a filer.
  37. `,
  38. }
  39. func runQueue(cmd *Command, args []string) bool {
  40. util.LoadConfiguration("security", false)
  41. return queueStandaloneOptions.startQueueServer()
  42. }
  43. func (queueopt *QueueOptions) startQueueServer() bool {
  44. filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer)
  45. if err != nil {
  46. glog.Fatal(err)
  47. return false
  48. }
  49. filerQueuesPath := "/queues"
  50. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  51. for {
  52. err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  53. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  54. if err != nil {
  55. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  56. }
  57. filerQueuesPath = resp.DirQueues
  58. glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
  59. return nil
  60. })
  61. if err != nil {
  62. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
  63. time.Sleep(time.Second)
  64. } else {
  65. glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
  66. break
  67. }
  68. }
  69. qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{
  70. Filers: []string{*queueopt.filer},
  71. DefaultReplication: "",
  72. MaxMB: 0,
  73. Port: *queueopt.port,
  74. })
  75. // start grpc listener
  76. grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0)
  77. if err != nil {
  78. glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err)
  79. }
  80. grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue"))
  81. queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
  82. reflection.Register(grpcS)
  83. go grpcS.Serve(grpcL)
  84. return true
  85. }