From b49ee254814d6ecea7937147f30862c154f25667 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 1 Dec 2025 16:21:29 -0800
Subject: [PATCH] Fix race condition so TUS uploads work across multiple filer instances

- Store each chunk as a separate file entry instead of updating session JSON
- Encode offset, size, and fileId in chunk file names for atomic storage
- Load chunks in getTusSession from a directory listing (atomic read)
- Eliminate the read-modify-write race condition across multiple filers
- Remove the in-memory mutex, which only worked for a single filer instance
---
 weed/server/filer_server_tus_session.go | 138 +++++++++++++++---------
 1 file changed, 90 insertions(+), 48 deletions(-)

diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go
index c8ae7c1cd..550574058 100644
--- a/weed/server/filer_server_tus_session.go
+++ b/weed/server/filer_server_tus_session.go
@@ -6,7 +6,8 @@ import (
 	"fmt"
 	"os"
 	"sort"
-	"sync"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -15,39 +16,6 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
-// tusSessionLocks provides per-session locking to prevent race conditions
-var tusSessionLocks = struct {
-	sync.RWMutex
-	locks map[string]*sync.Mutex
-}{locks: make(map[string]*sync.Mutex)}
-
-// getTusSessionLock returns a lock for the given upload ID
-func getTusSessionLock(uploadID string) *sync.Mutex {
-	tusSessionLocks.RLock()
-	lock, exists := tusSessionLocks.locks[uploadID]
-	tusSessionLocks.RUnlock()
-	if exists {
-		return lock
-	}
-
-	tusSessionLocks.Lock()
-	defer tusSessionLocks.Unlock()
-	// Double-check after acquiring write lock
-	if lock, exists = tusSessionLocks.locks[uploadID]; exists {
-		return lock
-	}
-	lock = &sync.Mutex{}
-	tusSessionLocks.locks[uploadID] = lock
-	return lock
-}
-
-// removeTusSessionLock removes the lock for the given upload ID
-func removeTusSessionLock(uploadID string) {
-	tusSessionLocks.Lock()
-	defer tusSessionLocks.Unlock()
-	delete(tusSessionLocks.locks, uploadID)
-}
-
 const (
 	TusVersion = "1.0.0"
 	TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
@@ -92,6 +60,41 @@ func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
 	return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
 }
 
+// tusChunkPath returns the path to store a chunk info file
+// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
+func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string {
+	// Replace / in fileId with _ to make it a valid filename
+	safeFileId := strings.ReplaceAll(fileId, "/", "_")
+	return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, safeFileId)
+}
+
+// parseTusChunkPath parses chunk info from a chunk file name
+func parseTusChunkPath(name string) (*TusChunkInfo, error) {
+	if !strings.HasPrefix(name, "chunk_") {
+		return nil, fmt.Errorf("not a chunk file: %s", name)
+	}
+	parts := strings.SplitN(name[6:], "_", 3) // Skip "chunk_" prefix
+	if len(parts) < 3 {
+		return nil, fmt.Errorf("invalid chunk file name: %s", name)
+	}
+	offset, err := strconv.ParseInt(parts[0], 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("invalid offset in chunk file: %s", name)
+	}
+	size, err := strconv.ParseInt(parts[1], 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("invalid size in chunk file: %s", name)
+	}
+	// Restore / in fileId
+	fileId := strings.ReplaceAll(parts[2], "_", "/")
+	return &TusChunkInfo{
+		Offset:   offset,
+		Size:     size,
+		FileId:   fileId,
+		UploadAt: time.Now().UnixNano(),
+	}, nil
+}
+
 // createTusSession creates a new TUS upload session
 func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
 	session := &TusSession{
@@ -158,7 +161,7 @@ func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession)
 	return nil
 }
 
-// getTusSession retrieves a TUS session by upload ID
+// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
 func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
 	infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
 	entry, err := fs.filer.FindEntry(ctx, infoPath)
@@ -174,33 +177,72 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus
 		return nil, fmt.Errorf("unmarshal session: %w", err)
 	}
 
+	// Load chunks from directory listing (atomic read, no race condition)
+	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+	entries, _, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, "", false, 10000, "", "", "")
+	if err != nil {
+		return nil, fmt.Errorf("list session directory: %w", err)
+	}
+
+	session.Chunks = nil
+	session.Offset = 0
+	for _, e := range entries {
+		if strings.HasPrefix(e.Name(), "chunk_") {
+			chunk, parseErr := parseTusChunkPath(e.Name())
+			if parseErr != nil {
+				glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr)
+				continue
+			}
+			session.Chunks = append(session.Chunks, chunk)
+		}
+	}
+
+	// Sort chunks by offset and compute current offset
+	if len(session.Chunks) > 0 {
+		sort.Slice(session.Chunks, func(i, j int) bool {
+			return session.Chunks[i].Offset < session.Chunks[j].Offset
+		})
+		// Current offset is the end of the last chunk
+		lastChunk := session.Chunks[len(session.Chunks)-1]
+		session.Offset = lastChunk.Offset + lastChunk.Size
+	}
+
 	return &session, nil
 }
 
-// updateTusSessionOffset updates the session offset after a successful chunk upload
+// updateTusSessionOffset stores the chunk info as a separate file entry
+// This avoids read-modify-write race conditions across multiple filer instances
 func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
-	// Lock the session to prevent concurrent modifications
-	lock := getTusSessionLock(uploadID)
-	lock.Lock()
-	defer lock.Unlock()
+	if chunk == nil {
+		return nil
+	}
 
-	session, err := fs.getTusSession(ctx, uploadID)
+	// Store chunk info as a separate file entry (atomic operation)
+	chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId))
+	chunkData, err := json.Marshal(chunk)
 	if err != nil {
-		return err
+		return fmt.Errorf("marshal chunk info: %w", err)
 	}
 
-	session.Offset = newOffset
-	if chunk != nil {
-		session.Chunks = append(session.Chunks, chunk)
+	if err := fs.filer.CreateEntry(ctx, &filer.Entry{
+		FullPath: chunkPath,
+		Attr: filer.Attr{
+			Mode:   0644,
+			Crtime: time.Now(),
+			Mtime:  time.Now(),
+			Uid:    OS_UID,
+			Gid:    OS_GID,
+		},
+		Content: chunkData,
+	}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+		return fmt.Errorf("save chunk info: %w", err)
 	}
 
-	return fs.saveTusSession(ctx, session)
+	return nil
 }
 
 // deleteTusSession removes a TUS upload session and all its data
 func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
-	// Clean up the session lock
-	defer removeTusSessionLock(uploadID)
 	session, err := fs.getTusSession(ctx, uploadID)
 	if err != nil {
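
Note for reviewers: the chunk naming scheme can be exercised in isolation. The sketch below mirrors tusChunkPath/parseTusChunkPath from the diff as standalone helpers; chunkName, parseChunkName, the example fileId values, and the main scaffolding are hypothetical stand-ins, not SeaweedFS code. The %016d zero-padding is what makes a plain lexicographic sort of the directory names equal numeric offset order, so getTusSession can rebuild the upload offset from a listing alone. Assuming real fileIds contain neither "/" nor a literal "_", the underscore substitution round-trips losslessly.

package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// chunkName mirrors the basename produced by tusChunkPath: fixed-width
// offset and size keep lexicographic name order identical to numeric
// offset order, and "/" in the fileId is swapped for "_".
func chunkName(offset, size int64, fileId string) string {
	safeFileId := strings.ReplaceAll(fileId, "/", "_")
	return fmt.Sprintf("chunk_%016d_%016d_%s", offset, size, safeFileId)
}

// parseChunkName inverts chunkName, mirroring parseTusChunkPath.
func parseChunkName(name string) (offset, size int64, fileId string, err error) {
	if !strings.HasPrefix(name, "chunk_") {
		return 0, 0, "", fmt.Errorf("not a chunk file: %s", name)
	}
	parts := strings.SplitN(name[len("chunk_"):], "_", 3)
	if len(parts) < 3 {
		return 0, 0, "", fmt.Errorf("invalid chunk file name: %s", name)
	}
	if offset, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, "", fmt.Errorf("invalid offset: %s", name)
	}
	if size, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, "", fmt.Errorf("invalid size: %s", name)
	}
	return offset, size, strings.ReplaceAll(parts[2], "_", "/"), nil
}

func main() {
	// Two chunks recorded out of order; the fileIds are made-up examples.
	names := []string{
		chunkName(4194304, 4194304, "4,02ab37d6e1"),
		chunkName(0, 4194304, "3,01637037d6"),
	}
	sort.Strings(names) // lexicographic == numeric thanks to %016d

	offset := int64(0)
	for _, n := range names {
		off, size, fileId, err := parseChunkName(n)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> offset=%d size=%d fileId=%s\n", n, off, size, fileId)
		offset = off + size // as in getTusSession: offset = end of last chunk
	}
	fmt.Println("resumable offset:", offset) // 8388608
}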
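
And a toy model (plain Go, not SeaweedFS code; the interleaving and entry names are illustrative) of why per-chunk entries remove the race: two filers doing read-modify-write on one session JSON can lose an update even though each holds its own in-process mutex, whereas create-only entries with distinct names cannot clobber each other.

package main

import "fmt"

func main() {
	// Old design: both filers read the session JSON, append their chunk,
	// and write the whole document back. A per-process mutex cannot order
	// writers in different processes, so this interleaving is possible.
	stored := []string{}
	filerA := append([]string{}, stored...) // filer A reads the session
	filerB := append([]string{}, stored...) // filer B reads the same state
	filerA = append(filerA, "chunk@0")
	stored = filerA // A writes back
	filerB = append(filerB, "chunk@4194304")
	stored = filerB // B writes back, silently dropping A's chunk
	fmt.Println("read-modify-write:", stored) // [chunk@4194304]

	// New design: each chunk is its own entry, keyed by offset/size/fileId,
	// so both creates land and a directory listing recovers every chunk.
	entries := map[string]struct{}{}
	entries["chunk_0000000000000000_0000000004194304_3,01637037d6"] = struct{}{}
	entries["chunk_0000000004194304_0000000004194304_4,02ab37d6e1"] = struct{}{}
	fmt.Println("per-chunk entries:", len(entries)) // 2
}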