You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
5.3 KiB
177 lines
5.3 KiB
package weed_server
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
|
)
|
|
|
|
// minHeartbeatInterval is the minimum allowed heartbeat interval.
|
|
// Prevents time.NewTicker panics from zero/negative values.
|
|
const minHeartbeatInterval = time.Millisecond
|
|
|
|
// BlockVolumeHeartbeatCollector periodically collects block volume status
|
|
// and delivers it via StatusCallback. Standalone -- does not touch the
|
|
// existing gRPC heartbeat stream. When proto is updated, the callback
|
|
// will be wired to stream.Send().
|
|
type BlockVolumeHeartbeatCollector struct {
|
|
blockService *BlockService
|
|
interval time.Duration
|
|
stopCh chan struct{}
|
|
done chan struct{}
|
|
stopOnce sync.Once
|
|
started atomic.Bool // BUG-CP4B2-1: tracks whether Run() was called
|
|
|
|
// cbMu protects the callback/source fields against concurrent
|
|
// read (Run goroutine) and write (SetXxx callers). BUG-CP4B3-2.
|
|
cbMu sync.Mutex
|
|
statusCallback func([]blockvol.BlockVolumeInfoMessage)
|
|
assignmentSource func() []blockvol.BlockVolumeAssignment
|
|
assignmentCallback func([]blockvol.BlockVolumeAssignment, []error)
|
|
}
|
|
|
|
// NewBlockVolumeHeartbeatCollector creates a collector that calls
|
|
// the status callback every interval with the current block volume status.
|
|
// Intervals <= 0 are clamped to minHeartbeatInterval (BUG-CP4B2-2).
|
|
func NewBlockVolumeHeartbeatCollector(
|
|
bs *BlockService, interval time.Duration,
|
|
) *BlockVolumeHeartbeatCollector {
|
|
if bs == nil {
|
|
panic("block heartbeat: nil BlockService")
|
|
}
|
|
if interval <= 0 {
|
|
interval = minHeartbeatInterval
|
|
}
|
|
return &BlockVolumeHeartbeatCollector{
|
|
blockService: bs,
|
|
interval: interval,
|
|
stopCh: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// SetStatusCallback sets the function called each tick with collected status.
|
|
// Safe to call before or after Run() (BUG-CP4B3-2).
|
|
func (c *BlockVolumeHeartbeatCollector) SetStatusCallback(fn func([]blockvol.BlockVolumeInfoMessage)) {
|
|
c.cbMu.Lock()
|
|
c.statusCallback = fn
|
|
c.cbMu.Unlock()
|
|
}
|
|
|
|
// SetAssignmentSource sets the function called each tick to fetch pending
|
|
// assignments. If nil, no assignments are processed.
|
|
// Safe to call before or after Run() (BUG-CP4B3-2).
|
|
func (c *BlockVolumeHeartbeatCollector) SetAssignmentSource(fn func() []blockvol.BlockVolumeAssignment) {
|
|
c.cbMu.Lock()
|
|
c.assignmentSource = fn
|
|
c.cbMu.Unlock()
|
|
}
|
|
|
|
// SetAssignmentCallback sets the function called after processing with
|
|
// per-assignment errors. If nil, errors are silently dropped (already
|
|
// logged by ProcessBlockVolumeAssignments).
|
|
// Safe to call before or after Run() (BUG-CP4B3-2).
|
|
func (c *BlockVolumeHeartbeatCollector) SetAssignmentCallback(fn func([]blockvol.BlockVolumeAssignment, []error)) {
|
|
c.cbMu.Lock()
|
|
c.assignmentCallback = fn
|
|
c.cbMu.Unlock()
|
|
}
|
|
|
|
// Run blocks until Stop() is called. Collects status on each tick,
|
|
// then processes any pending assignments.
|
|
func (c *BlockVolumeHeartbeatCollector) Run() {
|
|
c.started.Store(true)
|
|
defer close(c.done)
|
|
|
|
ticker := time.NewTicker(c.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Outbound: collect and report status.
|
|
msgs := c.blockService.Store().CollectBlockVolumeHeartbeat()
|
|
c.safeCallback(msgs)
|
|
// Inbound: process any pending assignments.
|
|
c.processAssignments()
|
|
case <-c.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// processAssignments fetches and applies pending assignments from master.
|
|
func (c *BlockVolumeHeartbeatCollector) processAssignments() {
|
|
c.cbMu.Lock()
|
|
src := c.assignmentSource
|
|
c.cbMu.Unlock()
|
|
if src == nil {
|
|
return
|
|
}
|
|
assignments := c.safeFuncCall(src)
|
|
if len(assignments) == 0 {
|
|
return
|
|
}
|
|
errs := c.blockService.Store().ProcessBlockVolumeAssignments(assignments)
|
|
c.cbMu.Lock()
|
|
cb := c.assignmentCallback
|
|
c.cbMu.Unlock()
|
|
if cb != nil {
|
|
c.safeAssignmentCallback(cb, assignments, errs)
|
|
}
|
|
}
|
|
|
|
// safeCallback loads and invokes the status callback with panic recovery
|
|
// (BUG-CP4B2-3). Lock is held only for the load, not during the call.
|
|
func (c *BlockVolumeHeartbeatCollector) safeCallback(msgs []blockvol.BlockVolumeInfoMessage) {
|
|
c.cbMu.Lock()
|
|
fn := c.statusCallback
|
|
c.cbMu.Unlock()
|
|
if fn == nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("block heartbeat: callback panic: %v", r)
|
|
}
|
|
}()
|
|
fn(msgs)
|
|
}
|
|
|
|
// safeFuncCall invokes an AssignmentSource with panic recovery.
|
|
func (c *BlockVolumeHeartbeatCollector) safeFuncCall(
|
|
fn func() []blockvol.BlockVolumeAssignment,
|
|
) []blockvol.BlockVolumeAssignment {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("block heartbeat: assignment source panic: %v", r)
|
|
}
|
|
}()
|
|
return fn()
|
|
}
|
|
|
|
// safeAssignmentCallback invokes an AssignmentCallback with panic recovery.
|
|
func (c *BlockVolumeHeartbeatCollector) safeAssignmentCallback(
|
|
fn func([]blockvol.BlockVolumeAssignment, []error),
|
|
assignments []blockvol.BlockVolumeAssignment, errs []error,
|
|
) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("block heartbeat: assignment callback panic: %v", r)
|
|
}
|
|
}()
|
|
fn(assignments, errs)
|
|
}
|
|
|
|
// Stop signals Run() to exit and waits for it to finish.
|
|
// Safe to call even if Run() was never started (BUG-CP4B2-1).
|
|
func (c *BlockVolumeHeartbeatCollector) Stop() {
|
|
c.stopOnce.Do(func() { close(c.stopCh) })
|
|
if !c.started.Load() {
|
|
return // Run() never started -- nothing to wait for.
|
|
}
|
|
<-c.done
|
|
}
|