|
|
@ -21,6 +21,8 @@ func (k *KafkaQueue) GetName() string { |
|
|
|
} |
|
|
|
|
|
|
|
func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) { |
|
|
|
glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) |
|
|
|
glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic")) |
|
|
|
return k.initialize( |
|
|
|
configuration.GetStringSlice("hosts"), |
|
|
|
configuration.GetString("topic"), |
|
|
@ -34,6 +36,7 @@ func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) { |
|
|
|
config.Producer.Return.Successes = true |
|
|
|
config.Producer.Return.Errors = true |
|
|
|
k.producer, err = sarama.NewAsyncProducer(hosts, config) |
|
|
|
k.topic = topic |
|
|
|
go k.handleSuccess() |
|
|
|
go k.handleError() |
|
|
|
return nil |
|
|
@ -60,7 +63,7 @@ func (k *KafkaQueue) handleSuccess() { |
|
|
|
for { |
|
|
|
pm := <-k.producer.Successes() |
|
|
|
if pm != nil { |
|
|
|
glog.Infof("producer message success, partition:%d offset:%d key:%v valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value) |
|
|
|
glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -69,7 +72,7 @@ func (k *KafkaQueue) handleError() { |
|
|
|
for { |
|
|
|
err := <-k.producer.Errors() |
|
|
|
if err != nil { |
|
|
|
glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err) |
|
|
|
glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic) |
|
|
|
} |
|
|
|
} |
|
|
|
} |