You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

79 lines
2.0 KiB

6 years ago
6 years ago
  1. package kafka
  2. import (
  3. "github.com/Shopify/sarama"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/notification"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "github.com/golang/protobuf/proto"
  8. )
  9. func init() {
  10. notification.MessageQueues = append(notification.MessageQueues, &KafkaQueue{})
  11. }
  12. type KafkaQueue struct {
  13. topic string
  14. producer sarama.AsyncProducer
  15. }
  16. func (k *KafkaQueue) GetName() string {
  17. return "kafka"
  18. }
  19. func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
  20. glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
  21. glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic"))
  22. return k.initialize(
  23. configuration.GetStringSlice("hosts"),
  24. configuration.GetString("topic"),
  25. )
  26. }
  27. func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
  28. config := sarama.NewConfig()
  29. config.Producer.RequiredAcks = sarama.WaitForLocal
  30. config.Producer.Partitioner = sarama.NewHashPartitioner
  31. config.Producer.Return.Successes = true
  32. config.Producer.Return.Errors = true
  33. k.producer, err = sarama.NewAsyncProducer(hosts, config)
  34. k.topic = topic
  35. go k.handleSuccess()
  36. go k.handleError()
  37. return nil
  38. }
  39. func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
  40. bytes, err := proto.Marshal(message)
  41. if err != nil {
  42. return
  43. }
  44. msg := &sarama.ProducerMessage{
  45. Topic: k.topic,
  46. Key: sarama.StringEncoder(key),
  47. Value: sarama.ByteEncoder(bytes),
  48. }
  49. k.producer.Input() <- msg
  50. return nil
  51. }
  52. func (k *KafkaQueue) handleSuccess() {
  53. for {
  54. pm := <-k.producer.Successes()
  55. if pm != nil {
  56. glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
  57. }
  58. }
  59. }
  60. func (k *KafkaQueue) handleError() {
  61. for {
  62. err := <-k.producer.Errors()
  63. if err != nil {
  64. glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
  65. }
  66. }
  67. }