|
@ -62,6 +62,10 @@ type FilerServer struct { |
|
|
filer *filer.Filer |
|
|
filer *filer.Filer |
|
|
grpcDialOption grpc.DialOption |
|
|
grpcDialOption grpc.DialOption |
|
|
|
|
|
|
|
|
|
|
|
// metrics read from the master
|
|
|
|
|
|
metricsAddress string |
|
|
|
|
|
metricsIntervalSec int |
|
|
|
|
|
|
|
|
// notifying clients
|
|
|
// notifying clients
|
|
|
listenersLock sync.Mutex |
|
|
listenersLock sync.Mutex |
|
|
listenersCond *sync.Cond |
|
|
listenersCond *sync.Cond |
|
@ -88,7 +92,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) |
|
|
}) |
|
|
}) |
|
|
fs.filer.Cipher = option.Cipher |
|
|
fs.filer.Cipher = option.Cipher |
|
|
|
|
|
|
|
|
maybeStartMetrics(fs, option) |
|
|
|
|
|
|
|
|
fs.maybeStartMetrics() |
|
|
|
|
|
|
|
|
go fs.filer.KeepConnectedToMaster() |
|
|
go fs.filer.KeepConnectedToMaster() |
|
|
|
|
|
|
|
@ -131,9 +135,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) |
|
|
return fs, nil |
|
|
return fs, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func maybeStartMetrics(fs *FilerServer, option *FilerOption) { |
|
|
|
|
|
|
|
|
func (fs *FilerServer) maybeStartMetrics() { |
|
|
|
|
|
|
|
|
for _, master := range option.Masters { |
|
|
|
|
|
|
|
|
for _, master := range fs.option.Masters { |
|
|
_, err := pb.ParseFilerGrpcAddress(master) |
|
|
_, err := pb.ParseFilerGrpcAddress(master) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Fatalf("invalid master address %s: %v", master, err) |
|
|
glog.Fatalf("invalid master address %s: %v", master, err) |
|
@ -141,12 +145,10 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
isConnected := false |
|
|
isConnected := false |
|
|
var metricsAddress string |
|
|
|
|
|
var metricsIntervalSec int |
|
|
|
|
|
var readErr error |
|
|
var readErr error |
|
|
for !isConnected { |
|
|
for !isConnected { |
|
|
for _, master := range option.Masters { |
|
|
|
|
|
metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) |
|
|
|
|
|
|
|
|
for _, master := range fs.option.Masters { |
|
|
|
|
|
fs.metricsAddress, fs.metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) |
|
|
if readErr == nil { |
|
|
if readErr == nil { |
|
|
isConnected = true |
|
|
isConnected = true |
|
|
} else { |
|
|
} else { |
|
@ -154,10 +156,10 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if metricsAddress == "" && metricsIntervalSec <= 0 { |
|
|
|
|
|
|
|
|
if fs.metricsAddress == "" && fs.metricsIntervalSec <= 0 { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, metricsAddress, metricsIntervalSec) |
|
|
|
|
|
|
|
|
go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), stats.FilerGather, fs.metricsAddress, fs.metricsIntervalSec) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { |
|
|
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { |
|
|