You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

761 lines
19 KiB

package worker
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
)
// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
type GrpcAdminClient struct {
adminAddress string
workerID string
dialOption grpc.DialOption
conn *grpc.ClientConn
client worker_pb.WorkerServiceClient
stream worker_pb.WorkerService_WorkerStreamClient
streamCtx context.Context
streamCancel context.CancelFunc
connected bool
reconnecting bool
shouldReconnect bool
mutex sync.RWMutex
// Reconnection parameters
maxReconnectAttempts int
reconnectBackoff time.Duration
maxReconnectBackoff time.Duration
reconnectMultiplier float64
// Worker registration info for re-registration after reconnection
lastWorkerInfo *types.Worker
// Channels for communication
outgoing chan *worker_pb.WorkerMessage
incoming chan *worker_pb.AdminMessage
responseChans map[string]chan *worker_pb.AdminMessage
responsesMutex sync.RWMutex
// Shutdown channel
shutdownChan chan struct{}
}
// NewGrpcAdminClient creates a new gRPC admin client
func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
// Admin uses HTTP port + 10000 as gRPC port
grpcAddress := pb.ServerToGrpcAddress(adminAddress)
return &GrpcAdminClient{
adminAddress: grpcAddress,
workerID: workerID,
dialOption: dialOption,
shouldReconnect: true,
maxReconnectAttempts: 0, // 0 means infinite attempts
reconnectBackoff: 1 * time.Second,
maxReconnectBackoff: 30 * time.Second,
reconnectMultiplier: 1.5,
outgoing: make(chan *worker_pb.WorkerMessage, 100),
incoming: make(chan *worker_pb.AdminMessage, 100),
responseChans: make(map[string]chan *worker_pb.AdminMessage),
shutdownChan: make(chan struct{}),
}
}
// Connect establishes gRPC connection to admin server with TLS detection
func (c *GrpcAdminClient) Connect() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.connected {
return fmt.Errorf("already connected")
}
// Detect TLS support and create appropriate connection
conn, err := c.createConnection()
if err != nil {
return fmt.Errorf("failed to connect to admin server: %v", err)
}
c.conn = conn
c.client = worker_pb.NewWorkerServiceClient(conn)
// Create bidirectional stream
c.streamCtx, c.streamCancel = context.WithCancel(context.Background())
stream, err := c.client.WorkerStream(c.streamCtx)
if err != nil {
c.conn.Close()
return fmt.Errorf("failed to create worker stream: %v", err)
}
c.stream = stream
c.connected = true
// Start stream handlers and reconnection loop
go c.handleOutgoing()
go c.handleIncoming()
go c.reconnectionLoop()
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
}
// createConnection attempts to connect using the provided dial option
func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
if err != nil {
return nil, fmt.Errorf("failed to connect to admin server: %v", err)
}
glog.Infof("Connected to admin server at %s", c.adminAddress)
return conn, nil
}
// Disconnect closes the gRPC connection
func (c *GrpcAdminClient) Disconnect() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.connected {
return nil
}
c.connected = false
c.shouldReconnect = false
// Send shutdown signal to stop reconnection loop
select {
case c.shutdownChan <- struct{}{}:
default:
}
// Send shutdown message
shutdownMsg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_Shutdown{
Shutdown: &worker_pb.WorkerShutdown{
WorkerId: c.workerID,
Reason: "normal shutdown",
},
},
}
select {
case c.outgoing <- shutdownMsg:
case <-time.After(time.Second):
glog.Warningf("Failed to send shutdown message")
}
// Cancel stream context
if c.streamCancel != nil {
c.streamCancel()
}
// Close stream
if c.stream != nil {
c.stream.CloseSend()
}
// Close connection
if c.conn != nil {
c.conn.Close()
}
// Close channels
close(c.outgoing)
close(c.incoming)
glog.Infof("Disconnected from admin server")
return nil
}
// reconnectionLoop handles automatic reconnection with exponential backoff
func (c *GrpcAdminClient) reconnectionLoop() {
backoff := c.reconnectBackoff
attempts := 0
for {
select {
case <-c.shutdownChan:
return
default:
}
c.mutex.RLock()
shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting
c.mutex.RUnlock()
if !shouldReconnect {
time.Sleep(time.Second)
continue
}
c.mutex.Lock()
c.reconnecting = true
c.mutex.Unlock()
glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1)
// Attempt to reconnect
if err := c.reconnect(); err != nil {
attempts++
glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)
// Reset reconnecting flag
c.mutex.Lock()
c.reconnecting = false
c.mutex.Unlock()
// Check if we should give up
if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
c.mutex.Lock()
c.shouldReconnect = false
c.mutex.Unlock()
return
}
// Wait with exponential backoff
glog.Infof("Waiting %v before next reconnection attempt", backoff)
select {
case <-c.shutdownChan:
return
case <-time.After(backoff):
}
// Increase backoff
backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
if backoff > c.maxReconnectBackoff {
backoff = c.maxReconnectBackoff
}
} else {
// Successful reconnection
attempts = 0
backoff = c.reconnectBackoff
glog.Infof("Successfully reconnected to admin server")
c.mutex.Lock()
c.reconnecting = false
c.mutex.Unlock()
}
}
}
// reconnect attempts to re-establish the connection
func (c *GrpcAdminClient) reconnect() error {
// Clean up existing connection completely
c.mutex.Lock()
if c.streamCancel != nil {
c.streamCancel()
}
if c.stream != nil {
c.stream.CloseSend()
}
if c.conn != nil {
c.conn.Close()
}
c.mutex.Unlock()
// Create new connection
conn, err := c.createConnection()
if err != nil {
return fmt.Errorf("failed to create connection: %v", err)
}
client := worker_pb.NewWorkerServiceClient(conn)
// Create new stream
streamCtx, streamCancel := context.WithCancel(context.Background())
stream, err := client.WorkerStream(streamCtx)
if err != nil {
conn.Close()
streamCancel()
return fmt.Errorf("failed to create stream: %v", err)
}
// Update client state
c.mutex.Lock()
c.conn = conn
c.client = client
c.stream = stream
c.streamCtx = streamCtx
c.streamCancel = streamCancel
c.connected = true
c.mutex.Unlock()
// Restart stream handlers
go c.handleOutgoing()
go c.handleIncoming()
// Re-register worker if we have previous registration info
c.mutex.RLock()
workerInfo := c.lastWorkerInfo
c.mutex.RUnlock()
if workerInfo != nil {
glog.Infof("Re-registering worker after reconnection...")
if err := c.sendRegistration(workerInfo); err != nil {
glog.Errorf("Failed to re-register worker: %v", err)
// Don't fail the reconnection because of registration failure
// The registration will be retried on next heartbeat or operation
}
}
return nil
}
// handleOutgoing processes outgoing messages to admin
func (c *GrpcAdminClient) handleOutgoing() {
for msg := range c.outgoing {
c.mutex.RLock()
connected := c.connected
stream := c.stream
c.mutex.RUnlock()
if !connected {
break
}
if err := stream.Send(msg); err != nil {
glog.Errorf("Failed to send message to admin: %v", err)
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
break
}
}
}
// handleIncoming processes incoming messages from admin
func (c *GrpcAdminClient) handleIncoming() {
for {
c.mutex.RLock()
connected := c.connected
stream := c.stream
c.mutex.RUnlock()
if !connected {
break
}
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
glog.Infof("Admin server closed the stream")
} else {
glog.Errorf("Failed to receive message from admin: %v", err)
}
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
break
}
// Route message to waiting goroutines or general handler
select {
case c.incoming <- msg:
case <-time.After(time.Second):
glog.Warningf("Incoming message buffer full, dropping message")
}
}
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
if !c.connected {
return fmt.Errorf("not connected to admin server")
}
// Store worker info for re-registration after reconnection
c.mutex.Lock()
c.lastWorkerInfo = worker
c.mutex.Unlock()
return c.sendRegistration(worker)
}
// sendRegistration sends the registration message and waits for response
func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_Registration{
Registration: &worker_pb.WorkerRegistration{
WorkerId: c.workerID,
Address: worker.Address,
Capabilities: capabilities,
MaxConcurrent: int32(worker.MaxConcurrent),
Metadata: make(map[string]string),
},
},
}
select {
case c.outgoing <- msg:
case <-time.After(5 * time.Second):
return fmt.Errorf("failed to send registration message: timeout")
}
// Wait for registration response
timeout := time.NewTimer(10 * time.Second)
defer timeout.Stop()
for {
select {
case response := <-c.incoming:
if regResp := response.GetRegistrationResponse(); regResp != nil {
if regResp.Success {
glog.Infof("Worker registered successfully: %s", regResp.Message)
return nil
}
return fmt.Errorf("registration failed: %s", regResp.Message)
}
case <-timeout.C:
return fmt.Errorf("registration timeout")
}
}
}
// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
if !c.connected {
// Wait for reconnection for a short time
if err := c.waitForConnection(10 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %v", err)
}
}
taskIds := make([]string, len(status.CurrentTasks))
for i, task := range status.CurrentTasks {
taskIds[i] = task.ID
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_Heartbeat{
Heartbeat: &worker_pb.WorkerHeartbeat{
WorkerId: c.workerID,
Status: status.Status,
CurrentLoad: int32(status.CurrentLoad),
MaxConcurrent: int32(status.MaxConcurrent),
CurrentTaskIds: taskIds,
TasksCompleted: int32(status.TasksCompleted),
TasksFailed: int32(status.TasksFailed),
UptimeSeconds: int64(status.Uptime.Seconds()),
},
},
}
select {
case c.outgoing <- msg:
return nil
case <-time.After(time.Second):
return fmt.Errorf("failed to send heartbeat: timeout")
}
}
// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
if !c.connected {
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return nil, fmt.Errorf("not connected to admin server: %v", err)
}
}
caps := make([]string, len(capabilities))
for i, cap := range capabilities {
caps[i] = string(cap)
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskRequest{
TaskRequest: &worker_pb.TaskRequest{
WorkerId: c.workerID,
Capabilities: caps,
AvailableSlots: 1, // Request one task
},
},
}
select {
case c.outgoing <- msg:
case <-time.After(time.Second):
return nil, fmt.Errorf("failed to send task request: timeout")
}
// Wait for task assignment
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for {
select {
case response := <-c.incoming:
if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
// Convert parameters map[string]string to map[string]interface{}
parameters := make(map[string]interface{})
for k, v := range taskAssign.Params.Parameters {
parameters[k] = v
}
// Convert to our task type
task := &types.Task{
ID: taskAssign.TaskId,
Type: types.TaskType(taskAssign.TaskType),
Status: types.TaskStatusAssigned,
VolumeID: taskAssign.Params.VolumeId,
Server: taskAssign.Params.Server,
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
Parameters: parameters,
}
return task, nil
}
case <-timeout.C:
return nil, nil // No task available
}
}
}
// CompleteTask reports task completion to admin server
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
if !c.connected {
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %v", err)
}
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskComplete{
TaskComplete: &worker_pb.TaskComplete{
TaskId: taskID,
WorkerId: c.workerID,
Success: success,
ErrorMessage: errorMsg,
CompletionTime: time.Now().Unix(),
},
},
}
select {
case c.outgoing <- msg:
return nil
case <-time.After(time.Second):
return fmt.Errorf("failed to send task completion: timeout")
}
}
// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
if !c.connected {
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %v", err)
}
}
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskUpdate{
TaskUpdate: &worker_pb.TaskUpdate{
TaskId: taskID,
WorkerId: c.workerID,
Status: "in_progress",
Progress: float32(progress),
},
},
}
select {
case c.outgoing <- msg:
return nil
case <-time.After(time.Second):
return fmt.Errorf("failed to send task progress: timeout")
}
}
// IsConnected returns whether the client is connected
func (c *GrpcAdminClient) IsConnected() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.connected
}
// IsReconnecting returns whether the client is currently attempting to reconnect
func (c *GrpcAdminClient) IsReconnecting() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.reconnecting
}
// SetReconnectionSettings allows configuration of reconnection behavior
func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.maxReconnectAttempts = maxAttempts
c.reconnectBackoff = initialBackoff
c.maxReconnectBackoff = maxBackoff
c.reconnectMultiplier = multiplier
}
// StopReconnection stops the reconnection loop
func (c *GrpcAdminClient) StopReconnection() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.shouldReconnect = false
}
// StartReconnection starts the reconnection loop
func (c *GrpcAdminClient) StartReconnection() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.shouldReconnect = true
}
// waitForConnection waits for the connection to be established or timeout
func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c.mutex.RLock()
connected := c.connected
shouldReconnect := c.shouldReconnect
c.mutex.RUnlock()
if connected {
return nil
}
if !shouldReconnect {
return fmt.Errorf("reconnection is disabled")
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for connection")
}
// MockAdminClient provides a mock implementation for testing
type MockAdminClient struct {
workerID string
connected bool
tasks []*types.Task
mutex sync.RWMutex
}
// NewMockAdminClient creates a new mock admin client
func NewMockAdminClient() *MockAdminClient {
return &MockAdminClient{
connected: true,
tasks: make([]*types.Task, 0),
}
}
// Connect mock implementation
func (m *MockAdminClient) Connect() error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.connected = true
return nil
}
// Disconnect mock implementation
func (m *MockAdminClient) Disconnect() error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.connected = false
return nil
}
// RegisterWorker mock implementation
func (m *MockAdminClient) RegisterWorker(worker *types.Worker) error {
m.workerID = worker.ID
glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
return nil
}
// SendHeartbeat mock implementation
func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d",
workerID, status.Status, status.CurrentLoad, status.MaxConcurrent)
return nil
}
// RequestTask mock implementation
func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(m.tasks) > 0 {
task := m.tasks[0]
m.tasks = m.tasks[1:]
glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID)
return task, nil
}
// No tasks available
return nil, nil
}
// CompleteTask mock implementation
func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
if success {
glog.Infof("Mock: Task %s completed successfully", taskID)
} else {
glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg)
}
return nil
}
// UpdateTaskProgress mock implementation
func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress)
return nil
}
// IsConnected mock implementation
func (m *MockAdminClient) IsConnected() bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.connected
}
// AddMockTask adds a mock task for testing
func (m *MockAdminClient) AddMockTask(task *types.Task) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.tasks = append(m.tasks, task)
}
// CreateAdminClient creates an admin client with the provided dial option
func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
}