diff --git a/weed/mq/kafka/integration/persistent_handler.go b/weed/mq/kafka/integration/persistent_handler.go index fc2df4c0a..405fda877 100644 --- a/weed/mq/kafka/integration/persistent_handler.go +++ b/weed/mq/kafka/integration/persistent_handler.go @@ -133,10 +133,7 @@ func (h *PersistentKafkaHandler) GetOrCreateLedger(topic string, partition int32 } // Create persistent ledger - ledger, err := offset.NewPersistentLedger(key, h.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create persistent ledger: %w", err) - } + ledger := offset.NewPersistentLedger(key, h.offsetStorage) h.ledgers[key] = ledger return ledger, nil @@ -270,7 +267,8 @@ func (h *PersistentKafkaHandler) GetStats() map[string]interface{} { h.ledgersMu.RLock() ledgerStats := make(map[string]interface{}) for key, ledger := range h.ledgers { - entryCount, earliestTime, latestTime, nextOffset := ledger.GetStats() + entryCount, earliestTime, latestTime := ledger.GetStats() + nextOffset := ledger.GetHighWaterMark() ledgerStats[key] = map[string]interface{}{ "entry_count": entryCount, "earliest_time": earliestTime, diff --git a/weed/mq/kafka/integration/smq_publisher.go b/weed/mq/kafka/integration/smq_publisher.go index bb7e798b6..3e502aed8 100644 --- a/weed/mq/kafka/integration/smq_publisher.go +++ b/weed/mq/kafka/integration/smq_publisher.go @@ -169,10 +169,7 @@ func (p *SMQPublisher) getOrCreateLedger(kafkaTopic string, partition int32) (*o } // Create persistent ledger - ledger, err := offset.NewPersistentLedger(key, p.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create persistent ledger: %w", err) - } + ledger := offset.NewPersistentLedger(key, p.offsetStorage) p.ledgers[key] = ledger return ledger, nil diff --git a/weed/mq/kafka/integration/smq_subscriber.go b/weed/mq/kafka/integration/smq_subscriber.go index 8e6de1c7b..c9cbd636a 100644 --- a/weed/mq/kafka/integration/smq_subscriber.go +++ b/weed/mq/kafka/integration/smq_subscriber.go @@ -102,10 +102,7 @@ func (s *SMQSubscriber) Subscribe( // Create persistent ledger for offset mapping ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition) - ledger, err := offset.NewPersistentLedger(ledgerKey, s.offsetStorage) - if err != nil { - return nil, fmt.Errorf("failed to create ledger: %w", err) - } + ledger := offset.NewPersistentLedger(ledgerKey, s.offsetStorage) // Create offset mapper offsetMapper := offset.NewKafkaToSMQMapper(ledger.Ledger)