diff --git a/weed/filer/entry.go b/weed/filer/entry.go index d2f257967..dbe10c9b1 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -44,7 +44,7 @@ type Entry struct { } func (entry *Entry) Size() uint64 { - return maxUint64(TotalSize(entry.Chunks), entry.FileSize) + return maxUint64(maxUint64(TotalSize(entry.Chunks), entry.FileSize), uint64(len(entry.Content))) } func (entry *Entry) Timestamp() time.Time { diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 6e2f58850..ddc625c86 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "math" "net/http" + "time" ) func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error { @@ -75,3 +76,41 @@ func SaveAs(host string, port int, dir, name string, contentType string, byteBuf return nil } + +func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error { + + resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + + if err == filer_pb.ErrNotFound { + err = filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0644), + Collection: "", + Replication: "", + FileSize: uint64(len(content)), + }, + Content: content, + }, + }) + } else if err == nil { + entry := resp.Entry + entry.Content = content + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.FileSize = uint64(len(content)) + err = filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + } + + return err +} \ No newline at end of file diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 96a716d5b..9fe5fd146 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "os" "strings" "time" @@ -61,14 +60,32 @@ type EachEntryFunciton func(entry *Entry, isLast bool) error func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) { - return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32) + var counter uint32 + var startFrom string + var counterFunc = func(entry *Entry, isLast bool) error { + counter++ + startFrom = entry.Name + return fn(entry, isLast) + } + var paginationLimit uint32 = 10000 + + if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil { + return err + } + + for counter == paginationLimit { + counter = 0 + if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil { + return err + } + } + + return nil } func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit) - } func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 83f5269b7..da0a38dbf 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -38,45 +38,40 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag iam := &IdentityAccessManagement{ domain: option.DomainName, } - if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil { - glog.Warningf("fail to load config: %v", err) - } - if len(iam.identities) == 0 && option.Config != "" { + if option.Config != "" { if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { glog.Fatalf("fail to load config file %s: %v", option.Config, err) } + } else { + if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil { + glog.Warningf("fail to load config: %v", err) + } } return iam } func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error { - s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} content, err := filer.ReadContent(option.Filer, filer.IamConfigDirecotry, filer.IamIdentityFile) if err != nil { return fmt.Errorf("read S3 config: %v", err) } - if err = filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil { - return fmt.Errorf("parse S3 config: %v", err) - } - if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { - return fmt.Errorf("laod S3 config: %v", err) - } - glog.V(0).Infof("loaded %d s3 identities", len(iam.identities)) - return nil + return iam.loadS3ApiConfigurationFromBytes(content) } func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error { - s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} - rawData, readErr := ioutil.ReadFile(fileName) + content, readErr := ioutil.ReadFile(fileName) if readErr != nil { glog.Warningf("fail to read %s : %v", fileName, readErr) return fmt.Errorf("fail to read %s : %v", fileName, readErr) } + return iam.loadS3ApiConfigurationFromBytes(content) +} - glog.V(1).Infof("load s3 config: %v", fileName) - if err := filer.ParseS3ConfigurationFromBytes(rawData, s3ApiConfiguration); err != nil { +func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error { + s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} + if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil { glog.Warningf("unmarshal error: %v", err) - return fmt.Errorf("unmarshal %s error: %v", fileName, err) + return fmt.Errorf("unmarshal error: %v", err) } if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { return err @@ -84,6 +79,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str return nil } + func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error { var identities []*Identity for _, ident := range config.Identities { diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index f541628bb..ea4b69550 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -25,9 +25,10 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la dir = message.NewParentPath } if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile { - if err := s3a.iam.loadS3ApiConfigurationFromFiler(s3a.option); err != nil { + if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil { return err } + glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile) } return nil diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 5d63f1039..40583f478 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -197,11 +197,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d sepIndex := strings.Index(marker, "/") subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:] // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys) - subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn) + subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn) if subErr != nil { err = subErr return } + isTruncated = isTruncated || subIsTruncated maxKeys -= subCounter nextMarker = subDir + "/" + subNextMarker counter += subCounter diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 46e5c5957..5f1b2d819 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -170,6 +170,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr Extended: req.Entry.Extended, HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), HardLinkCounter: req.Entry.HardLinkCounter, + Content: req.Entry.Content, }, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { @@ -204,6 +205,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr Chunks: chunks, HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), HardLinkCounter: req.Entry.HardLinkCounter, + Content: req.Entry.Content, } glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index f77b7f08d..d55bf7cbb 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -60,7 +60,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } - if len(entry.Chunks) == 0 { + if len(entry.Chunks) == 0 && len(entry.Content) == 0 { glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc() w.WriteHeader(http.StatusNoContent) @@ -123,13 +123,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, filename := entry.Name() adjustHeaderContentDisposition(w, r, filename) + totalSize := int64(entry.Size()) + if r.Method == "HEAD" { - w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10)) + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return } - totalSize := int64(entry.Size()) - if rangeReq := r.Header.Get("Range"); rangeReq == "" { ext := filepath.Ext(filename) width, height, mode, shouldResize := shouldResizeImages(ext, r) @@ -148,7 +148,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { if offset+size <= int64(len(entry.Content)) { - _, err := writer.Write(entry.Content[offset:offset+size]) + _, err := writer.Write(entry.Content[offset : offset+size]) return err } return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 42d1048a3..869949a25 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -169,10 +169,11 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io fmt.Fprintf(writer, string(buf.Bytes())) fmt.Fprintln(writer) - if *apply { - if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.IamConfigDirecotry, filer.IamIdentityFile, "text/plain; charset=utf-8", &buf); err != nil { + if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) + }); err != nil { return err }