Browse Source

filer master start up with default ip address instead of just localhost

pull/1293/head
Chris Lu 5 years ago
parent
commit
076c8bd3bc
  1. 2
      weed/command/benchmark.go
  2. 3
      weed/command/filer.go
  3. 1
      weed/command/master.go
  4. 2
      weed/command/server.go
  5. 4
      weed/filer2/filer.go
  6. 15
      weed/messaging/broker/broker_grpc_server_publish.go
  7. 85
      weed/messaging/broker/broker_server.go
  8. 27
      weed/messaging/client/client.go
  9. 72
      weed/messaging/client/publisher.go
  10. 10
      weed/pb/grpc_client_server.go
  11. 5
      weed/pb/messaging.proto
  12. 144
      weed/pb/messaging_pb/messaging.pb.go
  13. 3
      weed/server/filer_server.go
  14. 3
      weed/server/master_server.go
  15. 2
      weed/shell/commands.go
  16. 4
      weed/wdclient/masterclient.go

2
weed/command/benchmark.go

@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", 0, strings.Split(*b.masters, ","))
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster() go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected() b.masterClient.WaitUntilConnected()

3
weed/command/filer.go

@ -43,7 +43,7 @@ func init() {
cmdFiler.Run = runFiler // break init cycle cmdFiler.Run = runFiler // break init cycle
f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
@ -109,6 +109,7 @@ func (fo *FilerOptions) startFiler() {
DataCenter: *fo.dataCenter, DataCenter: *fo.dataCenter,
DefaultLevelDbDir: defaultLevelDbDirectory, DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp, DisableHttp: *fo.disableHttp,
Host: *fo.ip,
Port: uint32(*fo.port), Port: uint32(*fo.port),
Cipher: *fo.cipher, Cipher: *fo.cipher,
}) })

1
weed/command/master.go

@ -172,6 +172,7 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
return &weed_server.MasterOption{ return &weed_server.MasterOption{
Host: *m.ip,
Port: *m.port, Port: *m.port,
MetaFolder: *m.metaFolder, MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: *m.volumeSizeLimitMB, VolumeSizeLimitMB: *m.volumeSizeLimitMB,

2
weed/command/server.go

@ -45,7 +45,7 @@ var cmdServer = &Command{
} }
var ( var (
serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")

4
weed/filer2/filer.go

@ -40,10 +40,10 @@ type Filer struct {
metaLogReplication string metaLogReplication string
} }
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{ f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }

15
weed/messaging/broker/broker_grpc_server_publish.go

@ -27,6 +27,19 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
topicConfig := &messaging_pb.TopicConfiguration{ topicConfig := &messaging_pb.TopicConfiguration{
} }
// send init response
initResponse := &messaging_pb.PublishResponse{
Config: nil,
Redirect: nil,
}
err = stream.Send(initResponse)
if err != nil {
return err
}
if initResponse.Redirect != nil {
return nil
}
// get lock // get lock
tp := TopicPartition{ tp := TopicPartition{
@ -87,6 +100,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
Headers: in.Data.Headers, Headers: in.Data.Headers,
} }
println("received message:", string(in.Data.Value))
data, err := proto.Marshal(m) data, err := proto.Marshal(m)
if err != nil { if err != nil {
glog.Errorf("marshall error: %v\n", err) glog.Errorf("marshall error: %v\n", err)

85
weed/messaging/broker/broker_server.go

@ -2,11 +2,11 @@ package broker
import ( import (
"context" "context"
"fmt"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@ -34,7 +34,9 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
topicLocks: NewTopicLocks(), topicLocks: NewTopicLocks(),
} }
go messageBroker.loopForEver()
messageBroker.checkPeers()
// go messageBroker.loopForEver()
return messageBroker, nil return messageBroker, nil
} }
@ -52,58 +54,55 @@ func (broker *MessageBroker) checkPeers() {
// contact a filer about masters // contact a filer about masters
var masters []string var masters []string
for _, filer := range broker.option.Filers {
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
found := false
for !found {
for _, filer := range broker.option.Filers {
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
masters = append(masters, resp.Masters...)
return nil
})
if err == nil {
found = true
break
} }
masters = append(masters, resp.Masters...)
return nil
})
if err != nil {
fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
return
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
time.Sleep(time.Second)
} }
} }
glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers // contact each masters for filers
var filers []string var filers []string
for _, master := range masters {
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
ClientType: "filer",
found = false
for !found {
for _, master := range masters {
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
ClientType: "filer",
})
if err != nil {
return err
}
filers = append(filers, resp.GrpcAddresses...)
return nil
}) })
if err != nil {
return err
if err == nil {
found = true
break
} }
fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
filers = append(filers, resp.GrpcAddresses...)
return nil
})
if err != nil {
fmt.Printf("failed to list filers: %v\n", err)
return
glog.V(0).Infof("failed to list filers: %v", err)
time.Sleep(time.Second)
} }
} }
glog.V(0).Infof("received filer list: %s", filers)
// contact each filer about brokers
for _, filer := range filers {
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
masters = append(masters, resp.Masters...)
return nil
})
if err != nil {
fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
return
}
}
broker.option.Filers = filers
} }

