|  |  | @ -8,6 +8,10 @@ import ( | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/chrislusf/seaweedfs/weed/util" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/golang/protobuf/proto" | 
			
		
	
		
			
				
					|  |  |  | 	"io/ioutil" | 
			
		
	
		
			
				
					|  |  |  | 	"encoding/json" | 
			
		
	
		
			
				
					|  |  |  | 	"sync" | 
			
		
	
		
			
				
					|  |  |  | 	"time" | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func init() { | 
			
		
	
	
		
			
				
					|  |  | @ -30,10 +34,12 @@ func (k *KafkaInput) Initialize(configuration util.Configuration) error { | 
			
		
	
		
			
				
					|  |  |  | 	return k.initialize( | 
			
		
	
		
			
				
					|  |  |  | 		configuration.GetStringSlice("hosts"), | 
			
		
	
		
			
				
					|  |  |  | 		configuration.GetString("topic"), | 
			
		
	
		
			
				
					|  |  |  | 		configuration.GetString("offsetFile"), | 
			
		
	
		
			
				
					|  |  |  | 		configuration.GetInt("offsetSaveIntervalSeconds"), | 
			
		
	
		
			
				
					|  |  |  | 	) | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { | 
			
		
	
		
			
				
					|  |  |  | func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) { | 
			
		
	
		
			
				
					|  |  |  | 	config := sarama.NewConfig() | 
			
		
	
		
			
				
					|  |  |  | 	config.Consumer.Return.Errors = true | 
			
		
	
		
			
				
					|  |  |  | 	k.consumer, err = sarama.NewConsumer(hosts, config) | 
			
		
	
	
		
			
				
					|  |  | @ -51,8 +57,25 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { | 
			
		
	
		
			
				
					|  |  |  | 		panic(err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	progress := loadProgress(offsetFile) | 
			
		
	
		
			
				
					|  |  |  | 	if progress == nil || progress.Topic != topic { | 
			
		
	
		
			
				
					|  |  |  | 		progress = &KafkaProgress{ | 
			
		
	
		
			
				
					|  |  |  | 			Topic:            topic, | 
			
		
	
		
			
				
					|  |  |  | 			PartitionOffsets: make(map[int32]int64), | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	progress.lastSaveTime = time.Now() | 
			
		
	
		
			
				
					|  |  |  | 	progress.offsetFile = offsetFile | 
			
		
	
		
			
				
					|  |  |  | 	progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	for _, partition := range partitions { | 
			
		
	
		
			
				
					|  |  |  | 		partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) | 
			
		
	
		
			
				
					|  |  |  | 		offset, found := progress.PartitionOffsets[partition] | 
			
		
	
		
			
				
					|  |  |  | 		if !found { | 
			
		
	
		
			
				
					|  |  |  | 			offset = sarama.OffsetOldest | 
			
		
	
		
			
				
					|  |  |  | 		} else { | 
			
		
	
		
			
				
					|  |  |  | 			offset += 1 | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset) | 
			
		
	
		
			
				
					|  |  |  | 		if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 			panic(err) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
	
		
			
				
					|  |  | @ -63,6 +86,9 @@ func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { | 
			
		
	
		
			
				
					|  |  |  | 					fmt.Println(err) | 
			
		
	
		
			
				
					|  |  |  | 				case msg := <-partitionConsumer.Messages(): | 
			
		
	
		
			
				
					|  |  |  | 					k.messageChan <- msg | 
			
		
	
		
			
				
					|  |  |  | 					if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { | 
			
		
	
		
			
				
					|  |  |  | 						glog.Warningf("set kafka offset: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 					} | 
			
		
	
		
			
				
					|  |  |  | 				} | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 		}() | 
			
		
	
	
		
			
				
					|  |  | @ -81,3 +107,52 @@ func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotifi | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	return | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | type KafkaProgress struct { | 
			
		
	
		
			
				
					|  |  |  | 	Topic                     string          `json:"topic"` | 
			
		
	
		
			
				
					|  |  |  | 	PartitionOffsets          map[int32]int64 `json:"partitionOffsets"` | 
			
		
	
		
			
				
					|  |  |  | 	offsetFile                string | 
			
		
	
		
			
				
					|  |  |  | 	lastSaveTime              time.Time | 
			
		
	
		
			
				
					|  |  |  | 	offsetSaveIntervalSeconds int | 
			
		
	
		
			
				
					|  |  |  | 	sync.Mutex | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func loadProgress(offsetFile string) *KafkaProgress { | 
			
		
	
		
			
				
					|  |  |  | 	progress := &KafkaProgress{} | 
			
		
	
		
			
				
					|  |  |  | 	data, err := ioutil.ReadFile(offsetFile) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		glog.Warningf("failed to read kafka progress file: %s", offsetFile) | 
			
		
	
		
			
				
					|  |  |  | 		return nil | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	err = json.Unmarshal(data, progress) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		glog.Warningf("failed to read kafka progress message: %s", string(data)) | 
			
		
	
		
			
				
					|  |  |  | 		return nil | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	return progress | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (progress *KafkaProgress) saveProgress() error { | 
			
		
	
		
			
				
					|  |  |  | 	data, err := json.Marshal(progress) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return fmt.Errorf("failed to marshal progress: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	err = ioutil.WriteFile(progress.offsetFile, data, 0640) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	progress.lastSaveTime = time.Now() | 
			
		
	
		
			
				
					|  |  |  | 	return nil | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (progress *KafkaProgress) setOffset(parition int32, offset int64) error { | 
			
		
	
		
			
				
					|  |  |  | 	progress.Lock() | 
			
		
	
		
			
				
					|  |  |  | 	defer progress.Unlock() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	progress.PartitionOffsets[parition] = offset | 
			
		
	
		
			
				
					|  |  |  | 	if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds { | 
			
		
	
		
			
				
					|  |  |  | 		return progress.saveProgress() | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	return nil | 
			
		
	
		
			
				
					|  |  |  | } |