From e344c6ce247118240df4d966b919f4cf09478b1e Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 18:38:14 -0700 Subject: [PATCH] adjust return values on failures --- .../kafka/protocol/consumer_group_metadata.go | 5 +-- weed/mq/kafka/protocol/find_coordinator.go | 31 +++++++++++++------ weed/mq/kafka/protocol/handler.go | 10 +++--- weed/mq/kafka/protocol/joingroup.go | 20 ++++++++---- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/weed/mq/kafka/protocol/consumer_group_metadata.go b/weed/mq/kafka/protocol/consumer_group_metadata.go index dd84e6675..e820f21c4 100644 --- a/weed/mq/kafka/protocol/consumer_group_metadata.go +++ b/weed/mq/kafka/protocol/consumer_group_metadata.go @@ -172,12 +172,13 @@ func ExtractTopicsFromMetadata(protocols []GroupProtocol, fallbackTopics []strin } } - // Fallback to provided topics or default + // Fallback to provided topics or empty list if len(fallbackTopics) > 0 { return fallbackTopics } - return []string{"test-topic"} + // Return empty slice if no topics found - consumer may be using pattern subscription + return []string{} } // SelectBestProtocol chooses the best assignment protocol from available options diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go index 2c60cf39c..946c51ef6 100644 --- a/weed/mq/kafka/protocol/find_coordinator.go +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -353,9 +353,12 @@ func (h *Handler) findCoordinatorForGroup(groupID string) (host string, port int if registry == nil { // Fallback to current gateway if no registry available gatewayAddr := h.GetGatewayAddress() + if gatewayAddr == "" { + return "", 0, 0, fmt.Errorf("no coordinator registry and no gateway address configured") + } host, port, err := h.parseGatewayAddress(gatewayAddr) if err != nil { - return "localhost", 9092, 1, nil + return "", 0, 0, fmt.Errorf("failed to parse gateway address: %w", err) } nodeID = 1 return host, port, nodeID, nil @@ -386,13 +389,15 @@ func (h *Handler) handleCoordinatorAssignmentAsLeader(groupID string, registry C // No coordinator exists, assign the requesting gateway (first-come-first-serve) currentGateway := h.GetGatewayAddress() + if currentGateway == "" { + return "", 0, 0, fmt.Errorf("no gateway address configured for coordinator assignment") + } assignment, err := registry.AssignCoordinator(groupID, currentGateway) if err != nil { - // Fallback to current gateway - gatewayAddr := h.GetGatewayAddress() - host, port, err := h.parseGatewayAddress(gatewayAddr) - if err != nil { - return "localhost", 9092, 1, nil + // Fallback to current gateway on assignment error + host, port, parseErr := h.parseGatewayAddress(currentGateway) + if parseErr != nil { + return "", 0, 0, fmt.Errorf("failed to parse gateway address after assignment error: %w", parseErr) } nodeID = 1 return host, port, nodeID, nil @@ -408,9 +413,12 @@ func (h *Handler) requestCoordinatorFromLeader(groupID string, registry Coordina _, err = h.waitForLeader(registry, 10*time.Second) // 10 second timeout for enterprise clients if err != nil { gatewayAddr := h.GetGatewayAddress() - host, port, err := h.parseGatewayAddress(gatewayAddr) - if err != nil { - return "localhost", 9092, 1, nil + if gatewayAddr == "" { + return "", 0, 0, fmt.Errorf("failed to wait for leader and no gateway address configured: %w", err) + } + host, port, parseErr := h.parseGatewayAddress(gatewayAddr) + if parseErr != nil { + return "", 0, 0, fmt.Errorf("failed to parse gateway address after leader wait timeout: %w", parseErr) } nodeID = 1 return host, port, nodeID, nil @@ -426,9 +434,12 @@ func (h *Handler) requestCoordinatorFromLeader(groupID string, registry Coordina // use current gateway as fallback. In a full implementation, this would make // an RPC call to the leader gateway. gatewayAddr := h.GetGatewayAddress() + if gatewayAddr == "" { + return "", 0, 0, fmt.Errorf("no gateway address configured for fallback coordinator") + } host, port, parseErr := h.parseGatewayAddress(gatewayAddr) if parseErr != nil { - return "localhost", 9092, 1, nil + return "", 0, 0, fmt.Errorf("failed to parse gateway address for fallback: %w", parseErr) } nodeID = 1 return host, port, nodeID, nil diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 4dd94baf6..9e694af43 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -390,8 +390,9 @@ func (h *Handler) GetSMQBrokerAddresses() []string { return h.smqBrokerAddresses } - // Final fallback for testing - return []string{"localhost:17777"} + // No brokers configured - return empty slice + // This will cause proper error handling in callers + return []string{} } // GetGatewayAddress returns the current gateway address as a string (for coordinator registry) @@ -399,8 +400,9 @@ func (h *Handler) GetGatewayAddress() string { if h.gatewayAddress != "" { return h.gatewayAddress } - // Fallback for testing - return "localhost:9092" + // No gateway address configured - return empty string + // Callers should handle this as a configuration error + return "" } // SetGatewayAddress sets the gateway address for coordinator registry diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 27d8d8811..8f92f7cfd 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -1328,8 +1329,10 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou // requests to itself leaderMember, exists := group.Members[group.Leader] if !exists { - // Fallback if leader not found (shouldn't happen) - jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}` + // Leader not found - return minimal assignment with no master identity + // Schema Registry should handle this by failing over to another instance + glog.Warningf("Schema Registry leader member %s not found in group %s", group.Leader, group.ID) + jsonAssignment := `{"error":0,"master":"","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}` return []byte(jsonAssignment) } @@ -1338,13 +1341,16 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou var identity map[string]interface{} err := json.Unmarshal(leaderMember.Metadata, &identity) if err != nil { - // Fallback to basic assignment - jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":1}}`, group.Leader) + // Failed to parse metadata - return minimal assignment + // Schema Registry should provide valid metadata; if not, fail gracefully + glog.Warningf("Failed to parse Schema Registry metadata for leader %s: %v", group.Leader, err) + jsonAssignment := fmt.Sprintf(`{"error":0,"master":"%s","master_identity":{"host":"","port":0,"master_eligibility":false,"scheme":"http","version":1}}`, group.Leader) return []byte(jsonAssignment) } - // Extract fields with defaults - host := "localhost" + // Extract fields from identity - use empty/zero defaults if missing + // Schema Registry clients should provide complete metadata + host := "" port := 8081 scheme := "http" version := 1 @@ -1352,6 +1358,8 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou if h, ok := identity["host"].(string); ok { host = h + } else { + glog.V(1).Infof("Schema Registry metadata missing 'host' field for leader %s", group.Leader) } if p, ok := identity["port"].(float64); ok { port = int(p)