|
|
package sub
import ( "encoding/json" "fmt" "io/ioutil" "sync" "time"
"github.com/Shopify/sarama" "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" )
func init() { NotificationInputs = append(NotificationInputs, &KafkaInput{}) }
// KafkaInput is the notification input that reports its name as "kafka".
// It reads messages from the partitions of one Kafka topic through a sarama
// consumer and hands them to ReceiveMessage via messageChan.
type KafkaInput struct {
	topic       string                       // topic name given to initialize
	consumer    sarama.Consumer              // sarama consumer created in initialize
	messageChan chan *sarama.ConsumerMessage // messages from the partition goroutines, drained by ReceiveMessage
}
// GetName returns the fixed identifier of this notification input, "kafka".
func (k *KafkaInput) GetName() string {
	const name = "kafka"
	return name
}
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { log.Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) log.Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"topic"), configuration.GetString(prefix+"offsetFile"), configuration.GetInt(prefix+"offsetSaveIntervalSeconds"), ) }
func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) { config := sarama.NewConfig() config.Consumer.Return.Errors = true k.consumer, err = sarama.NewConsumer(hosts, config) if err != nil { panic(err) } else { log.Infof("connected to %v", hosts) }
k.topic = topic k.messageChan = make(chan *sarama.ConsumerMessage, 1)
partitions, err := k.consumer.Partitions(topic) if err != nil { panic(err) }
progress := loadProgress(offsetFile) if progress == nil || progress.Topic != topic { progress = &KafkaProgress{ Topic: topic, PartitionOffsets: make(map[int32]int64), } } progress.lastSaveTime = time.Now() progress.offsetFile = offsetFile progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds
for _, partition := range partitions { offset, found := progress.PartitionOffsets[partition] if !found { offset = sarama.OffsetOldest } else { offset += 1 } partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset) if err != nil { panic(err) } go func() { for { select { case err := <-partitionConsumer.Errors(): fmt.Println(err) case msg := <-partitionConsumer.Messages(): k.messageChan <- msg if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { log.Warnf("set kafka offset: %v", err) } } } }() }
return nil }
func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
msg := <-k.messageChan
key = string(msg.Key) message = &filer_pb.EventNotification{} err = proto.Unmarshal(msg.Value, message)
return }
// KafkaProgress records, per partition of one topic, the offset of the last
// message handed to the consumer side.
//
// Topic and PartitionOffsets carry JSON tags and are what saveProgress
// marshals and loadProgress unmarshals. The remaining fields are unexported
// runtime state: offsetFile is the path saveProgress writes to, lastSaveTime
// is the time of the last successful save (initialize also sets it to the
// start time), and offsetSaveIntervalSeconds is the interval setOffset
// compares against before saving again. The embedded Mutex is held by
// setOffset while it updates the map and saves.
type KafkaProgress struct {
	Topic                     string          `json:"topic"`
	PartitionOffsets          map[int32]int64 `json:"partitionOffsets"`
	offsetFile                string
	lastSaveTime              time.Time
	offsetSaveIntervalSeconds int
	sync.Mutex
}
func loadProgress(offsetFile string) *KafkaProgress { progress := &KafkaProgress{} data, err := ioutil.ReadFile(offsetFile) if err != nil { log.Warnf("failed to read kafka progress file: %s", offsetFile) return nil } err = json.Unmarshal(data, progress) if err != nil { log.Warnf("failed to read kafka progress message: %s", string(data)) return nil } return progress }
func (progress *KafkaProgress) saveProgress() error { data, err := json.Marshal(progress) if err != nil { return fmt.Errorf("failed to marshal progress: %v", err) } err = ioutil.WriteFile(progress.offsetFile, data, 0640) if err != nil { return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err) }
progress.lastSaveTime = time.Now() return nil }
func (progress *KafkaProgress) setOffset(parition int32, offset int64) error { progress.Lock() defer progress.Unlock()
progress.PartitionOffsets[parition] = offset if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds { return progress.saveProgress() } return nil }
|