|
|
@ -19,6 +19,7 @@ import ( |
|
|
|
const ( |
|
|
|
LogFlushInterval = time.Minute |
|
|
|
PaginationSize = 1024 * 256 |
|
|
|
FilerStoreId = "filer.store.id" |
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
@ -48,7 +49,6 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, |
|
|
|
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), |
|
|
|
fileIdDeletionQueue: util.NewUnboundedQueue(), |
|
|
|
GrpcDialOption: grpcDialOption, |
|
|
|
Signature: util.RandomInt32(), |
|
|
|
} |
|
|
|
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) |
|
|
|
f.metaLogCollection = collection |
|
|
@ -72,6 +72,27 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) { |
|
|
|
|
|
|
|
func (f *Filer) SetStore(store FilerStore) { |
|
|
|
f.Store = NewFilerStoreWrapper(store) |
|
|
|
|
|
|
|
f.setOrLoadFilerStoreSignature(store) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { |
|
|
|
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId)) |
|
|
|
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 { |
|
|
|
f.Signature = util.RandomInt32() |
|
|
|
storeIdBytes = make([]byte, 4) |
|
|
|
util.Uint32toBytes(storeIdBytes, uint32(f.Signature)) |
|
|
|
if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil { |
|
|
|
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err) |
|
|
|
} |
|
|
|
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature) |
|
|
|
} else if err == nil && len(storeIdBytes) == 4 { |
|
|
|
f.Signature = int32(util.BytesToUint32(storeIdBytes)) |
|
|
|
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature) |
|
|
|
} else { |
|
|
|
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (f *Filer) GetStore() (store FilerStore) { |
|
|
|