|
@ -2,23 +2,22 @@ package command |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
"sync" |
|
|
"sync" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type MetadataProcessFunc func(resp *filer_pb.SubscribeMetadataResponse) error |
|
|
|
|
|
|
|
|
|
|
|
type MetadataProcessor struct { |
|
|
type MetadataProcessor struct { |
|
|
activeJobs map[int64]*filer_pb.SubscribeMetadataResponse |
|
|
activeJobs map[int64]*filer_pb.SubscribeMetadataResponse |
|
|
activeJobsLock sync.Mutex |
|
|
activeJobsLock sync.Mutex |
|
|
activeJobsCond *sync.Cond |
|
|
activeJobsCond *sync.Cond |
|
|
concurrencyLimit int |
|
|
concurrencyLimit int |
|
|
fn MetadataProcessFunc |
|
|
|
|
|
|
|
|
fn pb.ProcessMetadataFunc |
|
|
processedTsWatermark int64 |
|
|
processedTsWatermark int64 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewMetadataProcessor(fn MetadataProcessFunc, concurrency int) *MetadataProcessor { |
|
|
|
|
|
|
|
|
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { |
|
|
t := &MetadataProcessor{ |
|
|
t := &MetadataProcessor{ |
|
|
fn: fn, |
|
|
fn: fn, |
|
|
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), |
|
|
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), |
|
|