
Address critical and high-priority review comments

- Add per-session locking to prevent race conditions in updateTusSessionOffset
- Stream data directly to volume server instead of buffering entire chunk
- Only buffer 512 bytes for MIME type detection, then stream remaining data
- Clean up session locks when session is deleted
Branch: feature/tus-protocol
chrislu committed 3 days ago, commit 870e32c4ad
2 changed files:

  1. weed/server/filer_server_tus_handlers.go (36 lines changed)
  2. weed/server/filer_server_tus_session.go (42 lines changed)

weed/server/filer_server_tus_handlers.go

@@ -292,15 +292,6 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
 	}
 
 	// Read data into buffer
-	buf := new(bytes.Buffer)
-	n, err := io.CopyN(buf, reader, contentLength)
-	if err != nil && err != io.EOF {
-		return 0, fmt.Errorf("read data: %w", err)
-	}
-	if n == 0 {
-		return 0, nil
-	}
-
 	// Determine storage options based on target path
 	so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
 	if err != nil {
@@ -319,10 +310,31 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
 		return 0, fmt.Errorf("create uploader: %w", uploaderErr)
 	}
 
-	// Detect MIME type from data
-	mimeType := http.DetectContentType(buf.Bytes())
+	// Read first 512 bytes for MIME type detection, then stream the rest
+	sniffBuf := make([]byte, 512)
+	sniffN, sniffErr := io.ReadFull(reader, sniffBuf)
+	if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF {
+		return 0, fmt.Errorf("read data for mime detection: %w", sniffErr)
+	}
+	if sniffN == 0 {
+		return 0, nil
+	}
+	sniffBuf = sniffBuf[:sniffN]
+
+	// Detect MIME type from sniffed bytes
+	mimeType := http.DetectContentType(sniffBuf)
+
+	// Create a reader that combines sniffed bytes with remaining data
+	var dataReader io.Reader
+	if int64(sniffN) >= contentLength {
+		// All data fits in sniff buffer
+		dataReader = bytes.NewReader(sniffBuf)
+	} else {
+		// Combine sniffed bytes with remaining stream
+		dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN)))
+	}
 
-	uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{
+	uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{
 		UploadUrl: urlLocation,
 		Filename:  "",
 		Cipher:    fs.option.Cipher,
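
The core of this change is the sniff-then-stream pattern: read at most 512 bytes (the amount http.DetectContentType inspects), detect the MIME type from that prefix, then hand the uploader a reader that replays the prefix followed by the untouched remainder of the request body. Below is a minimal, standalone sketch of that pattern; sniffAndStream and the surrounding main are illustrative names for this example, not part of the SeaweedFS codebase.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// sniffAndStream reads at most 512 bytes from r to detect the MIME type,
// then returns a reader that replays those bytes followed by the rest of
// the stream, capped at contentLength bytes in total.
// Illustrative helper only, not a SeaweedFS function.
func sniffAndStream(r io.Reader, contentLength int64) (string, io.Reader, error) {
	sniffBuf := make([]byte, 512)
	n, err := io.ReadFull(r, sniffBuf)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return "", nil, fmt.Errorf("read data for mime detection: %w", err)
	}
	sniffBuf = sniffBuf[:n]

	mimeType := http.DetectContentType(sniffBuf)

	if int64(n) >= contentLength {
		// The whole chunk fit into the sniff buffer.
		return mimeType, bytes.NewReader(sniffBuf), nil
	}
	// Replay the sniffed prefix, then stream the remainder without buffering it.
	rest := io.LimitReader(r, contentLength-int64(n))
	return mimeType, io.MultiReader(bytes.NewReader(sniffBuf), rest), nil
}

func main() {
	body := strings.NewReader("<html><body>hello</body></html>")
	mimeType, data, err := sniffAndStream(body, int64(body.Len()))
	if err != nil {
		panic(err)
	}
	out, _ := io.ReadAll(data)
	fmt.Println(mimeType, len(out)) // e.g. "text/html; charset=utf-8 31"
}

Only the 512-byte prefix is ever held in memory; the rest of the chunk flows straight through io.MultiReader into the upload, which is what removes the full-chunk buffering the commit message refers to.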

weed/server/filer_server_tus_session.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"os"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -14,6 +15,39 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
+// tusSessionLocks provides per-session locking to prevent race conditions
+var tusSessionLocks = struct {
+	sync.RWMutex
+	locks map[string]*sync.Mutex
+}{locks: make(map[string]*sync.Mutex)}
+
+// getTusSessionLock returns a lock for the given upload ID
+func getTusSessionLock(uploadID string) *sync.Mutex {
+	tusSessionLocks.RLock()
+	lock, exists := tusSessionLocks.locks[uploadID]
+	tusSessionLocks.RUnlock()
+	if exists {
+		return lock
+	}
+
+	tusSessionLocks.Lock()
+	defer tusSessionLocks.Unlock()
+	// Double-check after acquiring write lock
+	if lock, exists = tusSessionLocks.locks[uploadID]; exists {
+		return lock
+	}
+	lock = &sync.Mutex{}
+	tusSessionLocks.locks[uploadID] = lock
+	return lock
+}
+
+// removeTusSessionLock removes the lock for the given upload ID
+func removeTusSessionLock(uploadID string) {
+	tusSessionLocks.Lock()
+	defer tusSessionLocks.Unlock()
+	delete(tusSessionLocks.locks, uploadID)
+}
+
 const (
 	TusVersion = "1.0.0"
 	TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
@@ -145,6 +179,11 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus
 
 // updateTusSessionOffset updates the session offset after a successful chunk upload
 func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
+	// Lock the session to prevent concurrent modifications
+	lock := getTusSessionLock(uploadID)
+	lock.Lock()
+	defer lock.Unlock()
+
 	session, err := fs.getTusSession(ctx, uploadID)
 	if err != nil {
 		return err
@@ -160,6 +199,9 @@ func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID stri
 
 // deleteTusSession removes a TUS upload session and all its data
 func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
+	// Clean up the session lock
+	defer removeTusSessionLock(uploadID)
+
 	session, err := fs.getTusSession(ctx, uploadID)
 	if err != nil {
 		// Session might already be deleted or never existed
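
The per-session mutexes serialize the read-modify-write that updateTusSessionOffset performs for a single upload ID within one filer process. The sketch below shows the same double-checked lock-map pattern in isolation, with two concurrent writers to one upload ID; getLock, sessionLocks, and the upload ID value are illustrative names for this example, not SeaweedFS APIs.

package main

import (
	"fmt"
	"sync"
)

// sessionLocks guards a map of per-upload-ID mutexes. The RWMutex protects
// the map itself; each inner mutex serializes work for a single upload ID.
var sessionLocks = struct {
	sync.RWMutex
	locks map[string]*sync.Mutex
}{locks: make(map[string]*sync.Mutex)}

// getLock returns the mutex for uploadID, creating it on first use.
// The read-lock fast path avoids write contention for existing entries;
// the second lookup under the write lock handles the race where two
// goroutines miss on the read path at the same time.
func getLock(uploadID string) *sync.Mutex {
	sessionLocks.RLock()
	lock, ok := sessionLocks.locks[uploadID]
	sessionLocks.RUnlock()
	if ok {
		return lock
	}

	sessionLocks.Lock()
	defer sessionLocks.Unlock()
	if lock, ok = sessionLocks.locks[uploadID]; ok {
		return lock
	}
	lock = &sync.Mutex{}
	sessionLocks.locks[uploadID] = lock
	return lock
}

func main() {
	offset := int64(0)
	var wg sync.WaitGroup
	// Two concurrent "PATCH" requests for the same upload ID update the
	// offset one at a time because they share the same per-session mutex.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l := getLock("upload-123")
			l.Lock()
			defer l.Unlock()
			offset += 1024
		}()
	}
	wg.Wait()
	fmt.Println(offset) // 2048
}

Deleting the map entry in deleteTusSession (via removeTusSessionLock) keeps the lock map from growing without bound as uploads complete.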
