|
|
|
@ -3,6 +3,13 @@ package pub_client |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"log" |
|
|
|
"sort" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
|
@ -11,11 +18,6 @@ import ( |
|
|
|
"google.golang.org/grpc/codes" |
|
|
|
"google.golang.org/grpc/credentials/insecure" |
|
|
|
"google.golang.org/grpc/status" |
|
|
|
"log" |
|
|
|
"sort" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
type EachPartitionError struct { |
|
|
|
@ -39,7 +41,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { |
|
|
|
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("start scheduler thread for topic %s", p.config.Topic) |
|
|
|
glog.V(0).Infof("start scheduler thread for topic %s", p.config.Topic) |
|
|
|
|
|
|
|
generation := 0 |
|
|
|
var errChan chan EachPartitionError |
|
|
|
@ -66,7 +68,12 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { |
|
|
|
for { |
|
|
|
select { |
|
|
|
case eachErr := <-errChan: |
|
|
|
glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) |
|
|
|
// Use appropriate logging level based on error type
|
|
|
|
if isConnectionError(eachErr.Err) { |
|
|
|
glog.V(1).Infof("gen %d connection error for topic %s partition %v (will retry): %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) |
|
|
|
} else { |
|
|
|
glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) |
|
|
|
} |
|
|
|
if eachErr.generation < generation { |
|
|
|
continue |
|
|
|
} |
|
|
|
@ -114,7 +121,12 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. |
|
|
|
go func(job *EachPartitionPublishJob) { |
|
|
|
defer job.wg.Done() |
|
|
|
if err := p.doPublishToPartition(job); err != nil { |
|
|
|
log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) |
|
|
|
// Use appropriate logging level based on error type
|
|
|
|
if isConnectionError(err) { |
|
|
|
glog.V(1).Infof("connection error publishing to %s partition %v (will retry): %v", p.config.Topic, job.Partition, err) |
|
|
|
} else { |
|
|
|
log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) |
|
|
|
} |
|
|
|
errChan <- EachPartitionError{assignment, err, generation} |
|
|
|
} |
|
|
|
}(job) |
|
|
|
@ -128,7 +140,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. |
|
|
|
|
|
|
|
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { |
|
|
|
|
|
|
|
log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) |
|
|
|
glog.V(1).Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) |
|
|
|
|
|
|
|
grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) |
|
|
|
if err != nil { |
|
|
|
@ -176,20 +188,25 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro |
|
|
|
if err != nil { |
|
|
|
e, _ := status.FromError(err) |
|
|
|
if e.Code() == codes.Unknown && e.Message() == "EOF" { |
|
|
|
log.Printf("publish to %s EOF", publishClient.Broker) |
|
|
|
glog.V(1).Infof("publish stream to %s closed (EOF)", publishClient.Broker) |
|
|
|
return |
|
|
|
} |
|
|
|
publishClient.Err = err |
|
|
|
log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) |
|
|
|
// Use appropriate logging level based on error type
|
|
|
|
if isConnectionError(err) { |
|
|
|
glog.V(1).Infof("connection error receiving from %s (will retry): %v", publishClient.Broker, err) |
|
|
|
} else { |
|
|
|
log.Printf("publish receive error from %s: %v", publishClient.Broker, err) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
if ackResp.Error != "" { |
|
|
|
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) |
|
|
|
log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) |
|
|
|
log.Printf("publish ack error from %s: %v", publishClient.Broker, ackResp.Error) |
|
|
|
return |
|
|
|
} |
|
|
|
if ackResp.AckSequence > 0 { |
|
|
|
log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) |
|
|
|
glog.V(2).Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) |
|
|
|
} |
|
|
|
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { |
|
|
|
return |
|
|
|
@ -222,7 +239,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) |
|
|
|
glog.V(1).Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
@ -296,3 +313,40 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP |
|
|
|
return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// isConnectionError checks if an error is a connection-level error that is handled automatically
|
|
|
|
func isConnectionError(err error) bool { |
|
|
|
if err == nil { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// Check gRPC status codes for connection errors
|
|
|
|
if grpcStatus, ok := status.FromError(err); ok { |
|
|
|
switch grpcStatus.Code() { |
|
|
|
case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown: |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Check for common connection error patterns
|
|
|
|
errStr := strings.ToLower(err.Error()) |
|
|
|
connectionErrorPatterns := []string{ |
|
|
|
"eof", |
|
|
|
"error reading server preface", |
|
|
|
"connection refused", |
|
|
|
"connection reset", |
|
|
|
"broken pipe", |
|
|
|
"no such host", |
|
|
|
"network is unreachable", |
|
|
|
"context deadline exceeded", |
|
|
|
"context canceled", |
|
|
|
} |
|
|
|
|
|
|
|
for _, pattern := range connectionErrorPatterns { |
|
|
|
if strings.Contains(errStr, pattern) { |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return false |
|
|
|
} |