diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 00b9b132d..fedfde40d 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -22,6 +22,7 @@ type Attr struct {
 	GroupNames    []string
 	SymlinkTarget string
 	Md5           []byte
+	FileSize      uint64
 }

 func (attr Attr) IsDirectory() bool {
@@ -39,7 +40,7 @@ type Entry struct {
 }

 func (entry *Entry) Size() uint64 {
-	return TotalSize(entry.Chunks)
+	return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
 }

 func (entry *Entry) Timestamp() time.Time {
@@ -81,3 +82,10 @@ func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
 		Chunks: entry.Chunks,
 	}
 }
+
+func maxUint64(x, y uint64) uint64 {
+	if x > y {
+		return x
+	}
+	return y
+}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 47c911011..4d615194f 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -53,6 +53,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
 		GroupName:     entry.Attr.GroupNames,
 		SymlinkTarget: entry.Attr.SymlinkTarget,
 		Md5:           entry.Attr.Md5,
+		FileSize:      entry.Attr.FileSize,
 	}
 }

@@ -73,6 +74,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
 	t.GroupNames = attr.GroupName
 	t.SymlinkTarget = attr.SymlinkTarget
 	t.Md5 = attr.Md5
+	t.FileSize = attr.FileSize

 	return t
 }
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index ea7772b4a..9de888d50 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -20,6 +20,10 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
 	return
 }

+func FileSize(entry *filer_pb.Entry) (size uint64) {
+	return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+}
+
 func ETag(entry *filer_pb.Entry) (etag string) {
 	if entry.Attributes == nil || entry.Attributes.Md5 == nil {
 		return ETagChunks(entry.Chunks)
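
The three filer2 hunks above work as a unit: Attr gains an explicit FileSize, the codec round-trips it through filer_pb.FuseAttributes, and both Entry.Size() and the new filer2.FileSize() helper report the larger of the declared size and what the chunks add up to. A minimal runnable sketch of that max semantics, using simplified stand-in types rather than the real filer2/filer_pb structs:

package main

import "fmt"

// FileChunk is a simplified stand-in for filer_pb.FileChunk.
type FileChunk struct {
	Offset int64
	Size   uint64
}

// TotalSize mirrors the chunk-based size: the highest offset+size seen.
func TotalSize(chunks []*FileChunk) (size uint64) {
	for _, c := range chunks {
		if end := uint64(c.Offset) + c.Size; end > size {
			size = end
		}
	}
	return
}

func maxUint64(x, y uint64) uint64 {
	if x > y {
		return x
	}
	return y
}

func main() {
	// Only one 4 KiB chunk has been flushed, but the file was already
	// extended to 10 KiB (e.g. by a Setattr): the attribute wins.
	chunks := []*FileChunk{{Offset: 0, Size: 4096}}
	attrFileSize := uint64(10240)
	fmt.Println(maxUint64(TotalSize(chunks), attrFileSize)) // 10240
}

This is what lets a file that was truncated upward, or whose dirty pages are not yet flushed, still report its logical size.
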
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 2af3e905a..8b7d92ffb 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -35,7 +35,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []
 	pages.lock.Lock()
 	defer pages.lock.Unlock()

-	glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+	glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)

 	if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
 		// this is more than what buffer can hold.
@@ -121,14 +121,16 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
 		return nil, false, nil
 	}

+	fileSize := int64(pages.f.entry.Attributes.FileSize)
 	for {
-		chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+		chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+		chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
 		if err == nil {
 			hasSavedData = true
-			glog.V(4).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+			glog.V(4).Infof("%s saveToStorage %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.FileId, maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
 			return
 		} else {
-			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err)
 			time.Sleep(5 * time.Second)
 		}
 	}
@@ -139,6 +141,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,

 	dir, _ := pages.f.fullpath().DirAndName()

+	reader = io.LimitReader(reader, size)
 	chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
 	if err != nil {
 		return nil, err
@@ -149,6 +152,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,

 }

+func maxUint64(x, y uint64) uint64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+
 func max(x, y int64) int64 {
 	if x > y {
 		return x
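
The dirty-page hunks clamp persisted data to the file's logical size twice: chunkSize is capped at fileSize-maxList.Offset(), and the reader itself is wrapped in io.LimitReader so no more than size bytes can reach the uploader even if the buffered page is larger. A small runnable sketch of the clamping idea, with made-up page and file sizes and a min helper like the one the hunk calls:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// min mirrors the int64 helper the hunk above relies on.
func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}

func main() {
	pageOffset := int64(0)
	pageSize := int64(8192) // bytes buffered in the dirty page
	fileSize := int64(5000) // logical size, e.g. after a truncate

	// Cap the chunk at the logical EOF, then enforce the cap on the
	// reader itself so the uploader cannot read past it.
	chunkSize := min(pageSize, fileSize-pageOffset)
	reader := io.LimitReader(bytes.NewReader(make([]byte, pageSize)), chunkSize)

	saved, _ := ioutil.ReadAll(reader)
	fmt.Println(len(saved)) // 5000, not 8192
}
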
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index dbfd7fd1a..83f6950bd 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -7,12 +7,13 @@ import (
 	"sort"
 	"time"

+	"github.com/seaweedfs/fuse"
+	"github.com/seaweedfs/fuse/fs"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/seaweedfs/fuse"
-	"github.com/seaweedfs/fuse/fs"
 )

 const blockSize = 512
@@ -35,6 +36,7 @@ type File struct {
 	entryViewCache []filer2.VisibleInterval
 	isOpen         int
 	reader         io.ReaderAt
+	dirtyMetadata  bool
 }

 func (file *File) fullpath() util.FullPath {
@@ -43,7 +45,7 @@ func (file *File) fullpath() util.FullPath {

 func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {

-	glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+	glog.V(5).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)

 	if file.isOpen <= 0 {
 		if err := file.maybeLoadEntry(ctx); err != nil {
@@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
 	attr.Inode = file.fullpath().AsInode()
 	attr.Valid = time.Second
 	attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
-	attr.Size = filer2.TotalSize(file.entry.Chunks)
+	attr.Size = filer2.FileSize(file.entry)
 	if file.isOpen > 0 {
 		attr.Size = file.entry.Attributes.FileSize
 		glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
@@ -107,22 +109,31 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f

 	if req.Valid.Size() {

-		glog.V(4).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
+		glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
 		if req.Size < filer2.TotalSize(file.entry.Chunks) {
 			// fmt.Printf("truncate %v \n", fullPath)
 			var chunks []*filer_pb.FileChunk
+			var truncatedChunks []*filer_pb.FileChunk
 			for _, chunk := range file.entry.Chunks {
 				int64Size := int64(chunk.Size)
 				if chunk.Offset+int64Size > int64(req.Size) {
+					// this chunk is truncated
 					int64Size = int64(req.Size) - chunk.Offset
-				}
-				if int64Size > 0 {
-					chunks = append(chunks, chunk)
+					if int64Size > 0 {
+						chunks = append(chunks, chunk)
+						glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk, chunk.Size, int64Size)
+						chunk.Size = uint64(int64Size)
+					} else {
+						glog.V(4).Infof("truncated whole chunk %+v\n", chunk)
+						truncatedChunks = append(truncatedChunks, chunk)
+					}
 				}
 			}
+			file.wfs.deleteFileChunks(truncatedChunks)
 			file.entry.Chunks = chunks
 			file.entryViewCache = nil
 			file.reader = nil
+			file.dirtyMetadata = true
 		}
 		file.entry.Attributes.FileSize = req.Size
 	}
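
The Setattr hunk above turns truncation into a chunk walk: a chunk straddling the new size is shortened in place, while chunks lying entirely beyond it are collected and handed to deleteFileChunks so their volume data is reclaimed. A simplified runnable sketch of that walk with a hypothetical FileChunk type; note the sketch also spells out the keep-as-is case for chunks that end at or below the cut:

package main

import "fmt"

// FileChunk is a hypothetical stand-in for filer_pb.FileChunk.
type FileChunk struct {
	FileId string
	Offset int64
	Size   uint64
}

// truncateChunks splits chunks at newSize: untouched chunks are kept,
// the straddling chunk is shortened, and fully cut-off chunks are
// returned for deletion.
func truncateChunks(all []*FileChunk, newSize int64) (kept, deleted []*FileChunk) {
	for _, chunk := range all {
		end := chunk.Offset + int64(chunk.Size)
		switch {
		case end <= newSize: // ends at or below the cut: keep as-is
			kept = append(kept, chunk)
		case chunk.Offset < newSize: // straddles the cut: shorten it
			chunk.Size = uint64(newSize - chunk.Offset)
			kept = append(kept, chunk)
		default: // starts at or beyond the cut: reclaim it
			deleted = append(deleted, chunk)
		}
	}
	return
}

func main() {
	chunks := []*FileChunk{
		{FileId: "1,a", Offset: 0, Size: 4096},
		{FileId: "2,b", Offset: 4096, Size: 4096},
		{FileId: "3,c", Offset: 8192, Size: 4096},
	}
	kept, deleted := truncateChunks(chunks, 6000)
	fmt.Println(len(kept), len(deleted)) // 2 1; chunk "2,b" is now 1904 bytes
}
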
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 680500c75..42a0b2446 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -19,10 +19,9 @@ import (

 type FileHandle struct {
 	// cache file has been written to
-	dirtyPages    *ContinuousDirtyPages
-	contentType   string
-	dirtyMetadata bool
-	handle        uint64
+	dirtyPages  *ContinuousDirtyPages
+	contentType string
+	handle      uint64

 	f         *File
 	RequestId fuse.RequestID // unique ID for request
@@ -40,7 +39,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
 		Gid: gid,
 	}
 	if fh.f.entry != nil {
-		fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+		fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry)
 	}
 	return fh
 }
@@ -55,7 +54,7 @@ var _ = fs.HandleReleaser(&FileHandle{})

 func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {

-	glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+	glog.V(2).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))

 	buff := make([]byte, req.Size)

@@ -126,7 +125,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 	copy(data, req.Data)

 	fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
-	glog.V(4).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+	glog.V(2).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))

 	chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
 	if err != nil {
@@ -139,14 +138,14 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 	if req.Offset == 0 {
 		// detect mime type
 		fh.contentType = http.DetectContentType(data)
-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}

 	if len(chunks) > 0 {

 		fh.f.addChunks(chunks)

-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}

 	return nil
@@ -181,10 +180,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 	if len(chunks) > 0 {

 		fh.f.addChunks(chunks)

-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}

-	if !fh.dirtyMetadata {
+	if !fh.f.dirtyMetadata {
 		return nil
 	}
@@ -246,7 +245,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 	})

 	if err == nil {
-		fh.dirtyMetadata = false
+		fh.f.dirtyMetadata = false
 	}

 	if err != nil {
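
The filehandle.go hunks move dirtyMetadata from FileHandle to the shared File, so every handle opened on the same file sees the same pending-metadata state and any one of them can flush it. A toy sketch, with hypothetical minimal types, of why the flag must live on the shared object:

package main

import "fmt"

// File and FileHandle are hypothetical minimal types, not the real ones.
type File struct {
	dirtyMetadata bool
}

type FileHandle struct {
	f *File
}

// Write marks the shared file as having unsaved metadata.
func (fh *FileHandle) Write() { fh.f.dirtyMetadata = true }

// Flush persists metadata only if some handle left it dirty.
func (fh *FileHandle) Flush() bool {
	if !fh.f.dirtyMetadata {
		return false // nothing to save
	}
	fh.f.dirtyMetadata = false // pretend we saved the entry
	return true
}

func main() {
	f := &File{}
	h1, h2 := &FileHandle{f: f}, &FileHandle{f: f}

	h1.Write()
	fmt.Println(h2.Flush()) // true: h2 sees h1's pending metadata
	fmt.Println(h1.Flush()) // false: already persisted
}
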
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index fa229de22..3240b705a 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -95,7 +95,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}

-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))

 	// Create a URL that references a to-be-created blob in your
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index bf8632827..8532c0231 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -84,7 +84,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}

-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))

 	bucket, err := g.client.Bucket(context.Background(), g.bucket)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index 4b58160db..35a7dd9f7 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -89,7 +89,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}

-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))

 	wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 625cf406c..56fc1930d 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -107,7 +107,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return err
 	}

-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))

 	parts := make([]*s3.CompletedPart, len(chunkViews))
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 31ac850b1..24bbafe1d 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -208,7 +208,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
 			output.Parts = append(output.Parts, &s3.Part{
 				PartNumber:   aws.Int64(int64(partNumber)),
 				LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
-				Size:         aws.Int64(int64(filer2.TotalSize(entry.Chunks))),
+				Size:         aws.Int64(int64(filer2.FileSize(entry))),
 				ETag:         aws.String("\"" + filer2.ETag(entry) + "\""),
 			})
 		}
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 311442551..46d5b90c7 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -141,7 +141,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
 					Key:          fmt.Sprintf("%s%s", dir[len(bucketPrefix):], entry.Name),
 					LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
 					ETag:         "\"" + filer2.ETag(entry) + "\"",
-					Size:         int64(filer2.TotalSize(entry.Chunks)),
+					Size:         int64(filer2.FileSize(entry)),
 					Owner: CanonicalUser{
 						ID:          fmt.Sprintf("%x", entry.Attributes.Uid),
 						DisplayName: entry.Attributes.UserName,
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 657158c2f..449b9f1a0 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -105,11 +105,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 	adjustHeaderContentDisposition(w, r, filename)

 	if r.Method == "HEAD" {
-		w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
+		w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
 		return
 	}

-	totalSize := int64(filer2.TotalSize(entry.Chunks))
+	totalSize := int64(entry.Size())

 	if rangeReq := r.Header.Get("Range"); rangeReq == "" {
 		ext := filepath.Ext(filename)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 8655daf70..e9f7b23fd 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -338,7 +338,7 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
 	if err != nil {
 		return nil, err
 	}
-	fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+	fi.size = int64(filer2.FileSize(entry))
 	fi.name = string(fullpath)
 	fi.mode = os.FileMode(entry.Attributes.FileMode)
 	fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
@@ -507,7 +507,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {

 	err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
 		fi := FileInfo{
-			size:          int64(filer2.TotalSize(entry.GetChunks())),
+			size:          int64(filer2.FileSize(entry)),
 			name:          entry.Name,
 			mode:          os.FileMode(entry.Attributes.FileMode),
 			modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index 96551dd5a..5404b0cdb 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -70,9 +70,9 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir
 			}
 		} else {
 			fileBlockCount = uint64(len(entry.Chunks))
-			fileByteCount = filer2.TotalSize(entry.Chunks)
-			blockCount += uint64(len(entry.Chunks))
-			byteCount += filer2.TotalSize(entry.Chunks)
+			fileByteCount = filer2.FileSize(entry)
+			blockCount += fileBlockCount
+			byteCount += fileByteCount
 		}

 		if name != "" && !entry.IsDirectory {
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 36133992f..4110c7b8d 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -95,7 +95,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
 				fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n",
 					fileMode, len(entry.Chunks),
 					userName, groupName,
-					filer2.TotalSize(entry.Chunks), dir, entry.Name)
+					filer2.FileSize(entry), dir, entry.Name)
 			} else {
 				fmt.Fprintf(writer, "%s\n", entry.Name)
 			}