Browse Source
feat: Phase 4A CP2 — WAL shipping, replica barrier, distributed group commit
feat: Phase 4A CP2 — WAL shipping, replica barrier, distributed group commit
Primary ships WAL entries to replica over TCP (data channel), confirms durability via barrier RPC (control channel). SyncCache runs local fsync and replica barrier in parallel via MakeDistributedSync. When the replica is unreachable, the shipper enters permanent degraded mode and falls back to local-only sync (Phase 3 behavior). Key design: two separate TCP ports (data+control), contiguous LSN enforcement, epoch equality check, WAL-full retry on replica, cond.Wait-based barrier with configurable timeout, BarrierFsyncFailed status code. Close lifecycle: shipper → receiver → drain → committer → flusher → fd. New files: repl_proto.go, wal_shipper.go, replica_apply.go, replica_barrier.go, dist_group_commit.go. Modified: blockvol.go, blockvol_test.go. 27 dev tests + 21 QA tests = 48 new tests; 889 total (609 engine + 280 iSCSI), all passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
8 changed files with 3006 additions and 1 deletions
-
61weed/storage/blockvol/blockvol.go
-
1104weed/storage/blockvol/blockvol_test.go
-
46weed/storage/blockvol/dist_group_commit.go
-
1104weed/storage/blockvol/qa_phase4a_cp2_test.go
-
107weed/storage/blockvol/repl_proto.go
-
287weed/storage/blockvol/replica_apply.go
-
101weed/storage/blockvol/replica_barrier.go
-
197weed/storage/blockvol/wal_shipper.go
1104
weed/storage/blockvol/blockvol_test.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,46 @@ |
|||
package blockvol |
|||
|
|||
import ( |
|||
"sync" |
|||
) |
|||
|
|||
// MakeDistributedSync creates a sync function that runs local WAL fsync and
|
|||
// replica barrier in parallel. If no replica is configured or the replica is
|
|||
// degraded, it falls back to local-only sync (Phase 3 behavior).
|
|||
//
|
|||
// walSync: the local fsync function (typically fd.Sync)
|
|||
// shipper: the WAL shipper to the replica (may be nil)
|
|||
// vol: the BlockVol (used to read nextLSN and trigger degradation)
|
|||
func MakeDistributedSync(walSync func() error, shipper *WALShipper, vol *BlockVol) func() error { |
|||
return func() error { |
|||
if shipper == nil || shipper.IsDegraded() { |
|||
return walSync() |
|||
} |
|||
|
|||
// The highest LSN that needs to be durable is nextLSN-1.
|
|||
lsnMax := vol.nextLSN.Load() - 1 |
|||
|
|||
var localErr, remoteErr error |
|||
var wg sync.WaitGroup |
|||
wg.Add(2) |
|||
go func() { |
|||
defer wg.Done() |
|||
localErr = walSync() |
|||
}() |
|||
go func() { |
|||
defer wg.Done() |
|||
remoteErr = shipper.Barrier(lsnMax) |
|||
}() |
|||
wg.Wait() |
|||
|
|||
if localErr != nil { |
|||
return localErr |
|||
} |
|||
if remoteErr != nil { |
|||
// Local succeeded, replica failed — degrade but don't fail the client.
|
|||
vol.degradeReplica(remoteErr) |
|||
return nil |
|||
} |
|||
return nil |
|||
} |
|||
} |
|||
1104
weed/storage/blockvol/qa_phase4a_cp2_test.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,107 @@ |
|||
package blockvol |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
) |
|||
|
|||
// Data channel message types.
const (
	// MsgWALEntry frames carry one encoded WAL entry from primary to replica.
	MsgWALEntry byte = 0x01
)

// Control channel message types.
const (
	// MsgBarrierReq asks the replica to confirm durability up to an LSN.
	MsgBarrierReq byte = 0x01
	// MsgBarrierResp carries the replica's single-byte status answer.
	MsgBarrierResp byte = 0x02
)

