diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go index 7411b2527..f0b47f6d0 100644 --- a/weed/server/filer_server_tus_handlers.go +++ b/weed/server/filer_server_tus_handlers.go @@ -292,15 +292,6 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of } // Read data into buffer - buf := new(bytes.Buffer) - n, err := io.CopyN(buf, reader, contentLength) - if err != nil && err != io.EOF { - return 0, fmt.Errorf("read data: %w", err) - } - if n == 0 { - return 0, nil - } - // Determine storage options based on target path so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "") if err != nil { @@ -319,10 +310,31 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of return 0, fmt.Errorf("create uploader: %w", uploaderErr) } - // Detect MIME type from data - mimeType := http.DetectContentType(buf.Bytes()) + // Read first 512 bytes for MIME type detection, then stream the rest + sniffBuf := make([]byte, 512) + sniffN, sniffErr := io.ReadFull(reader, sniffBuf) + if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF { + return 0, fmt.Errorf("read data for mime detection: %w", sniffErr) + } + if sniffN == 0 { + return 0, nil + } + sniffBuf = sniffBuf[:sniffN] + + // Detect MIME type from sniffed bytes + mimeType := http.DetectContentType(sniffBuf) + + // Create a reader that combines sniffed bytes with remaining data + var dataReader io.Reader + if int64(sniffN) >= contentLength { + // All data fits in sniff buffer + dataReader = bytes.NewReader(sniffBuf) + } else { + // Combine sniffed bytes with remaining stream + dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN))) + } - uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{ + uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{ UploadUrl: urlLocation, Filename: "", Cipher: fs.option.Cipher, diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go index 685488e10..c8ae7c1cd 100644 --- a/weed/server/filer_server_tus_session.go +++ b/weed/server/filer_server_tus_session.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "sort" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -14,6 +15,39 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// tusSessionLocks provides per-session locking to prevent race conditions +var tusSessionLocks = struct { + sync.RWMutex + locks map[string]*sync.Mutex +}{locks: make(map[string]*sync.Mutex)} + +// getTusSessionLock returns a lock for the given upload ID +func getTusSessionLock(uploadID string) *sync.Mutex { + tusSessionLocks.RLock() + lock, exists := tusSessionLocks.locks[uploadID] + tusSessionLocks.RUnlock() + if exists { + return lock + } + + tusSessionLocks.Lock() + defer tusSessionLocks.Unlock() + // Double-check after acquiring write lock + if lock, exists = tusSessionLocks.locks[uploadID]; exists { + return lock + } + lock = &sync.Mutex{} + tusSessionLocks.locks[uploadID] = lock + return lock +} + +// removeTusSessionLock removes the lock for the given upload ID +func removeTusSessionLock(uploadID string) { + tusSessionLocks.Lock() + defer tusSessionLocks.Unlock() + delete(tusSessionLocks.locks, uploadID) +} + const ( TusVersion = "1.0.0" TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size @@ -145,6 +179,11 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus // updateTusSessionOffset updates the session offset after a successful chunk upload func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error { + // Lock the session to prevent concurrent modifications + lock := getTusSessionLock(uploadID) + lock.Lock() + defer lock.Unlock() + session, err := fs.getTusSession(ctx, uploadID) if err != nil { return err @@ -160,6 +199,9 @@ func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID stri // deleteTusSession removes a TUS upload session and all its data func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error { + // Clean up the session lock + defer removeTusSessionLock(uploadID) + session, err := fs.getTusSession(ctx, uploadID) if err != nil { // Session might already be deleted or never existed