Browse Source

Fix compilation errors in integration modules

- Fix NewPersistentLedger calls (returns 1 value, not 2)
- Fix GetStats calls (returns 3 values, not 4)
- Remove error handling for NewPersistentLedger since it doesn't return errors
- All Kafka integration modules now compile successfully
pull/7231/head
chrislu 2 months ago
parent
commit
8de1ce5497
  1. 8
      weed/mq/kafka/integration/persistent_handler.go
  2. 5
      weed/mq/kafka/integration/smq_publisher.go
  3. 5
      weed/mq/kafka/integration/smq_subscriber.go

8
weed/mq/kafka/integration/persistent_handler.go

@ -133,10 +133,7 @@ func (h *PersistentKafkaHandler) GetOrCreateLedger(topic string, partition int32
}
// Create persistent ledger
ledger, err := offset.NewPersistentLedger(key, h.offsetStorage)
if err != nil {
return nil, fmt.Errorf("failed to create persistent ledger: %w", err)
}
ledger := offset.NewPersistentLedger(key, h.offsetStorage)
h.ledgers[key] = ledger
return ledger, nil
@ -270,7 +267,8 @@ func (h *PersistentKafkaHandler) GetStats() map[string]interface{} {
h.ledgersMu.RLock()
ledgerStats := make(map[string]interface{})
for key, ledger := range h.ledgers {
entryCount, earliestTime, latestTime, nextOffset := ledger.GetStats()
entryCount, earliestTime, latestTime := ledger.GetStats()
nextOffset := ledger.GetHighWaterMark()
ledgerStats[key] = map[string]interface{}{
"entry_count": entryCount,
"earliest_time": earliestTime,

5
weed/mq/kafka/integration/smq_publisher.go

@ -169,10 +169,7 @@ func (p *SMQPublisher) getOrCreateLedger(kafkaTopic string, partition int32) (*o
}
// Create persistent ledger
ledger, err := offset.NewPersistentLedger(key, p.offsetStorage)
if err != nil {
return nil, fmt.Errorf("failed to create persistent ledger: %w", err)
}
ledger := offset.NewPersistentLedger(key, p.offsetStorage)
p.ledgers[key] = ledger
return ledger, nil

5
weed/mq/kafka/integration/smq_subscriber.go

@ -102,10 +102,7 @@ func (s *SMQSubscriber) Subscribe(
// Create persistent ledger for offset mapping
ledgerKey := fmt.Sprintf("%s-%d", kafkaTopic, kafkaPartition)
ledger, err := offset.NewPersistentLedger(ledgerKey, s.offsetStorage)
if err != nil {
return nil, fmt.Errorf("failed to create ledger: %w", err)
}
ledger := offset.NewPersistentLedger(ledgerKey, s.offsetStorage)
// Create offset mapper
offsetMapper := offset.NewKafkaToSMQMapper(ledger.Ledger)

Loading…
Cancel
Save