|
@ -1,6 +1,7 @@ |
|
|
package broker |
|
|
package broker |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
|
|
|
"fmt" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"google.golang.org/grpc/codes" |
|
|
"google.golang.org/grpc/codes" |
|
@ -14,7 +15,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin |
|
|
} |
|
|
} |
|
|
req, err := stream.Recv() |
|
|
req, err := stream.Recv() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
|
|
|
|
|
|
return fmt.Errorf("receive init message: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// process init message
|
|
|
// process init message
|
|
@ -33,7 +34,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin |
|
|
for { |
|
|
for { |
|
|
req, err := stream.Recv() |
|
|
req, err := stream.Recv() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
|
|
|
|
|
|
return fmt.Errorf("receive stats message from %s: %v", initMessage.Broker, err) |
|
|
} |
|
|
} |
|
|
if !b.isLockOwner() { |
|
|
if !b.isLockOwner() { |
|
|
return status.Errorf(codes.Unavailable, "not current broker balancer") |
|
|
return status.Errorf(codes.Unavailable, "not current broker balancer") |
|
|