27
weed/messaging/client/client.go

@ -1,11 +1,34 @@
package client package client
import (
"context"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type MessagingClient struct { type MessagingClient struct {
bootstrapBrokers []string bootstrapBrokers []string
grpcConnection *grpc.ClientConn
} }
func NewMessagingClient(bootstrapBrokers []string) *MessagingClient {
func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
if err != nil {
return nil, err
}
return &MessagingClient{ return &MessagingClient{
bootstrapBrokers: bootstrapBrokers, bootstrapBrokers: bootstrapBrokers,
}
grpcConnection: grpcConnection,
}, nil
}
func (mc *MessagingClient) Shutdown() {
mc.grpcConnection.Close()
} }

72
weed/messaging/client/publisher.go

@ -1,14 +1,76 @@
package client package client
import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
import (
"context"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
type Publisher struct { type Publisher struct {
publishClient messaging_pb.SeaweedMessaging_PublishClient
}
func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) {
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
if err != nil {
return nil, err
}
// send init message
err = stream.Send(&messaging_pb.PublishRequest{
Init: &messaging_pb.PublishRequest_InitMessage{
Namespace: namespace,
Topic: topic,
Partition: 0,
},
})
if err != nil {
return nil, err
}
// process init response
initResponse, err := stream.Recv()
if err != nil {
return nil, err
}
if initResponse.Redirect != nil {
// TODO follow redirection
}
if initResponse.Config != nil {
}
// setup looks for control messages
doneChan := make(chan error, 1)
go func() {
for {
in, err := stream.Recv()
if err != nil {
doneChan <- err
return
}
if in.Redirect != nil{
}
if in.Config != nil{
}
}
}()
return &Publisher{
publishClient: stream,
}, nil
} }
func (c *MessagingClient) NewPublisher(namespace, topic string) *Publisher {
return &Publisher{}
func (p *Publisher) Publish(m *messaging_pb.RawData) error {
return p.publishClient.Send(&messaging_pb.PublishRequest{
Data: m,
})
} }
func (p *Publisher) Publish(m *messaging_pb.RawData) error{
return nil
func (p *Publisher) Shutdown() {
p.publishClient.CloseSend()
} }

10
weed/pb/grpc_client_server.go

@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
) )
const ( const (
@ -158,6 +159,15 @@ func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(cli
} }
func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, grpcDialOption)
}
func WithFilerClient(filer string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { func WithFilerClient(filer string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress, parseErr := ParseServerToGrpcAddress(filer) filerGrpcAddress, parseErr := ParseServerToGrpcAddress(filer)

5
weed/pb/messaging.proto

@ -74,11 +74,6 @@ message PublishRequest {
int32 partition = 3; int32 partition = 3;
} }
InitMessage init = 1; InitMessage init = 1;
message RawData {
bytes key = 1; // Message key
bytes value = 2; // Message payload
map<string, bytes> headers = 3; // Message headers
}
RawData data = 2; RawData data = 2;
} }

144
weed/pb/messaging_pb/messaging.pb.go

