diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 5b8daf1a7..6618f5d2f 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -31,9 +31,9 @@ func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) * type LiveLock struct { key string renewToken string - expireAtNs int64 - hostFiler pb.ServerAddress - cancelCh chan struct{} + expireAtNs int64 + hostFiler pb.ServerAddress + cancelCh chan struct{} grpcDialOption grpc.DialOption isLocked bool self string diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index b9dc807e1..3b68db1af 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -63,9 +63,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis for _, follower := range initMessage.FollowerBrokers { followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ - Topic: initMessage.Topic, - Partition: initMessage.Partition, - BrokerSelf: string(b.option.BrokerAddress()), + Topic: initMessage.Topic, + Partition: initMessage.Partition, + BrokerSelf: string(b.option.BrokerAddress()), }) return err }) diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 533e32f18..8ef85110a 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -12,7 +12,7 @@ import ( "time" ) -func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){ +func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) { glog.V(0).Infof("PublishFollowMe %v", request) var wg sync.WaitGroup wg.Add(1) @@ -75,7 +75,7 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m } if resp.Message != nil { // process ctrl message or data message - switch m:= resp.Message.(type) { + switch m := resp.Message.(type) { case *mq_pb.FollowInMemoryMessagesResponse_Data: // process data message print("d") diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index e6027d26b..1141ff47f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -130,7 +130,7 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer return } -func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { +func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { ctx := stream.Context() clientName := req.GetInit().ConsumerId @@ -188,8 +188,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // to indicate the follower is connected stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ - Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ - }, + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{}, }, }) @@ -200,7 +199,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe var prevFlushTsNs int64 - _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { + _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { if !isConnected { return false } @@ -285,12 +284,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // send the log entry if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Data{ - Data: &mq_pb.DataMessage{ - Key: logEntry.Key, - Value: logEntry.Data, - TsNs: logEntry.TsNs, - }, - }}); err != nil { + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return false, err } diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 39d91bef3..d7632f8d6 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -102,7 +102,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * assignment.LeaderBroker = "" count++ } - for i:=0; i= len(assignment.FollowerBrokers) { count++ continue @@ -128,7 +128,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * hasEmptyFollowers := false j := 0 - for ; j