diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go index b7d724344..69503b65d 100644 --- a/unmaintained/see_log_entry/see_log_entry.go +++ b/unmaintained/see_log_entry/see_log_entry.go @@ -9,12 +9,13 @@ import ( "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) var ( - logdataFile = flag.String("logdata", "", "log data file saved under /.meta/log/...") + logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir) ) func main() { diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index df6379d26..acd609847 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -36,9 +36,11 @@ type Filer struct { buckets *FilerBuckets Cipher bool metaLogBuffer *log_buffer.LogBuffer + metaLogCollection string + metaLogReplication string } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { f := &Filer{ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters), @@ -46,6 +48,8 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui GrpcDialOption: grpcDialOption, } f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.metaLogCollection = collection + f.metaLogReplication = replication go f.loopProcessingDeletion() diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index de07e1cf9..18f96658b 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) // println("fullpath:", fullpath) - if strings.HasPrefix(fullpath, "/.meta") { + if strings.HasPrefix(fullpath, SystemLogDir) { return } @@ -69,11 +69,10 @@ func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventN func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { - return - - targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log", + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - startTime.Second(), startTime.Nanosecond()) + // startTime.Second(), startTime.Nanosecond(), + ) if err := f.appendToFile(targetFile, buf); err != nil { glog.V(0).Infof("log write failed %s: %v", targetFile, err) diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go index 4c134ae66..0e6e8d50f 100644 --- a/weed/filer2/filer_notify_append.go +++ b/weed/filer2/filer_notify_append.go @@ -13,25 +13,10 @@ import ( func (f *Filer) appendToFile(targetFile string, data []byte) error { - // assign a volume location - assignRequest := &operation.VolumeAssignRequest{ - Count: 1, - } - assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) - if err != nil { - return fmt.Errorf("AssignVolume: %v", err) - } - if assignResult.Error != "" { - return fmt.Errorf("AssignVolume error: %v", assignResult.Error) - } - - // upload data - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth) - if err != nil { - return fmt.Errorf("upload data %s: %v", targetUrl, err) + assignResult, err, uploadResult, err2 := f.assignAndUpload(data) + if err2 != nil { + return err2 } - // println("uploaded to", targetUrl) // find out existing entry fullpath := util.FullPath(targetFile) @@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { return err } + +func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, error, *operation.UploadResult, error) { + // assign a volume location + assignRequest := &operation.VolumeAssignRequest{ + Count: 1, + Collection: f.metaLogCollection, + Replication: f.metaLogReplication, + WritableVolumeCount: 1, + } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + if err != nil { + return nil, nil, nil, fmt.Errorf("AssignVolume: %v", err) + } + if assignResult.Error != "" { + return nil, nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) + } + + // upload data + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, err, uploadResult, nil +} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 21d126322..4f415bb9c 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -11,7 +11,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDBStore{} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index 324b07d6c..d4ab2c163 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -11,7 +11,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDB2Store{} @@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil, nil, 0, nil) + filer := filer2.NewFiler(nil, nil, 0, "", "", nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDB2Store{} diff --git a/weed/filer2/topics.go b/weed/filer2/topics.go new file mode 100644 index 000000000..8ab409774 --- /dev/null +++ b/weed/filer2/topics.go @@ -0,0 +1,6 @@ +package filer2 + +const( + TopicsDir = "/topics" + SystemLogDir = TopicsDir + "/.system/log" +) diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index 9c98f74e5..e2d932a99 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -37,7 +38,7 @@ func (fs *FilerServer) ListenForEvents(req *filer_pb.ListenForEventsRequest, str fullpath := util.Join(dirPath, entryName) // skip on filer internal meta logs - if strings.HasPrefix(fullpath, "/.meta") { + if strings.HasPrefix(fullpath, filer2.SystemLogDir) { return nil } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index d8761a538..96a5545f4 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -73,7 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, fs.notifyMetaListeners) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, option.Collection, option.DefaultReplication, fs.notifyMetaListeners) fs.filer.Cipher = option.Cipher maybeStartMetrics(fs, option)