From 8de1ce5497f3501700e4227f84f06ad936f128ad Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 21:30:14 -0700 Subject: [PATCH] Fix compilation errors in integration modules - Fix NewPersistentLedger calls (returns 1 value, not 2) - Fix GetStats calls (returns 3 values, not 4) - Remove error handling for NewPersistentLedger since it doesn't return errors - All Kafka integration modules now compile successfully --- weed/mq/kafka/integration/persistent_handler.go | 8 +++----- weed/mq/kafka/integration/smq_publisher.go | 5 +---- weed/mq/kafka/integration/smq_subscriber.go | 5 +---- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/weed/mq/kafka/integration/persistent_handler.go b/weed/mq/kafka/integration/persistent_handler.go index fc2df4c0a..405fda877 100644 --- a/weed/mq/kafka/integration/persistent_handler.go +++ b/weed/mq/kafka/integration/persistent_handler.go @@ -133,10 +133,7 @@ func (h *PersistentKafkaHandler) GetOrCreateLedger(topic string, partition int32 } // Create persistent ledger - ledger, err := offset.NewPersistentLedger(key, h.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create persistent ledger: %w", err) - } + ledger := offset.NewPersistentLedger(key, h.offsetStorage) h.ledgers[key] = ledger return ledger, nil @@ -270,7 +267,8 @@ func (h *PersistentKafkaHandler) GetStats() map[string]interface{} { h.ledgersMu.RLock() ledgerStats := make(map[string]interface{}) for key, ledger := range h.ledgers { - entryCount, earliestTime, latestTime, nextOffset := ledger.GetStats() + entryCount, earliestTime, latestTime := ledger.GetStats() + nextOffset := ledger.GetHighWaterMark() ledgerStats[key] = map[string]interface{}{ "entry_count": entryCount, "earliest_time": earliestTime, diff --git a/weed/mq/kafka/integration/smq_publisher.go b/weed/mq/kafka/integration/smq_publisher.go index bb7e798b6..3e502aed8 100644 --- a/weed/mq/kafka/integration/smq_publisher.go +++ b/weed/mq/kafka/integration/smq_publisher.go @@ -169,10 +169,7 @@ func (p *SMQPublisher) getOrCreateLedger(kafkaTopic string, partition int32) (*o } // Create persistent ledger - ledger, err := offset.NewPersistentLedger(key, p.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create persistent ledger: %w", err) - } + ledger := offset.NewPersistentLedger(key, p.offsetStorage) p.ledgers[key] = ledger return ledger, nil diff --git a/weed/mq/kafka/integration/smq_subscriber.go b/weed/mq/kafka/integration/smq_subscriber.go index 8e6de1c7b..c9cbd636a 100644 --- a/weed/mq/kafka/integration/smq_subscriber.go +++ b/weed/mq/kafka/integration/smq_subscriber.go @@ -102,10 +102,7 @@ func (s *SMQSubscriber) Subscribe( // Create persistent ledger for offset mapping ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition) - ledger, err := offset.NewPersistentLedger(ledgerKey, s.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create ledger: %w", err) - } + ledger := offset.NewPersistentLedger(ledgerKey, s.offsetStorage) // Create offset mapper offsetMapper := offset.NewKafkaToSMQMapper(ledger.Ledger)