|
@ -9,6 +9,8 @@ import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/topology" |
|
|
"github.com/seaweedfs/seaweedfs/weed/topology" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
// Collector gathers telemetry data from a SeaweedFS cluster
|
|
|
|
|
|
// Only the leader master will send telemetry to avoid duplicates
|
|
|
type Collector struct { |
|
|
type Collector struct { |
|
|
client *Client |
|
|
client *Client |
|
|
topo *topology.Topology |
|
|
topo *topology.Topology |
|
@ -45,12 +47,27 @@ func (c *Collector) SetMasterServer(masterServer interface{}) { |
|
|
c.masterServer = masterServer |
|
|
c.masterServer = masterServer |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// isLeader checks if this master is the leader
|
|
|
|
|
|
func (c *Collector) isLeader() bool { |
|
|
|
|
|
if c.topo == nil { |
|
|
|
|
|
return false |
|
|
|
|
|
} |
|
|
|
|
|
return c.topo.IsLeader() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// CollectAndSendAsync collects telemetry data and sends it asynchronously
|
|
|
// CollectAndSendAsync collects telemetry data and sends it asynchronously
|
|
|
|
|
|
// Only sends telemetry if this master is the leader
|
|
|
func (c *Collector) CollectAndSendAsync() { |
|
|
func (c *Collector) CollectAndSendAsync() { |
|
|
if !c.client.IsEnabled() { |
|
|
if !c.client.IsEnabled() { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Only leader master should send telemetry to avoid duplicates
|
|
|
|
|
|
if !c.isLeader() { |
|
|
|
|
|
glog.V(2).Infof("Skipping telemetry collection - not the leader master") |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
go func() { |
|
|
go func() { |
|
|
data := c.collectData() |
|
|
data := c.collectData() |
|
|
c.client.SendTelemetryAsync(data) |
|
|
c.client.SendTelemetryAsync(data) |
|
@ -69,7 +86,11 @@ func (c *Collector) StartPeriodicCollection(interval time.Duration) { |
|
|
// Send initial telemetry after a short delay
|
|
|
// Send initial telemetry after a short delay
|
|
|
go func() { |
|
|
go func() { |
|
|
time.Sleep(61 * time.Second) // Wait for cluster to stabilize
|
|
|
time.Sleep(61 * time.Second) // Wait for cluster to stabilize
|
|
|
c.CollectAndSendAsync() |
|
|
|
|
|
|
|
|
if c.isLeader() { |
|
|
|
|
|
c.CollectAndSendAsync() |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.V(2).Infof("Skipping initial telemetry collection - not the leader master") |
|
|
|
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
// Start periodic collection
|
|
|
// Start periodic collection
|
|
@ -77,7 +98,12 @@ func (c *Collector) StartPeriodicCollection(interval time.Duration) { |
|
|
go func() { |
|
|
go func() { |
|
|
defer ticker.Stop() |
|
|
defer ticker.Stop() |
|
|
for range ticker.C { |
|
|
for range ticker.C { |
|
|
c.CollectAndSendAsync() |
|
|
|
|
|
|
|
|
// Check leadership before each collection
|
|
|
|
|
|
if c.isLeader() { |
|
|
|
|
|
c.CollectAndSendAsync() |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.V(2).Infof("Skipping periodic telemetry collection - not the leader master") |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
} |
|
|
} |
|
|