@ -7,6 +7,7 @@ import (
"math"
"math"
"net/http"
"net/http"
"os"
"os"
"sync"
"time"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse"
@ -22,6 +23,7 @@ type FileHandle struct {
dirtyPages * ContinuousDirtyPages
dirtyPages * ContinuousDirtyPages
contentType string
contentType string
handle uint64
handle uint64
sync . RWMutex
f * File
f * File
RequestId fuse . RequestID // unique ID for request
RequestId fuse . RequestID // unique ID for request
@ -41,6 +43,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
if fh . f . entry != nil {
if fh . f . entry != nil {
fh . f . entry . Attributes . FileSize = filer2 . FileSize ( fh . f . entry )
fh . f . entry . Attributes . FileSize = filer2 . FileSize ( fh . f . entry )
}
}
return fh
return fh
}
}
@ -55,6 +58,12 @@ var _ = fs.HandleReleaser(&FileHandle{})
func ( fh * FileHandle ) Read ( ctx context . Context , req * fuse . ReadRequest , resp * fuse . ReadResponse ) error {
func ( fh * FileHandle ) Read ( ctx context . Context , req * fuse . ReadRequest , resp * fuse . ReadResponse ) error {
glog . V ( 4 ) . Infof ( "%s read fh %d: [%d,%d) size %d resp.Data cap=%d" , fh . f . fullpath ( ) , fh . handle , req . Offset , req . Offset + int64 ( req . Size ) , req . Size , cap ( resp . Data ) )
glog . V ( 4 ) . Infof ( "%s read fh %d: [%d,%d) size %d resp.Data cap=%d" , fh . f . fullpath ( ) , fh . handle , req . Offset , req . Offset + int64 ( req . Size ) , req . Size , cap ( resp . Data ) )
fh . RLock ( )
defer fh . RUnlock ( )
if req . Size <= 0 {
return nil
}
buff := resp . Data [ : cap ( resp . Data ) ]
buff := resp . Data [ : cap ( resp . Data ) ]
if req . Size > cap ( resp . Data ) {
if req . Size > cap ( resp . Data ) {
@ -77,13 +86,15 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
glog . Warningf ( "%s FileHandle Read %d: [%d,%d) size %d totalRead %d" , fh . f . fullpath ( ) , fh . handle , req . Offset , req . Offset + int64 ( req . Size ) , req . Size , totalRead )
glog . Warningf ( "%s FileHandle Read %d: [%d,%d) size %d totalRead %d" , fh . f . fullpath ( ) , fh . handle , req . Offset , req . Offset + int64 ( req . Size ) , req . Size , totalRead )
totalRead = min ( int64 ( len ( buff ) ) , totalRead )
totalRead = min ( int64 ( len ( buff ) ) , totalRead )
}
}
resp . Data = buff [ : totalRead ]
// resp.Data = buff[:totalRead]
resp . Data = buff
return err
return err
}
}
func ( fh * FileHandle ) readFromDirtyPages ( buff [ ] byte , startOffset int64 ) ( maxStop int64 ) {
func ( fh * FileHandle ) readFromDirtyPages ( buff [ ] byte , startOffset int64 ) ( maxStop int64 ) {
return fh . dirtyPages . ReadDirtyDataAt ( buff , startOffset )
maxStop = fh . dirtyPages . ReadDirtyDataAt ( buff , startOffset )
return
}
}
func ( fh * FileHandle ) readFromChunks ( buff [ ] byte , offset int64 ) ( int64 , error ) {
func ( fh * FileHandle ) readFromChunks ( buff [ ] byte , offset int64 ) ( int64 , error ) {
@ -127,6 +138,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// Write to the file handle
// Write to the file handle
func ( fh * FileHandle ) Write ( ctx context . Context , req * fuse . WriteRequest , resp * fuse . WriteResponse ) error {
func ( fh * FileHandle ) Write ( ctx context . Context , req * fuse . WriteRequest , resp * fuse . WriteResponse ) error {
fh . Lock ( )
defer fh . Unlock ( )
// write the request to volume servers
// write the request to volume servers
data := make ( [ ] byte , len ( req . Data ) )
data := make ( [ ] byte , len ( req . Data ) )
copy ( data , req . Data )
copy ( data , req . Data )
@ -162,19 +176,30 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
glog . V ( 4 ) . Infof ( "Release %v fh %d" , fh . f . fullpath ( ) , fh . handle )
glog . V ( 4 ) . Infof ( "Release %v fh %d" , fh . f . fullpath ( ) , fh . handle )
fh . Lock ( )
defer fh . Unlock ( )
fh . f . isOpen --
fh . f . isOpen --
if fh . f . isOpen <= 0 {
if fh . f . isOpen < 0 {
glog . V ( 0 ) . Infof ( "Release reset %s open count %d => %d" , fh . f . Name , fh . f . isOpen , 0 )
fh . f . isOpen = 0
return nil
}
if fh . f . isOpen == 0 {
fh . doFlush ( ctx , req . Header )
fh . doFlush ( ctx , req . Header )
fh . f . wfs . ReleaseHandle ( fh . f . fullpath ( ) , fuse . HandleID ( fh . handle ) )
fh . f . wfs . ReleaseHandle ( fh . f . fullpath ( ) , fuse . HandleID ( fh . handle ) )
fh . f . entryViewCache = nil
fh . f . reader = nil
}
}
return nil
return nil
}
}
func ( fh * FileHandle ) Flush ( ctx context . Context , req * fuse . FlushRequest ) error {
func ( fh * FileHandle ) Flush ( ctx context . Context , req * fuse . FlushRequest ) error {
fh . Lock ( )
defer fh . Unlock ( )
return fh . doFlush ( ctx , req . Header )
return fh . doFlush ( ctx , req . Header )
}
}
@ -183,13 +208,14 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// send the data to the OS
// send the data to the OS
glog . V ( 4 ) . Infof ( "doFlush %s fh %d %v" , fh . f . fullpath ( ) , fh . handle , header )
glog . V ( 4 ) . Infof ( "doFlush %s fh %d %v" , fh . f . fullpath ( ) , fh . handle , header )
chunks , err := fh . dirtyPages . Flush ToStorage( )
chunks , err := fh . dirtyPages . saveExistingPages ToStorage( )
if err != nil {
if err != nil {
glog . Errorf ( "flush %s: %v" , fh . f . fullpath ( ) , err )
glog . Errorf ( "flush %s: %v" , fh . f . fullpath ( ) , err )
return fuse . EIO
return fuse . EIO
}
}
if len ( chunks ) > 0 {
if len ( chunks ) > 0 {
fh . f . addChunks ( chunks )
fh . f . addChunks ( chunks )
fh . f . dirtyMetadata = true
fh . f . dirtyMetadata = true
}
}
@ -227,18 +253,14 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
glog . V ( 4 ) . Infof ( "%s chunks %d: %v [%d,%d)" , fh . f . fullpath ( ) , i , chunk . GetFileIdString ( ) , chunk . Offset , chunk . Offset + int64 ( chunk . Size ) )
glog . V ( 4 ) . Infof ( "%s chunks %d: %v [%d,%d)" , fh . f . fullpath ( ) , i , chunk . GetFileIdString ( ) , chunk . Offset , chunk . Offset + int64 ( chunk . Size ) )
}
}
chunks , garbages := filer2 . CompactFileChunks ( filer2 . LookupFn ( fh . f . wfs ) , fh . f . entry . Chunks )
chunks , _ := filer2 . CompactFileChunks ( filer2 . LookupFn ( fh . f . wfs ) , fh . f . entry . Chunks )
chunks , manifestErr := filer2 . MaybeManifestize ( fh . f . wfs . saveDataAsChunk ( fh . f . dir . FullPath ( ) ) , chunks )
chunks , manifestErr := filer2 . MaybeManifestize ( fh . f . wfs . saveDataAsChunk ( fh . f . dir . FullPath ( ) ) , chunks )
if manifestErr != nil {
if manifestErr != nil {
// not good, but should be ok
// not good, but should be ok
glog . V ( 0 ) . Infof ( "MaybeManifestize: %v" , manifestErr )
glog . V ( 0 ) . Infof ( "MaybeManifestize: %v" , manifestErr )
}
}
fh . f . entry . Chunks = chunks
fh . f . entry . Chunks = chunks
// fh.f.entryViewCache = nil
// special handling of one chunk md5
if len ( chunks ) == 1 {
}
fh . f . entryViewCache = nil
if err := filer_pb . CreateEntry ( client , request ) ; err != nil {
if err := filer_pb . CreateEntry ( client , request ) ; err != nil {
glog . Errorf ( "fh flush create %s: %v" , fh . f . fullpath ( ) , err )
glog . Errorf ( "fh flush create %s: %v" , fh . f . fullpath ( ) , err )
@ -247,11 +269,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
fh . f . wfs . metaCache . InsertEntry ( context . Background ( ) , filer2 . FromPbEntry ( request . Directory , request . Entry ) )
fh . f . wfs . metaCache . InsertEntry ( context . Background ( ) , filer2 . FromPbEntry ( request . Directory , request . Entry ) )
fh . f . wfs . deleteFileChunks ( garbages )
for i , chunk := range garbages {
glog . V ( 4 ) . Infof ( "garbage %s chunks %d: %v [%d,%d)" , fh . f . fullpath ( ) , i , chunk . GetFileIdString ( ) , chunk . Offset , chunk . Offset + int64 ( chunk . Size ) )
}
return nil
return nil
} )
} )