// Barrier response status codes.
const (
	BarrierOK            byte = 0x00 // all entries up to the requested LSN are durable
	BarrierEpochMismatch byte = 0x01 // request epoch != replica's local epoch
	BarrierTimeout       byte = 0x02 // LSN did not arrive in time (also returned on shutdown)
	BarrierFsyncFailed   byte = 0x03 // entries received but the replica fsync failed
)

// BarrierRequest is sent by the primary to the replica on the control channel.
type BarrierRequest struct {
	// Vid identifies the volume. NOTE(review): the shipper currently leaves
	// this zero and the replica never checks it — confirm intent.
	Vid uint32
	// LSN is the highest LSN that must be durable before the reply.
	LSN uint64
	// Epoch is the primary's epoch; the replica rejects on mismatch.
	Epoch uint64
}

// BarrierResponse is the replica's reply to a barrier request.
type BarrierResponse struct {
	Status byte // one of the Barrier* status codes above
}
|||
|
|||
// Frame header: [1B type][4B payload_len].
|
|||
const frameHeaderSize = 5 |
|||
|
|||
// maxFramePayload caps the payload size to prevent OOM on corrupt data.
|
|||
const maxFramePayload = 256 * 1024 * 1024 // 256MB
|
|||
|
|||
var ( |
|||
ErrFrameTooLarge = errors.New("repl: frame payload exceeds maximum size") |
|||
ErrFrameEmpty = errors.New("repl: empty frame payload") |
|||
) |
|||
|
|||
// WriteFrame writes a length-prefixed frame: [1B type][4B len][payload].
|
|||
func WriteFrame(w io.Writer, msgType byte, payload []byte) error { |
|||
hdr := make([]byte, frameHeaderSize) |
|||
hdr[0] = msgType |
|||
binary.BigEndian.PutUint32(hdr[1:5], uint32(len(payload))) |
|||
if _, err := w.Write(hdr); err != nil { |
|||
return fmt.Errorf("repl: write frame header: %w", err) |
|||
} |
|||
if len(payload) > 0 { |
|||
if _, err := w.Write(payload); err != nil { |
|||
return fmt.Errorf("repl: write frame payload: %w", err) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// ReadFrame reads a length-prefixed frame and returns message type and payload.
|
|||
func ReadFrame(r io.Reader) (msgType byte, payload []byte, err error) { |
|||
hdr := make([]byte, frameHeaderSize) |
|||
if _, err = io.ReadFull(r, hdr); err != nil { |
|||
return 0, nil, fmt.Errorf("repl: read frame header: %w", err) |
|||
} |
|||
msgType = hdr[0] |
|||
payloadLen := binary.BigEndian.Uint32(hdr[1:5]) |
|||
if payloadLen > maxFramePayload { |
|||
return 0, nil, ErrFrameTooLarge |
|||
} |
|||
payload = make([]byte, payloadLen) |
|||
if payloadLen > 0 { |
|||
if _, err = io.ReadFull(r, payload); err != nil { |
|||
return 0, nil, fmt.Errorf("repl: read frame payload: %w", err) |
|||
} |
|||
} |
|||
return msgType, payload, nil |
|||
} |
|||
|
|||
// EncodeBarrierRequest serializes a BarrierRequest (4+8+8 = 20 bytes).
|
|||
func EncodeBarrierRequest(req BarrierRequest) []byte { |
|||
buf := make([]byte, 20) |
|||
binary.BigEndian.PutUint32(buf[0:4], req.Vid) |
|||
binary.BigEndian.PutUint64(buf[4:12], req.LSN) |
|||
binary.BigEndian.PutUint64(buf[12:20], req.Epoch) |
|||
return buf |
|||
} |
|||
|
|||
// DecodeBarrierRequest deserializes a BarrierRequest.
|
|||
func DecodeBarrierRequest(buf []byte) (BarrierRequest, error) { |
|||
if len(buf) < 20 { |
|||
return BarrierRequest{}, fmt.Errorf("repl: barrier request too short: %d bytes", len(buf)) |
|||
} |
|||
return BarrierRequest{ |
|||
Vid: binary.BigEndian.Uint32(buf[0:4]), |
|||
LSN: binary.BigEndian.Uint64(buf[4:12]), |
|||
Epoch: binary.BigEndian.Uint64(buf[12:20]), |
|||
}, nil |
|||
} |
|||
@ -0,0 +1,287 @@ |
|||
package blockvol |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"log" |
|||
"net" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
var (
	// ErrStaleEpoch is returned when an incoming WAL entry's epoch does not
	// match the replica's local epoch.
	ErrStaleEpoch = errors.New("blockvol: stale epoch")
	// ErrDuplicateLSN is returned when an entry would create a gap in the
	// contiguous LSN sequence (true duplicates are silently skipped instead).
	ErrDuplicateLSN = errors.New("blockvol: duplicate or out-of-order LSN")
)

// ReplicaReceiver listens for WAL entries from a primary and applies them
// to the local BlockVol. It runs two listeners: one for the data channel
// (WAL entries) and one for the control channel (barrier requests).
type ReplicaReceiver struct {
	vol            *BlockVol     // local volume the entries are applied to
	barrierTimeout time.Duration // how long a barrier waits for its LSN

	mu          sync.Mutex // serializes applyEntry; guards receivedLSN and stopped
	receivedLSN uint64     // highest contiguous LSN written to the local WAL
	cond        *sync.Cond // signaled under mu whenever receivedLSN advances

	connMu sync.Mutex // protects activeConns
	activeConns map[net.Conn]struct{}

	dataListener net.Listener
	ctrlListener net.Listener
	stopCh       chan struct{} // closed by Stop to unblock loops and waiters
	stopped      bool          // makes Stop idempotent
	wg           sync.WaitGroup
}

// defaultBarrierTimeout bounds how long a replica-side barrier waits for
// missing WAL entries before answering BarrierTimeout.
const defaultBarrierTimeout = 5 * time.Second
|||
|
|||
// NewReplicaReceiver creates and starts listening on the data and control ports.
|
|||
func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string) (*ReplicaReceiver, error) { |
|||
dataLn, err := net.Listen("tcp", dataAddr) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("replica: listen data %s: %w", dataAddr, err) |
|||
} |
|||
ctrlLn, err := net.Listen("tcp", ctrlAddr) |
|||
if err != nil { |
|||
dataLn.Close() |
|||
return nil, fmt.Errorf("replica: listen ctrl %s: %w", ctrlAddr, err) |
|||
} |
|||
|
|||
r := &ReplicaReceiver{ |
|||
vol: vol, |
|||
barrierTimeout: defaultBarrierTimeout, |
|||
dataListener: dataLn, |
|||
ctrlListener: ctrlLn, |
|||
stopCh: make(chan struct{}), |
|||
activeConns: make(map[net.Conn]struct{}), |
|||
} |
|||
r.cond = sync.NewCond(&r.mu) |
|||
return r, nil |
|||
} |
|||
|
|||
// Serve starts accept loops for both listeners. Call Stop() to shut down.
|
|||
func (r *ReplicaReceiver) Serve() { |
|||
r.wg.Add(2) |
|||
go r.acceptDataLoop() |
|||
go r.acceptCtrlLoop() |
|||
} |
|||
|
|||
// Stop shuts down both listeners, closes active connections, and waits for goroutines.
// Idempotent. The ordering matters: stopCh is closed first so the accept and
// read loops interpret the connection errors caused below as clean shutdown.
func (r *ReplicaReceiver) Stop() {
	r.mu.Lock()
	if r.stopped {
		r.mu.Unlock()
		return
	}
	r.stopped = true
	r.mu.Unlock()

	close(r.stopCh)
	r.dataListener.Close()
	r.ctrlListener.Close()

	// Close all active connections to unblock ReadFrame calls.
	r.connMu.Lock()
	for conn := range r.activeConns {
		conn.Close()
	}
	r.connMu.Unlock()

	// Wake any barrier waiters so they can exit (must hold mu for cond).
	r.mu.Lock()
	r.cond.Broadcast()
	r.mu.Unlock()
	r.wg.Wait()
}
|||
|
|||
func (r *ReplicaReceiver) trackConn(conn net.Conn) { |
|||
r.connMu.Lock() |
|||
r.activeConns[conn] = struct{}{} |
|||
r.connMu.Unlock() |
|||
} |
|||
|
|||
func (r *ReplicaReceiver) untrackConn(conn net.Conn) { |
|||
r.connMu.Lock() |
|||
delete(r.activeConns, conn) |
|||
r.connMu.Unlock() |
|||
} |
|||
|
|||
func (r *ReplicaReceiver) acceptDataLoop() { |
|||
defer r.wg.Done() |
|||
for { |
|||
conn, err := r.dataListener.Accept() |
|||
if err != nil { |
|||
select { |
|||
case <-r.stopCh: |
|||
return |
|||
default: |
|||
log.Printf("replica: data accept error: %v", err) |
|||
return |
|||
} |
|||
} |
|||
r.trackConn(conn) |
|||
r.wg.Add(1) |
|||
go func() { |
|||
defer r.wg.Done() |
|||
defer r.untrackConn(conn) |
|||
r.handleDataConn(conn) |
|||
}() |
|||
} |
|||
} |
|||
|
|||
func (r *ReplicaReceiver) acceptCtrlLoop() { |
|||
defer r.wg.Done() |
|||
for { |
|||
conn, err := r.ctrlListener.Accept() |
|||
if err != nil { |
|||
select { |
|||
case <-r.stopCh: |
|||
return |
|||
default: |
|||
log.Printf("replica: ctrl accept error: %v", err) |
|||
return |
|||
} |
|||
} |
|||
r.trackConn(conn) |
|||
r.wg.Add(1) |
|||
go func() { |
|||
defer r.wg.Done() |
|||
defer r.untrackConn(conn) |
|||
r.handleControlConn(conn) |
|||
}() |
|||
} |
|||
} |
|||
|
|||
// handleDataConn reads WAL entry frames and applies them to the local volume.
|
|||
func (r *ReplicaReceiver) handleDataConn(conn net.Conn) { |
|||
defer conn.Close() |
|||
for { |
|||
select { |
|||
case <-r.stopCh: |
|||
return |
|||
default: |
|||
} |
|||
|
|||
msgType, payload, err := ReadFrame(conn) |
|||
if err != nil { |
|||
select { |
|||
case <-r.stopCh: |
|||
default: |
|||
log.Printf("replica: data read error: %v", err) |
|||
} |
|||
return |
|||
} |
|||
|
|||
if msgType != MsgWALEntry { |
|||
log.Printf("replica: unexpected data message type 0x%02x", msgType) |
|||
continue |
|||
} |
|||
|
|||
if err := r.applyEntry(payload); err != nil { |
|||
log.Printf("replica: apply entry error: %v", err) |
|||
} |
|||
} |
|||
} |
|||
|
|||
// applyEntry decodes and applies a single WAL entry to the local volume.
// The entire apply (LSN check → WAL append → dirty map → receivedLSN update)
// is serialized under mu to prevent TOCTOU races between concurrent entries.
//
// Returns nil for duplicates (idempotent redelivery), an ErrStaleEpoch-wrapped
// error on epoch mismatch, and an ErrDuplicateLSN-wrapped error on LSN gaps.
func (r *ReplicaReceiver) applyEntry(payload []byte) error {
	entry, err := DecodeWALEntry(payload)
	if err != nil {
		return fmt.Errorf("decode WAL entry: %w", err)
	}

	// Validate epoch: replicas must NOT accept epoch bumps from WAL stream.
	// Only the master can change epochs (via SetEpoch in CP3).
	// NOTE(review): the epoch is read before mu is taken, so an epoch change
	// racing with this apply could slip past the check — confirm SetEpoch
	// ordering guarantees.
	localEpoch := r.vol.epoch.Load()
	if entry.Epoch != localEpoch {
		return fmt.Errorf("%w: entry epoch %d != local %d", ErrStaleEpoch, entry.Epoch, localEpoch)
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// Enforce contiguous LSN: only accept the next expected entry.
	// This prevents gaps that would let a barrier pass incorrectly.
	if entry.LSN <= r.receivedLSN {
		// Already applied — benign (e.g. primary resend); drop silently.
		log.Printf("replica: skipping duplicate/old LSN %d (received %d)", entry.LSN, r.receivedLSN)
		return nil
	}
	if entry.LSN != r.receivedLSN+1 {
		return fmt.Errorf("%w: expected LSN %d, got %d (gap)", ErrDuplicateLSN, r.receivedLSN+1, entry.LSN)
	}

	// Append to local WAL (with retry on WAL full). May briefly drop mu;
	// contiguity stays safe because receivedLSN is only advanced below.
	walOff, err := r.replicaAppendWithRetry(&entry)
	if err != nil {
		return fmt.Errorf("WAL append: %w", err)
	}

	// Update dirty map: mark each covered block as dirty at this WAL
	// offset/LSN. Assumes entry.Length is a multiple of the volume block
	// size — TODO confirm the encoder guarantees this.
	switch entry.Type {
	case EntryTypeWrite, EntryTypeTrim:
		blocks := entry.Length / r.vol.super.BlockSize
		for i := uint32(0); i < blocks; i++ {
			r.vol.dirtyMap.Put(entry.LBA+uint64(i), walOff, entry.LSN, r.vol.super.BlockSize)
		}
	}

	// Update receivedLSN and signal barrier waiters.
	r.receivedLSN = entry.LSN
	r.cond.Broadcast()

	return nil
}
|||
|
|||
// replicaAppendWithRetry appends a WAL entry, retrying on WAL-full by
// triggering the flusher. Caller must hold r.mu.
//
// While waiting for WAL space it temporarily releases r.mu. Contiguity is
// still safe: receivedLSN is only advanced by the caller after this returns,
// so any concurrent applyEntry is rejected as a gap. Gives up after
// r.vol.config.WALFullTimeout or when the receiver stops.
func (r *ReplicaReceiver) replicaAppendWithRetry(entry *WALEntry) (uint64, error) {
	walOff, err := r.vol.wal.Append(entry)
	if !errors.Is(err, ErrWALFull) {
		// Fast path: success, or a non-retryable error — return as-is.
		return walOff, err
	}

	// The deadline is created once and shared across retry iterations.
	deadline := time.After(r.vol.config.WALFullTimeout)
	for errors.Is(err, ErrWALFull) {
		select {
		case <-r.stopCh:
			return 0, fmt.Errorf("replica: stopped during WAL retry")
		default:
		}
		// Ask the flusher to free WAL space urgently (nil-safe for tests).
		if r.vol.flusher != nil {
			r.vol.flusher.NotifyUrgent()
		}
		// Release mu briefly so barrier waiters can proceed and
		// the flusher can make progress (it may need dirty map lock).
		r.mu.Unlock()
		select {
		case <-deadline:
			// Re-acquire before returning: caller expects mu held.
			r.mu.Lock()
			return 0, fmt.Errorf("replica: WAL full timeout: %w", ErrWALFull)
		case <-time.After(1 * time.Millisecond):
		}
		r.mu.Lock()
		walOff, err = r.vol.wal.Append(entry)
	}
	return walOff, err
}
|||
|
|||
// ReceivedLSN returns the highest LSN received and written to the local WAL.
|
|||
func (r *ReplicaReceiver) ReceivedLSN() uint64 { |
|||
r.mu.Lock() |
|||
defer r.mu.Unlock() |
|||
return r.receivedLSN |
|||
} |
|||
|
|||
// DataAddr returns the data listener's address (useful for tests with :0 ports).
|
|||
func (r *ReplicaReceiver) DataAddr() string { |
|||
return r.dataListener.Addr().String() |
|||
} |
|||
|
|||
// CtrlAddr returns the control listener's address.
|
|||
func (r *ReplicaReceiver) CtrlAddr() string { |
|||
return r.ctrlListener.Addr().String() |
|||
} |
|||
@ -0,0 +1,101 @@ |
|||
package blockvol |
|||
|
|||
import ( |
|||
"log" |
|||
"net" |
|||
"time" |
|||
) |
|||
|
|||
// handleControlConn reads barrier requests from the control channel and
|
|||
// responds with barrier status after ensuring durability.
|
|||
func (r *ReplicaReceiver) handleControlConn(conn net.Conn) { |
|||
defer conn.Close() |
|||
for { |
|||
select { |
|||
case <-r.stopCh: |
|||
return |
|||
default: |
|||
} |
|||
|
|||
msgType, payload, err := ReadFrame(conn) |
|||
if err != nil { |
|||
select { |
|||
case <-r.stopCh: |
|||
default: |
|||
log.Printf("replica: ctrl read error: %v", err) |
|||
} |
|||
return |
|||
} |
|||
|
|||
if msgType != MsgBarrierReq { |
|||
log.Printf("replica: unexpected ctrl message type 0x%02x", msgType) |
|||
continue |
|||
} |
|||
|
|||
req, err := DecodeBarrierRequest(payload) |
|||
if err != nil { |
|||
log.Printf("replica: decode barrier request: %v", err) |
|||
continue |
|||
} |
|||
|
|||
resp := r.handleBarrier(req) |
|||
|
|||
respPayload := []byte{resp.Status} |
|||
if err := WriteFrame(conn, MsgBarrierResp, respPayload); err != nil { |
|||
log.Printf("replica: write barrier response: %v", err) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
|
|||
// handleBarrier waits until all WAL entries up to req.LSN have been received,
|
|||
// then fsyncs the WAL to ensure durability.
|
|||
func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse { |
|||
// Fail fast on epoch mismatch.
|
|||
localEpoch := r.vol.epoch.Load() |
|||
if req.Epoch != localEpoch { |
|||
return BarrierResponse{Status: BarrierEpochMismatch} |
|||
} |
|||
|
|||
// Use a timer goroutine to wake us on deadline.
|
|||
timer := time.NewTimer(r.barrierTimeout) |
|||
defer timer.Stop() |
|||
timedOut := make(chan struct{}) |
|||
go func() { |
|||
select { |
|||
case <-timer.C: |
|||
close(timedOut) |
|||
// Broadcast to wake any cond.Wait blocked in the loop below.
|
|||
r.mu.Lock() |
|||
r.cond.Broadcast() |
|||
r.mu.Unlock() |
|||
case <-r.stopCh: |
|||
} |
|||
}() |
|||
|
|||
r.mu.Lock() |
|||
for r.receivedLSN < req.LSN { |
|||
// Check if timed out or stopped.
|
|||
select { |
|||
case <-timedOut: |
|||
r.mu.Unlock() |
|||
return BarrierResponse{Status: BarrierTimeout} |
|||
case <-r.stopCh: |
|||
r.mu.Unlock() |
|||
return BarrierResponse{Status: BarrierTimeout} |
|||
default: |
|||
} |
|||
|
|||
// Block on cond.Wait — woken by applyEntry or timeout goroutine.
|
|||
r.cond.Wait() |
|||
} |
|||
r.mu.Unlock() |
|||
|
|||
// fsync WAL AFTER confirming all entries received.
|
|||
if err := r.vol.fd.Sync(); err != nil { |
|||
log.Printf("replica: barrier fsync error: %v", err) |
|||
return BarrierResponse{Status: BarrierFsyncFailed} |
|||
} |
|||
|
|||
return BarrierResponse{Status: BarrierOK} |
|||
} |
|||
@ -0,0 +1,197 @@ |
|||
package blockvol |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"log" |
|||
"net" |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
) |
|||
|
|||
var (
	// ErrReplicaDegraded indicates the replica is unreachable. Degraded
	// mode is permanent for the lifetime of the shipper.
	ErrReplicaDegraded = errors.New("blockvol: replica degraded")
	// ErrShipperStopped is returned from Barrier after Stop() was called.
	ErrShipperStopped = errors.New("blockvol: shipper stopped")
)

// barrierTimeout bounds one barrier request/response round trip on the
// primary side (applied as a connection deadline in Barrier).
const barrierTimeout = 5 * time.Second

// WALShipper streams WAL entries from the primary to a replica over TCP.
// Fire-and-forget: no per-entry ACK. Barriers provide durability confirmation.
type WALShipper struct {
	dataAddr    string        // replica data-channel address (WAL entries)
	controlAddr string        // replica control-channel address (barriers)
	epochFn     func() uint64 // returns the current epoch for validation

	mu sync.Mutex // protects dataConn
	dataConn net.Conn // lazily dialed on first Ship

	ctrlMu sync.Mutex // protects ctrlConn
	ctrlConn net.Conn // lazily dialed on first Barrier

	shippedLSN atomic.Uint64 // highest LSN written to the data connection
	degraded   atomic.Bool   // once set, Ship/Barrier short-circuit forever
	stopped    atomic.Bool   // set by Stop
}
|||
|
|||
// NewWALShipper creates a WAL shipper. Connections are established lazily on
|
|||
// first Ship/Barrier call. epochFn returns the current epoch for validation.
|
|||
func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64) *WALShipper { |
|||
return &WALShipper{ |
|||
dataAddr: dataAddr, |
|||
controlAddr: controlAddr, |
|||
epochFn: epochFn, |
|||
} |
|||
} |
|||
|
|||
// Ship sends a WAL entry to the replica over the data channel.
|
|||
// On write error, the shipper enters degraded mode permanently.
|
|||
func (s *WALShipper) Ship(entry *WALEntry) error { |
|||
if s.stopped.Load() || s.degraded.Load() { |
|||
return nil |
|||
} |
|||
|
|||
// Validate epoch: drop stale entries.
|
|||
if entry.Epoch != s.epochFn() { |
|||
log.Printf("wal_shipper: dropping entry LSN=%d with stale epoch %d (current %d)", |
|||
entry.LSN, entry.Epoch, s.epochFn()) |
|||
return nil |
|||
} |
|||
|
|||
encoded, err := entry.Encode() |
|||
if err != nil { |
|||
return fmt.Errorf("wal_shipper: encode entry: %w", err) |
|||
} |
|||
|
|||
s.mu.Lock() |
|||
defer s.mu.Unlock() |
|||
|
|||
if err := s.ensureDataConn(); err != nil { |
|||
s.markDegraded() |
|||
return nil |
|||
} |
|||
|
|||
if err := WriteFrame(s.dataConn, MsgWALEntry, encoded); err != nil { |
|||
s.markDegraded() |
|||
return nil |
|||
} |
|||
|
|||
s.shippedLSN.Store(entry.LSN) |
|||
return nil |
|||
} |
|||
|
|||
// Barrier sends a barrier request on the control channel and waits for the
|
|||
// replica to confirm durability up to lsnMax. Returns ErrReplicaDegraded if
|
|||
// the shipper is in degraded mode.
|
|||
func (s *WALShipper) Barrier(lsnMax uint64) error { |
|||
if s.stopped.Load() { |
|||
return ErrShipperStopped |
|||
} |
|||
if s.degraded.Load() { |
|||
return ErrReplicaDegraded |
|||
} |
|||
|
|||
req := EncodeBarrierRequest(BarrierRequest{ |
|||
LSN: lsnMax, |
|||
Epoch: s.epochFn(), |
|||
}) |
|||
|
|||
s.ctrlMu.Lock() |
|||
defer s.ctrlMu.Unlock() |
|||
|
|||
if err := s.ensureCtrlConn(); err != nil { |
|||
s.markDegraded() |
|||
return ErrReplicaDegraded |
|||
} |
|||
|
|||
s.ctrlConn.SetDeadline(time.Now().Add(barrierTimeout)) |
|||
|
|||
if err := WriteFrame(s.ctrlConn, MsgBarrierReq, req); err != nil { |
|||
s.markDegraded() |
|||
return ErrReplicaDegraded |
|||
} |
|||
|
|||
msgType, payload, err := ReadFrame(s.ctrlConn) |
|||
if err != nil { |
|||
s.markDegraded() |
|||
return ErrReplicaDegraded |
|||
} |
|||
|
|||
if msgType != MsgBarrierResp || len(payload) < 1 { |
|||
s.markDegraded() |
|||
return ErrReplicaDegraded |
|||
} |
|||
|
|||
switch payload[0] { |
|||
case BarrierOK: |
|||
return nil |
|||
case BarrierEpochMismatch: |
|||
return fmt.Errorf("wal_shipper: barrier epoch mismatch") |
|||
case BarrierTimeout: |
|||
return fmt.Errorf("wal_shipper: barrier timeout on replica") |
|||
case BarrierFsyncFailed: |
|||
return fmt.Errorf("wal_shipper: barrier fsync failed on replica") |
|||
default: |
|||
return fmt.Errorf("wal_shipper: unknown barrier status %d", payload[0]) |
|||
} |
|||
} |
|||
|
|||
// ShippedLSN returns the highest LSN successfully sent to the replica.
// "Sent" means written to the TCP connection, not confirmed durable —
// durability is only established by Barrier.
func (s *WALShipper) ShippedLSN() uint64 {
	return s.shippedLSN.Load()
}

// IsDegraded returns true if the replica is unreachable. Degraded mode is
// permanent for the lifetime of this shipper.
func (s *WALShipper) IsDegraded() bool {
	return s.degraded.Load()
}
|||
|
|||
// Stop shuts down the shipper and closes connections.
|
|||
func (s *WALShipper) Stop() { |
|||
if s.stopped.Swap(true) { |
|||
return |
|||
} |
|||
s.mu.Lock() |
|||
if s.dataConn != nil { |
|||
s.dataConn.Close() |
|||
s.dataConn = nil |
|||
} |
|||
s.mu.Unlock() |
|||
|
|||
s.ctrlMu.Lock() |
|||
if s.ctrlConn != nil { |
|||
s.ctrlConn.Close() |
|||
s.ctrlConn = nil |
|||
} |
|||
s.ctrlMu.Unlock() |
|||
} |
|||
|
|||
func (s *WALShipper) ensureDataConn() error { |
|||
if s.dataConn != nil { |
|||
return nil |
|||
} |
|||
conn, err := net.DialTimeout("tcp", s.dataAddr, 3*time.Second) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
s.dataConn = conn |
|||
return nil |
|||
} |
|||
|
|||
func (s *WALShipper) ensureCtrlConn() error { |
|||
if s.ctrlConn != nil { |
|||
return nil |
|||
} |
|||
conn, err := net.DialTimeout("tcp", s.controlAddr, 3*time.Second) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
s.ctrlConn = conn |
|||
return nil |
|||
} |
|||
|
|||
// markDegraded flips the shipper into permanent degraded mode: subsequent
// Ship calls become no-ops and Barrier returns ErrReplicaDegraded. There is
// no recovery path in this checkpoint — a new shipper must be constructed.
func (s *WALShipper) markDegraded() {
	s.degraded.Store(true)
	log.Printf("wal_shipper: replica degraded (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr)
}
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue