diff --git a/weed/filer/filer.go b/weed/filer/filer.go index d4c0b4eef..162db175a 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -42,6 +42,7 @@ type Filer struct { MetaAggregator *MetaAggregator Signature int32 FilerConf *FilerConf + RemoteStorage *FilerRemoteStorage } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), + RemoteStorage: NewFilerRemoteStorage(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index c9f75a5ca..32be4f180 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -12,6 +12,7 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { f.maybeReloadFilerConfiguration(event) + f.maybeReloadRemoteStorageConfigurationAndMapping(event) f.onBucketEvents(event) } @@ -80,3 +81,16 @@ func (f *Filer) LoadFilerConf() { } f.FilerConf = fc } + +//////////////////////////////////// +// load and maintain remote storages +//////////////////////////////////// +func (f *Filer) LoadRemoteStorageConfAndMapping() { + if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil { + glog.Errorf("read remote conf and mapping: %v", err) + return + } +} +func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) { + // FIXME add reloading +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index f6f3adb22..18b2676bc 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -31,7 +31,7 @@ func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { return rs } -func (rs *FilerRemoteStorage) loadRemoteStorageConfigurations(filer *Filer) (err error) { +func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) { // execute this on filer entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "") @@ -74,7 +74,21 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, rem rs.rules.Put([]byte(dir+"/"), remoteStorageName) } -func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, found bool) { +func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation remote_storage.RemoteStorageLocation) { + var storageLocation string + rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { + mountDir = util.FullPath(string(key)) + storageLocation = value.(string) + return true + }) + if storageLocation == "" { + return + } + remoteLocation = remote_storage.RemoteStorageLocation(storageLocation) + return +} + +func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { var storageLocation string rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { storageLocation = value.(string) @@ -87,8 +101,12 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r storageName, _, _ := remote_storage.RemoteStorageLocation(storageLocation).NameBucketPath() - remoteConf, ok := rs.storageNameToConf[storageName] - if !ok { + return rs.GetRemoteStorageClient(storageName) +} + +func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { + remoteConf, found = rs.storageNameToConf[storageName] + if !found { return } diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go index 1a41c6e63..e5996475e 100644 --- a/weed/filer/filer_remote_storage_test.go +++ b/weed/filer/filer_remote_storage_test.go @@ -16,15 +16,15 @@ func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) { rs.mapDirectoryToRemoteStorage("/a/b/c", "s7") - _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f") + _, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f") assert.Equal(t, true, found, "find storage client") - _, found2 := rs.FindRemoteStorageClient("/a/b") + _, _, found2 := rs.FindRemoteStorageClient("/a/b") assert.Equal(t, false, found2, "should not find storage client") - _, found3 := rs.FindRemoteStorageClient("/a/b/c") + _, _, found3 := rs.FindRemoteStorageClient("/a/b/c") assert.Equal(t, false, found3, "should not find storage client") - _, found4 := rs.FindRemoteStorageClient("/a/b/cc") + _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc") assert.Equal(t, false, found4, "should not find storage client") } \ No newline at end of file diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go new file mode 100644 index 000000000..57450d6d8 --- /dev/null +++ b/weed/filer/read_remote.go @@ -0,0 +1,27 @@ +package filer + +import ( + "fmt" + "io" +) + +func (entry *Entry) IsRemoteOnly() bool { + return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0 +} + +func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64) error { + client, _, found := f.RemoteStorage.GetRemoteStorageClient(remoteEntry.Remote.StorageName) + if !found { + return fmt.Errorf("remote storage %v not found", entry.Remote.StorageName) + } + + mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath) + _, bucket, path := remoteLoation.NameBucketPath() + + remoteFullPath := path + string(entry.FullPath[len(mountDir):]) + + client.ReadFile(bucket, remoteFullPath[1:], offset, size, func(w io.Writer) error { + + }) + return nil +} diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go index 8794e7268..b8c2b55ea 100644 --- a/weed/remote_storage/remote_storage.go +++ b/weed/remote_storage/remote_storage.go @@ -3,6 +3,7 @@ package remote_storage import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" "strings" "sync" ) @@ -14,10 +15,10 @@ func (remote RemoteStorageLocation) NameBucketPath() (storageName, bucket, remot remote = remote[:len(remote)-1] } parts := strings.SplitN(string(remote), "/", 3) - if len(parts)>=1 { + if len(parts) >= 1 { storageName = parts[0] } - if len(parts)>=2 { + if len(parts) >= 2 { bucket = parts[1] } remotePath = string(remote[len(storageName)+1+len(bucket):]) @@ -31,6 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file type RemoteStorageClient interface { Traverse(remote RemoteStorageLocation, visitFn VisitFunc) error + ReadFile(bucket, key string, offset int64, size int64, writeFn func(w io.Writer) error) error } type RemoteStorageClientMaker interface { diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index d7afaa65a..534bc4840 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -149,6 +149,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.LoadFilerConf() + fs.filer.LoadRemoteStorageConfAndMapping() + grace.OnInterrupt(func() { fs.filer.Shutdown() }) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 957e08855..add0be1f4 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -101,7 +101,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) //Seaweed custom header are not visible to Vue or javascript seaweedHeaders := []string{} - for header, _ := range w.Header() { + for header := range w.Header() { if strings.HasPrefix(header, "Seaweed-") { seaweedHeaders = append(seaweedHeaders, header) } @@ -163,9 +163,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } return err } - err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) - if err != nil { - glog.Errorf("failed to stream content %s: %v", r.URL, err) + if entry.IsRemoteOnly() { + err = fs.filer.ReadRemote(writer, entry, offset, size) + if err != nil { + glog.Errorf("failed to read remote %s: %v", r.URL, err) + } + } else { + err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) + if err != nil { + glog.Errorf("failed to stream content %s: %v", r.URL, err) + } } return err })