@ -283,7 +283,7 @@ func (m *BrokerMessage_RedirectMessage) GetNewBroker() string {
type PublishRequest struct { type PublishRequest struct {
Init *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init" json:"init,omitempty"` Init *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init" json:"init,omitempty"`
Data *PublishRequest_RawData `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Data *RawData `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
} }
func (m *PublishRequest) Reset() { *m = PublishRequest{} } func (m *PublishRequest) Reset() { *m = PublishRequest{} }
@ -298,7 +298,7 @@ func (m *PublishRequest) GetInit() *PublishRequest_InitMessage {
return nil return nil
} }
func (m *PublishRequest) GetData() *PublishRequest_RawData {
func (m *PublishRequest) GetData() *RawData {
if m != nil { if m != nil {
return m.Data return m.Data
} }
@ -337,38 +337,6 @@ func (m *PublishRequest_InitMessage) GetPartition() int32 {
return 0 return 0
} }
type PublishRequest_RawData struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Headers map[string][]byte `protobuf:"bytes,3,rep,name=headers" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *PublishRequest_RawData) Reset() { *m = PublishRequest_RawData{} }
func (m *PublishRequest_RawData) String() string { return proto.CompactTextString(m) }
func (*PublishRequest_RawData) ProtoMessage() {}
func (*PublishRequest_RawData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4, 1} }
func (m *PublishRequest_RawData) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *PublishRequest_RawData) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *PublishRequest_RawData) GetHeaders() map[string][]byte {
if m != nil {
return m.Headers
}
return nil
}
type PublishResponse struct { type PublishResponse struct {
Config *PublishResponse_ConfigMessage `protobuf:"bytes,1,opt,name=config" json:"config,omitempty"` Config *PublishResponse_ConfigMessage `protobuf:"bytes,1,opt,name=config" json:"config,omitempty"`
Redirect *PublishResponse_RedirectMessage `protobuf:"bytes,2,opt,name=redirect" json:"redirect,omitempty"` Redirect *PublishResponse_RedirectMessage `protobuf:"bytes,2,opt,name=redirect" json:"redirect,omitempty"`
@ -551,7 +519,6 @@ func init() {
proto.RegisterType((*BrokerMessage_RedirectMessage)(nil), "messaging_pb.BrokerMessage.RedirectMessage") proto.RegisterType((*BrokerMessage_RedirectMessage)(nil), "messaging_pb.BrokerMessage.RedirectMessage")
proto.RegisterType((*PublishRequest)(nil), "messaging_pb.PublishRequest") proto.RegisterType((*PublishRequest)(nil), "messaging_pb.PublishRequest")
proto.RegisterType((*PublishRequest_InitMessage)(nil), "messaging_pb.PublishRequest.InitMessage") proto.RegisterType((*PublishRequest_InitMessage)(nil), "messaging_pb.PublishRequest.InitMessage")
proto.RegisterType((*PublishRequest_RawData)(nil), "messaging_pb.PublishRequest.RawData")
proto.RegisterType((*PublishResponse)(nil), "messaging_pb.PublishResponse") proto.RegisterType((*PublishResponse)(nil), "messaging_pb.PublishResponse")
proto.RegisterType((*PublishResponse_ConfigMessage)(nil), "messaging_pb.PublishResponse.ConfigMessage") proto.RegisterType((*PublishResponse_ConfigMessage)(nil), "messaging_pb.PublishResponse.ConfigMessage")
proto.RegisterType((*PublishResponse_RedirectMessage)(nil), "messaging_pb.PublishResponse.RedirectMessage") proto.RegisterType((*PublishResponse_RedirectMessage)(nil), "messaging_pb.PublishResponse.RedirectMessage")
@ -802,58 +769,57 @@ var _SeaweedMessaging_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("messaging.proto", fileDescriptor0) } func init() { proto.RegisterFile("messaging.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 847 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0xcd, 0x8e, 0xe3, 0x44,
0x10, 0xde, 0x8e, 0x33, 0x93, 0x75, 0xe5, 0x97, 0x16, 0x83, 0x22, 0x33, 0x03, 0x96, 0x77, 0x25,
0x02, 0x23, 0xac, 0x21, 0x5c, 0xc2, 0x6a, 0x25, 0x94, 0x84, 0xb0, 0x44, 0x4c, 0x20, 0xea, 0xe4,
0x8a, 0xa2, 0x8e, 0xd3, 0x9b, 0xb5, 0x92, 0xd8, 0xc6, 0xdd, 0x21, 0xda, 0x13, 0x07, 0xb8, 0x72,
0xe2, 0x1d, 0xb8, 0x70, 0xe6, 0xc2, 0x8d, 0x07, 0xe0, 0x9d, 0x90, 0x7f, 0x63, 0x27, 0x1e, 0xef,
0x10, 0xed, 0xdc, 0xec, 0x72, 0xd5, 0x57, 0xf5, 0x55, 0x7d, 0xd5, 0x6e, 0xa8, 0x6f, 0x18, 0xe7,
0x74, 0x69, 0x5a, 0x4b, 0xdd, 0x71, 0x6d, 0x61, 0xe3, 0x4a, 0x6c, 0x98, 0x39, 0x73, 0xed, 0x97,
0x22, 0xbc, 0x33, 0xd9, 0xce, 0xb9, 0xe1, 0x9a, 0x73, 0xe6, 0x8e, 0xfc, 0x4f, 0x0c, 0x7f, 0x09,
0x45, 0xd3, 0x32, 0x45, 0x13, 0xa9, 0xa8, 0x55, 0x6e, 0x5f, 0xeb, 0xc9, 0x10, 0xfd, 0xc8, 0x5d,
0x1f, 0x5a, 0xa6, 0x08, 0x9f, 0x89, 0x1f, 0x88, 0x9f, 0x83, 0x44, 0x8d, 0x55, 0xb3, 0xe0, 0xc7,
0x7f, 0xf2, 0xa6, 0xf8, 0xae, 0xb1, 0x8a, 0xc2, 0xbd, 0x30, 0xe5, 0x9f, 0x02, 0x94, 0x13, 0x98,
0xf8, 0x12, 0x64, 0x8b, 0x6e, 0x18, 0x77, 0xa8, 0xc1, 0xfc, 0x9a, 0x64, 0xb2, 0x37, 0xe0, 0x77,
0xe1, 0x4c, 0xd8, 0x8e, 0x69, 0xf8, 0xd9, 0x64, 0x12, 0xbc, 0x78, 0x31, 0x0e, 0x75, 0x85, 0x29,
0x4c, 0xdb, 0x6a, 0x4a, 0x2a, 0x6a, 0x9d, 0x91, 0xbd, 0x01, 0xcf, 0xa0, 0xca, 0x05, 0x75, 0xc5,
0xd8, 0xe6, 0x81, 0x47, 0x51, 0x45, 0xad, 0x5a, 0xfb, 0x8b, 0xff, 0xc1, 0x54, 0x9f, 0x24, 0x01,
0x48, 0x1a, 0x0f, 0xab, 0x50, 0x16, 0xe6, 0x86, 0x71, 0x41, 0x37, 0xce, 0x77, 0xbc, 0x79, 0xa6,
0xa2, 0x96, 0x44, 0x92, 0x26, 0xfc, 0x04, 0xaa, 0x3c, 0xc6, 0x9f, 0x99, 0x8b, 0xe6, 0xb9, 0x5f,
0x7e, 0x65, 0x6f, 0x1c, 0x2e, 0xb4, 0x0e, 0x54, 0x53, 0x69, 0x30, 0xc0, 0xf9, 0x6d, 0x77, 0x3a,
0x98, 0x4c, 0x1b, 0x8f, 0x70, 0x05, 0x1e, 0x0f, 0xba, 0xe4, 0x76, 0xe8, 0xbd, 0x21, 0x5c, 0x05,
0x79, 0x3a, 0x1c, 0x0d, 0x26, 0xd3, 0xee, 0x68, 0xdc, 0x28, 0x28, 0xd7, 0x00, 0xfb, 0xb6, 0xe2,
0x2b, 0x80, 0x80, 0x19, 0xf3, 0x32, 0x21, 0xbf, 0x1a, 0x39, 0xb4, 0x0c, 0x17, 0xda, 0x9f, 0x08,
0x4a, 0x84, 0xee, 0xbe, 0xa2, 0x82, 0xe2, 0x06, 0x48, 0x2b, 0xf6, 0xda, 0xf7, 0xa9, 0x10, 0xef,
0xd1, 0x6b, 0xf0, 0x4f, 0x74, 0xbd, 0x65, 0x7e, 0x83, 0x2b, 0x24, 0x78, 0xc1, 0xcf, 0xa1, 0xf4,
0x8a, 0xd1, 0x05, 0x73, 0x79, 0x53, 0x52, 0xa5, 0x56, 0xb9, 0xad, 0xa5, 0x9b, 0x17, 0xe2, 0xe9,
0xdf, 0x04, 0x4e, 0x03, 0x4b, 0xb8, 0xaf, 0x49, 0x14, 0xa2, 0x3c, 0x83, 0x4a, 0xf2, 0x43, 0x32,
0xab, 0x9c, 0x93, 0xf5, 0x59, 0xa1, 0x83, 0xb4, 0x7f, 0x11, 0x94, 0x22, 0x62, 0x2a, 0xc8, 0x71,
0x53, 0x03, 0x5e, 0xbd, 0xc2, 0x0d, 0x22, 0x7b, 0x63, 0x84, 0x5c, 0xc8, 0xe0, 0x23, 0xdd, 0xc1,
0xa7, 0x98, 0xc5, 0x27, 0x1a, 0xfb, 0xdb, 0xe7, 0xf3, 0x17, 0x82, 0x6a, 0xcf, 0xb5, 0x57, 0xfb,
0xfd, 0xfb, 0x18, 0x8a, 0x0b, 0x2a, 0x68, 0xb8, 0x7f, 0x17, 0x99, 0x85, 0x10, 0xdf, 0x05, 0xbf,
0x80, 0xc7, 0x2e, 0x5b, 0x98, 0x2e, 0x33, 0x44, 0xb8, 0x6e, 0x07, 0xeb, 0x9a, 0x42, 0xd6, 0x49,
0xe8, 0x1b, 0x81, 0xc4, 0xc1, 0xca, 0x0d, 0xd4, 0x0f, 0x3e, 0x7a, 0xaa, 0xb1, 0xd8, 0x6e, 0x36,
0xf7, 0x11, 0xe2, 0xc5, 0x63, 0xbb, 0x00, 0x52, 0xfb, 0x43, 0x82, 0xda, 0x78, 0x3b, 0x5f, 0x9b,
0xfc, 0x15, 0x61, 0x3f, 0x6e, 0x19, 0xf7, 0xf6, 0x3e, 0x79, 0x70, 0xb4, 0xd2, 0x95, 0xa4, 0x7d,
0x33, 0x4e, 0x8d, 0x4e, 0x48, 0x3b, 0xe0, 0xf1, 0x34, 0x37, 0x3a, 0x94, 0x57, 0xd0, 0x05, 0x65,
0xf6, 0xc0, 0x07, 0x86, 0xf2, 0xf7, 0x09, 0x1b, 0xf2, 0xed, 0xe1, 0x86, 0x7c, 0x76, 0x1f, 0x46,
0x0f, 0x20, 0xb0, 0xdf, 0x0a, 0x50, 0x8f, 0x93, 0x71, 0xc7, 0xb6, 0x38, 0xc3, 0x7d, 0x38, 0x37,
0x6c, 0xeb, 0xa5, 0xb9, 0xcc, 0x3e, 0xe4, 0x0f, 0xdc, 0xf5, 0xbe, 0xef, 0x1b, 0x8d, 0x2b, 0x0c,
0xc5, 0xc3, 0x23, 0xf1, 0x7d, 0x9a, 0x0f, 0x73, 0xb7, 0xfc, 0x3a, 0x50, 0x4d, 0xe5, 0xc0, 0x1f,
0x41, 0x3d, 0x6e, 0xff, 0xcc, 0xb0, 0xb7, 0x56, 0xa0, 0xaa, 0x33, 0x52, 0x8b, 0xcd, 0x7d, 0xcf,
0x7a, 0x82, 0x70, 0x7f, 0x47, 0x70, 0x11, 0x24, 0xdb, 0xba, 0x6c, 0xea, 0x4d, 0x3f, 0xd2, 0xef,
0x29, 0xc2, 0xf9, 0x1a, 0xaa, 0x46, 0x08, 0x46, 0x63, 0xf1, 0x94, 0xdb, 0x6a, 0xba, 0x13, 0x7e,
0x9a, 0x7e, 0xd2, 0x8f, 0xa4, 0xc3, 0xb4, 0x26, 0xbc, 0x77, 0x58, 0x54, 0xd0, 0x35, 0x8d, 0xc0,
0xe5, 0x0b, 0x26, 0x32, 0x10, 0x4e, 0xaf, 0x5a, 0x5b, 0xc2, 0xd5, 0x1d, 0x98, 0xa1, 0x40, 0x8e,
0x68, 0xa1, 0xd3, 0x68, 0xfd, 0x0c, 0xf8, 0xd8, 0xe9, 0xde, 0xd3, 0xc5, 0x1f, 0x00, 0x18, 0xf6,
0x7a, 0xcd, 0x0c, 0xbf, 0x86, 0x80, 0x42, 0xc2, 0xe2, 0xfd, 0x68, 0x5d, 0xe6, 0xac, 0x4d, 0x63,
0xdf, 0x7b, 0x99, 0x24, 0x4d, 0xed, 0x5f, 0x25, 0x68, 0x4c, 0x18, 0xdd, 0x31, 0xb6, 0x18, 0x45,
0xa5, 0xe3, 0xef, 0x41, 0x8e, 0xff, 0xee, 0xf8, 0xc3, 0x37, 0xfc, 0xf6, 0x95, 0xf7, 0x73, 0x8e,
0x54, 0xed, 0x51, 0x0b, 0xdd, 0x20, 0x7c, 0x0b, 0xa5, 0x50, 0xec, 0xf8, 0x32, 0x6f, 0xcd, 0x95,
0xab, 0xdc, 0x0d, 0x09, 0xd1, 0x7e, 0x80, 0x5a, 0x5a, 0x0b, 0xf8, 0x49, 0x3a, 0x2c, 0x53, 0xbe,
0xca, 0xd3, 0x7c, 0xa7, 0x28, 0x05, 0x76, 0xe1, 0x22, 0x73, 0xf8, 0xf8, 0xe0, 0xaa, 0x96, 0xa7,
0x3a, 0xe5, 0xfa, 0x5e, 0xbe, 0x51, 0xce, 0x9e, 0x06, 0x0d, 0x1e, 0x4c, 0xe1, 0x25, 0xd7, 0x8d,
0xb5, 0xc9, 0x2c, 0xd1, 0xab, 0xc5, 0x03, 0x19, 0x7b, 0x77, 0xd3, 0xf9, 0xb9, 0x7f, 0x45, 0xfd,
0xfc, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdf, 0xc7, 0xa8, 0xba, 0xb5, 0x0a, 0x00, 0x00,
// 826 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0xcb, 0x8e, 0xe3, 0x44,
0x14, 0x9d, 0xb2, 0xf3, 0x18, 0xdf, 0x3c, 0x29, 0xd1, 0x28, 0x32, 0xdd, 0x60, 0x79, 0x90, 0x08,
0xb4, 0xb0, 0x5a, 0x61, 0xd3, 0x8c, 0x46, 0x42, 0x49, 0x08, 0x43, 0xa4, 0x0e, 0x44, 0x95, 0x6c,
0x51, 0x54, 0x71, 0x6a, 0x32, 0x56, 0x12, 0xdb, 0xb8, 0x2a, 0x44, 0xbd, 0x62, 0x01, 0x5b, 0x56,
0x7c, 0x06, 0x6b, 0x3e, 0x80, 0x0f, 0xe0, 0x07, 0xf8, 0x1a, 0xe4, 0x67, 0xec, 0xc4, 0x9d, 0x6e,
0x22, 0xd8, 0xd9, 0xd7, 0xe7, 0x9e, 0x7b, 0xcf, 0x7d, 0x94, 0x0b, 0x1a, 0x1b, 0xc6, 0x39, 0x5d,
0x5a, 0xf6, 0xd2, 0x70, 0x3d, 0x47, 0x38, 0xb8, 0x9a, 0x18, 0x66, 0xee, 0x5c, 0xff, 0xb9, 0x00,
0xef, 0x4c, 0xb6, 0x73, 0x6e, 0x7a, 0xd6, 0x9c, 0x79, 0xa3, 0xe0, 0x13, 0xc3, 0x5f, 0x42, 0xc1,
0xb2, 0x2d, 0xd1, 0x42, 0x1a, 0x6a, 0x57, 0x3a, 0xd7, 0x46, 0xda, 0xc5, 0x38, 0x82, 0x1b, 0x43,
0xdb, 0x12, 0xd1, 0x33, 0x09, 0x1c, 0xf1, 0x2b, 0x90, 0xa9, 0xb9, 0x6a, 0x49, 0x81, 0xff, 0xa7,
0x8f, 0xf9, 0x77, 0xcd, 0x55, 0xec, 0xee, 0xbb, 0xa9, 0x7f, 0x4a, 0x50, 0x49, 0x71, 0xe2, 0x4b,
0x50, 0x6c, 0xba, 0x61, 0xdc, 0xa5, 0x26, 0x0b, 0x72, 0x52, 0xc8, 0xde, 0x80, 0xdf, 0x85, 0xa2,
0x70, 0x5c, 0xcb, 0x0c, 0xa2, 0x29, 0x24, 0x7c, 0xf1, 0x7d, 0x5c, 0xea, 0x09, 0x4b, 0x58, 0x8e,
0xdd, 0x92, 0x35, 0xd4, 0x2e, 0x92, 0xbd, 0x01, 0xcf, 0xa0, 0xc6, 0x05, 0xf5, 0xc4, 0xd8, 0xe1,
0x21, 0xa2, 0xa0, 0xa1, 0x76, 0xbd, 0xf3, 0xc5, 0xbf, 0x50, 0x6a, 0x4c, 0xd2, 0x04, 0x24, 0xcb,
0x87, 0x35, 0xa8, 0x08, 0x6b, 0xc3, 0xb8, 0xa0, 0x1b, 0xf7, 0x5b, 0xde, 0x2a, 0x6a, 0xa8, 0x2d,
0x93, 0xb4, 0x09, 0xbf, 0x80, 0x1a, 0x4f, 0xf8, 0x67, 0xd6, 0xa2, 0x55, 0x0a, 0xd2, 0xaf, 0xee,
0x8d, 0xc3, 0x85, 0x7e, 0x0b, 0xb5, 0x4c, 0x18, 0x0c, 0x50, 0xba, 0xeb, 0x4e, 0x07, 0x93, 0x69,
0xf3, 0x19, 0xae, 0xc2, 0xf3, 0x41, 0x97, 0xdc, 0x0d, 0xfd, 0x37, 0x84, 0x6b, 0xa0, 0x4c, 0x87,
0xa3, 0xc1, 0x64, 0xda, 0x1d, 0x8d, 0x9b, 0x92, 0x7a, 0x0d, 0xb0, 0x2f, 0x2b, 0xbe, 0x02, 0x08,
0x95, 0x31, 0x3f, 0x12, 0x0a, 0xb2, 0x51, 0x22, 0xcb, 0x70, 0xa1, 0xff, 0x8e, 0xa0, 0x4c, 0xe8,
0xee, 0x2b, 0x2a, 0x28, 0x6e, 0x82, 0xbc, 0x62, 0xf7, 0x01, 0xa6, 0x4a, 0xfc, 0x47, 0xbf, 0xc0,
0x3f, 0xd2, 0xf5, 0x96, 0x05, 0x05, 0xae, 0x92, 0xf0, 0x05, 0xbf, 0x82, 0xf2, 0x5b, 0x46, 0x17,
0xcc, 0xe3, 0x2d, 0x59, 0x93, 0xdb, 0x95, 0x8e, 0x9e, 0x2d, 0x5e, 0xc4, 0x67, 0x7c, 0x13, 0x82,
0x06, 0xb6, 0xf0, 0xee, 0x49, 0xec, 0xa2, 0xbe, 0x84, 0x6a, 0xfa, 0x43, 0x3a, 0xaa, 0x72, 0x22,
0xea, 0x4b, 0xe9, 0x16, 0xe9, 0x7f, 0x21, 0x28, 0xc7, 0xc2, 0x34, 0x50, 0x92, 0xa2, 0x86, 0xba,
0x7a, 0xd2, 0x0d, 0x22, 0x7b, 0x63, 0xcc, 0x2c, 0xe5, 0xe8, 0x91, 0x1f, 0xd0, 0x53, 0xc8, 0xd3,
0x13, 0xb7, 0xfd, 0xbf, 0xd7, 0xf3, 0x07, 0x82, 0x5a, 0xcf, 0x73, 0x56, 0xfb, 0xfd, 0xfb, 0x04,
0x0a, 0x0b, 0x2a, 0x68, 0xb4, 0x7f, 0x17, 0xb9, 0x89, 0x90, 0x00, 0x82, 0x5f, 0xc3, 0x73, 0x8f,
0x2d, 0x2c, 0x8f, 0x99, 0x22, 0x5a, 0xb7, 0x83, 0x75, 0xcd, 0x30, 0x1b, 0x24, 0xc2, 0xc6, 0x24,
0x89, 0xb3, 0x7a, 0x03, 0x8d, 0x83, 0x8f, 0xfe, 0xd4, 0xd8, 0x6c, 0x37, 0x9b, 0x07, 0x0c, 0xc9,
0xe2, 0xb1, 0x5d, 0x48, 0xa9, 0xff, 0x8d, 0xa0, 0x3e, 0xde, 0xce, 0xd7, 0x16, 0x7f, 0x4b, 0xd8,
0x0f, 0x5b, 0xc6, 0xfd, 0xbd, 0x4f, 0x1f, 0x1c, 0xed, 0x6c, 0x26, 0x59, 0x6c, 0xce, 0xa9, 0x11,
0xcb, 0x96, 0xf2, 0x64, 0x47, 0xf3, 0x14, 0xca, 0x56, 0x67, 0xff, 0xf3, 0x09, 0xa1, 0xff, 0x2a,
0x41, 0x23, 0x49, 0x98, 0xbb, 0x8e, 0xcd, 0x19, 0xee, 0x43, 0xc9, 0x74, 0xec, 0x37, 0xd6, 0x32,
0xff, 0x60, 0x3c, 0x80, 0x1b, 0xfd, 0x00, 0x1b, 0x4b, 0x8c, 0x5c, 0xf1, 0xf0, 0xa8, 0x61, 0x9f,
0x9d, 0xa6, 0x79, 0xb8, 0x65, 0xb7, 0x50, 0xcb, 0xc4, 0xc0, 0x1f, 0x43, 0x23, 0x51, 0x30, 0x33,
0x9d, 0xad, 0x1d, 0x76, 0xa2, 0x48, 0xea, 0x89, 0xb9, 0xef, 0x5b, 0xcf, 0x68, 0xf6, 0x6f, 0x08,
0x2e, 0xc2, 0x60, 0x5b, 0x8f, 0x4d, 0xfd, 0x02, 0xc6, 0x3d, 0x3f, 0xa7, 0xf6, 0x5f, 0x43, 0xcd,
0x8c, 0xc8, 0x68, 0x52, 0xff, 0x4a, 0x47, 0xcb, 0x56, 0x22, 0x08, 0xd3, 0x4f, 0xe3, 0x48, 0xd6,
0x4d, 0x6f, 0xc1, 0x7b, 0x87, 0x49, 0x85, 0x55, 0xd3, 0x09, 0x5c, 0xbe, 0x66, 0x22, 0x87, 0xe1,
0xfc, 0xac, 0xf5, 0x25, 0x5c, 0x3d, 0xc0, 0x19, 0x0d, 0xc8, 0x91, 0x2c, 0x74, 0x9e, 0xac, 0x9f,
0x00, 0x1f, 0x83, 0x9e, 0xdc, 0x5d, 0xfc, 0x01, 0x80, 0xe9, 0xac, 0xd7, 0xcc, 0x0c, 0x72, 0x08,
0x25, 0xa4, 0x2c, 0xfe, 0xcf, 0xc9, 0x63, 0xee, 0xda, 0x32, 0xf7, 0xb5, 0x57, 0x48, 0xda, 0xd4,
0xf9, 0x45, 0x86, 0xe6, 0x84, 0xd1, 0x1d, 0x63, 0x8b, 0x51, 0x9c, 0x3a, 0xfe, 0x0e, 0x94, 0xe4,
0x8f, 0x88, 0x3f, 0x7c, 0xe4, 0x57, 0xa9, 0xbe, 0x7f, 0xe2, 0x18, 0xd2, 0x9f, 0xb5, 0xd1, 0x0d,
0xc2, 0x77, 0x50, 0x8e, 0x86, 0x1d, 0x5f, 0x9e, 0x3a, 0x2a, 0xd4, 0xab, 0x93, 0x1b, 0x12, 0xb1,
0x7d, 0x0f, 0xf5, 0xec, 0x2c, 0xe0, 0x17, 0x59, 0xb7, 0xdc, 0xf1, 0x55, 0x3f, 0x3a, 0x0d, 0x8a,
0x43, 0x60, 0x0f, 0x2e, 0x72, 0x9b, 0x8f, 0x0f, 0xae, 0x37, 0xa7, 0xa6, 0x4e, 0xbd, 0x7e, 0x12,
0x36, 0x8e, 0xd9, 0xd3, 0xa1, 0xc9, 0xc3, 0x2e, 0xbc, 0xe1, 0x86, 0xb9, 0xb6, 0x98, 0x2d, 0x7a,
0xf5, 0xa4, 0x21, 0x63, 0xff, 0x3e, 0x37, 0x2f, 0x05, 0xd7, 0xba, 0xcf, 0xff, 0x09, 0x00, 0x00,
0xff, 0xff, 0x14, 0x36, 0x4a, 0x4d, 0xe9, 0x09, 0x00, 0x00,
} }

3
weed/server/filer_server.go

@ -45,6 +45,7 @@ type FilerOption struct {
DataCenter string DataCenter string
DefaultLevelDbDir string DefaultLevelDbDir string
DisableHttp bool DisableHttp bool
Host string
Port uint32 Port uint32
recursiveDelete bool recursiveDelete bool
Cipher bool Cipher bool
@ -73,7 +74,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!") glog.Fatal("master list is required!")
} }
fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
fs.filer.Cipher = option.Cipher fs.filer.Cipher = option.Cipher
maybeStartMetrics(fs, option) maybeStartMetrics(fs, option)

3
weed/server/master_server.go

@ -32,6 +32,7 @@ const (
) )
type MasterOption struct { type MasterOption struct {
Host string
Port int Port int
MetaFolder string MetaFolder string
VolumeSizeLimitMB uint VolumeSizeLimitMB uint
@ -93,7 +94,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize: preallocateSize, preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation), clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", 0, peers),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
} }
ms.bounedLeaderChan = make(chan int, 16) ms.bounedLeaderChan = make(chan int, 16)

2
weed/shell/commands.go

@ -43,7 +43,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv { func NewCommandEnv(options ShellOptions) *CommandEnv {
return &CommandEnv{ return &CommandEnv{
env: make(map[string]string), env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, 0, strings.Split(*options.Masters, ",")),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
option: options, option: options,
} }
} }

4
weed/wdclient/masterclient.go

@ -14,6 +14,7 @@ import (
type MasterClient struct { type MasterClient struct {
clientType string clientType string
clientHost string
grpcPort uint32 grpcPort uint32
currentMaster string currentMaster string
masters []string masters []string
@ -22,9 +23,10 @@ type MasterClient struct {
vidMap vidMap
} }
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientGrpcPort uint32, masters []string) *MasterClient {
func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, masters []string) *MasterClient {
return &MasterClient{ return &MasterClient{
clientType: clientType, clientType: clientType,
clientHost: clientHost,
grpcPort: clientGrpcPort, grpcPort: clientGrpcPort,
masters: masters, masters: masters,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,

Loading…
Cancel
Save