@ -6,7 +6,8 @@ import (
"fmt"
"os"
"sort"
"sync"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -15,39 +16,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
// tusSessionLocks provides per-session locking to prevent race conditions
var tusSessionLocks = struct {
sync . RWMutex
locks map [ string ] * sync . Mutex
} { locks : make ( map [ string ] * sync . Mutex ) }
// getTusSessionLock returns a lock for the given upload ID
func getTusSessionLock ( uploadID string ) * sync . Mutex {
tusSessionLocks . RLock ( )
lock , exists := tusSessionLocks . locks [ uploadID ]
tusSessionLocks . RUnlock ( )
if exists {
return lock
}
tusSessionLocks . Lock ( )
defer tusSessionLocks . Unlock ( )
// Double-check after acquiring write lock
if lock , exists = tusSessionLocks . locks [ uploadID ] ; exists {
return lock
}
lock = & sync . Mutex { }
tusSessionLocks . locks [ uploadID ] = lock
return lock
}
// removeTusSessionLock removes the lock for the given upload ID
func removeTusSessionLock ( uploadID string ) {
tusSessionLocks . Lock ( )
defer tusSessionLocks . Unlock ( )
delete ( tusSessionLocks . locks , uploadID )
}
const (
TusVersion = "1.0.0"
TusMaxSize = int64 ( 5 * 1024 * 1024 * 1024 ) // 5GB default max size
@ -92,6 +60,41 @@ func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
return fmt . Sprintf ( "/%s/%s/%s" , TusUploadsFolder , uploadID , TusInfoFileName )
}
// tusChunkPath returns the path to store a chunk info file
// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
func ( fs * FilerServer ) tusChunkPath ( uploadID string , offset , size int64 , fileId string ) string {
// Replace / in fileId with _ to make it a valid filename
safeFileId := strings . ReplaceAll ( fileId , "/" , "_" )
return fmt . Sprintf ( "/%s/%s/chunk_%016d_%016d_%s" , TusUploadsFolder , uploadID , offset , size , safeFileId )
}
// parseTusChunkPath parses chunk info from a chunk file name
func parseTusChunkPath ( name string ) ( * TusChunkInfo , error ) {
if ! strings . HasPrefix ( name , "chunk_" ) {
return nil , fmt . Errorf ( "not a chunk file: %s" , name )
}
parts := strings . SplitN ( name [ 6 : ] , "_" , 3 ) // Skip "chunk_" prefix
if len ( parts ) < 3 {
return nil , fmt . Errorf ( "invalid chunk file name: %s" , name )
}
offset , err := strconv . ParseInt ( parts [ 0 ] , 10 , 64 )
if err != nil {
return nil , fmt . Errorf ( "invalid offset in chunk file: %s" , name )
}
size , err := strconv . ParseInt ( parts [ 1 ] , 10 , 64 )
if err != nil {
return nil , fmt . Errorf ( "invalid size in chunk file: %s" , name )
}
// Restore / in fileId
fileId := strings . ReplaceAll ( parts [ 2 ] , "_" , "/" )
return & TusChunkInfo {
Offset : offset ,
Size : size ,
FileId : fileId ,
UploadAt : time . Now ( ) . UnixNano ( ) ,
} , nil
}
// createTusSession creates a new TUS upload session
func ( fs * FilerServer ) createTusSession ( ctx context . Context , uploadID , targetPath string , size int64 , metadata map [ string ] string ) ( * TusSession , error ) {
session := & TusSession {
@ -158,7 +161,7 @@ func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession)
return nil
}
// getTusSession retrieves a TUS session by upload ID
// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
func ( fs * FilerServer ) getTusSession ( ctx context . Context , uploadID string ) ( * TusSession , error ) {
infoPath := util . FullPath ( fs . tusSessionInfoPath ( uploadID ) )
entry , err := fs . filer . FindEntry ( ctx , infoPath )
@ -174,33 +177,72 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus
return nil , fmt . Errorf ( "unmarshal session: %w" , err )
}
// Load chunks from directory listing (atomic read, no race condition)
sessionDirPath := util . FullPath ( fs . tusSessionPath ( uploadID ) )
entries , _ , err := fs . filer . ListDirectoryEntries ( ctx , sessionDirPath , "" , false , 10000 , "" , "" , "" )
if err != nil {
return nil , fmt . Errorf ( "list session directory: %w" , err )
}
session . Chunks = nil
session . Offset = 0
for _ , e := range entries {
if strings . HasPrefix ( e . Name ( ) , "chunk_" ) {
chunk , parseErr := parseTusChunkPath ( e . Name ( ) )
if parseErr != nil {
glog . V ( 1 ) . Infof ( "Skipping invalid chunk file %s: %v" , e . Name ( ) , parseErr )
continue
}
session . Chunks = append ( session . Chunks , chunk )
}
}
// Sort chunks by offset and compute current offset
if len ( session . Chunks ) > 0 {
sort . Slice ( session . Chunks , func ( i , j int ) bool {
return session . Chunks [ i ] . Offset < session . Chunks [ j ] . Offset
} )
// Current offset is the end of the last chunk
lastChunk := session . Chunks [ len ( session . Chunks ) - 1 ]
session . Offset = lastChunk . Offset + lastChunk . Size
}
return & session , nil
}
// updateTusSessionOffset updates the session offset after a successful chunk upload
// updateTusSessionOffset stores the chunk info as a separate file entry
// This avoids read-modify-write race conditions across multiple filer instances
func ( fs * FilerServer ) updateTusSessionOffset ( ctx context . Context , uploadID string , newOffset int64 , chunk * TusChunkInfo ) error {
// Lock the session to prevent concurrent modifications
lock := getTusSessionLock ( uploadID )
lock . Lock ( )
defer lock . Unlock ( )
if chunk == nil {
return nil
}
session , err := fs . getTusSession ( ctx , uploadID )
// Store chunk info as a separate file entry (atomic operation)
chunkPath := util . FullPath ( fs . tusChunkPath ( uploadID , chunk . Offset , chunk . Size , chunk . FileId ) )
chunkData , err := json . Marshal ( chunk )
if err != nil {
return err
return fmt . Errorf ( "marshal chunk info: %w" , err )
}
session . Offset = newOffset
if chunk != nil {
session . Chunks = append ( session . Chunks , chunk )
if err := fs . filer . CreateEntry ( ctx , & filer . Entry {
FullPath : chunkPath ,
Attr : filer . Attr {
Mode : 0644 ,
Crtime : time . Now ( ) ,
Mtime : time . Now ( ) ,
Uid : OS_UID ,
Gid : OS_GID ,
} ,
Content : chunkData ,
} , false , false , nil , false , fs . filer . MaxFilenameLength ) ; err != nil {
return fmt . Errorf ( "save chunk info: %w" , err )
}
return fs . saveTusSession ( ctx , session )
return nil
}
// deleteTusSession removes a TUS upload session and all its data
func ( fs * FilerServer ) deleteTusSession ( ctx context . Context , uploadID string ) error {
// Clean up the session lock
defer removeTusSessionLock ( uploadID )
session , err := fs . getTusSession ( ctx , uploadID )
if err != nil {