Browse Source

Add TUS session storage types and utilities

Implements TUS upload session management:
- TusSession struct for tracking upload state
- Session creation with directory-based storage
- Session persistence using filer entries
- Session retrieval and offset updates
- Session deletion with chunk cleanup
- Upload completion with chunk assembly into final file

Session data is stored in /.uploads.tus/{upload-id}/ directory,
following the pattern used by S3 multipart uploads.
feature/tus-protocol
chrislu 2 days ago
parent
commit
3675d9db01
  1. 255
      weed/server/filer_server_tus_session.go

255
weed/server/filer_server_tus_session.go

@ -0,0 +1,255 @@
package weed_server
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
TusVersion = "1.0.0"
TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
TusUploadsFolder = ".uploads.tus"
TusInfoFileName = ".info"
TusChunkExt = ".chunk"
TusExtensions = "creation,creation-with-upload,termination"
)
// TusSession represents an in-progress TUS upload session
type TusSession struct {
ID string `json:"id"`
TargetPath string `json:"target_path"`
Size int64 `json:"size"`
Offset int64 `json:"offset"`
Metadata map[string]string `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
Chunks []*TusChunkInfo `json:"chunks,omitempty"`
}
// TusChunkInfo tracks individual chunk uploads within a session
type TusChunkInfo struct {
Offset int64 `json:"offset"`
Size int64 `json:"size"`
FileId string `json:"file_id"`
UploadAt int64 `json:"upload_at"`
}
// tusSessionDir returns the directory path for storing TUS upload sessions
func (fs *FilerServer) tusSessionDir() string {
return "/" + TusUploadsFolder
}
// tusSessionPath returns the path to a specific upload session directory
func (fs *FilerServer) tusSessionPath(uploadID string) string {
return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID)
}
// tusSessionInfoPath returns the path to the session info file
func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
}
// createTusSession creates a new TUS upload session
func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
session := &TusSession{
ID: uploadID,
TargetPath: targetPath,
Size: size,
Offset: 0,
Metadata: metadata,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
Chunks: []*TusChunkInfo{},
}
// Create session directory
sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
if err := fs.filer.CreateEntry(ctx, &filer.Entry{
FullPath: sessionDirPath,
Attr: filer.Attr{
Mode: os.ModeDir | 0755,
Crtime: time.Now(),
Mtime: time.Now(),
Uid: OS_UID,
Gid: OS_GID,
},
}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
return nil, fmt.Errorf("create session directory: %w", err)
}
// Save session info
if err := fs.saveTusSession(ctx, session); err != nil {
// Cleanup the directory on failure
fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0)
return nil, fmt.Errorf("save session info: %w", err)
}
glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size)
return session, nil
}
// saveTusSession saves the session info to the filer
func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error {
sessionData, err := json.Marshal(session)
if err != nil {
return fmt.Errorf("marshal session: %w", err)
}
infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID))
entry := &filer.Entry{
FullPath: infoPath,
Attr: filer.Attr{
Mode: 0644,
Crtime: session.CreatedAt,
Mtime: time.Now(),
Uid: OS_UID,
Gid: OS_GID,
},
Content: sessionData,
}
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
return fmt.Errorf("save session info entry: %w", err)
}
return nil
}
// getTusSession retrieves a TUS session by upload ID
func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
entry, err := fs.filer.FindEntry(ctx, infoPath)
if err != nil {
if err == filer_pb.ErrNotFound {
return nil, fmt.Errorf("session not found: %s", uploadID)
}
return nil, fmt.Errorf("find session: %w", err)
}
var session TusSession
if err := json.Unmarshal(entry.Content, &session); err != nil {
return nil, fmt.Errorf("unmarshal session: %w", err)
}
return &session, nil
}
// updateTusSessionOffset updates the session offset after a successful chunk upload
func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
session, err := fs.getTusSession(ctx, uploadID)
if err != nil {
return err
}
session.Offset = newOffset
if chunk != nil {
session.Chunks = append(session.Chunks, chunk)
}
return fs.saveTusSession(ctx, session)
}
// deleteTusSession removes a TUS upload session and all its data
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
session, err := fs.getTusSession(ctx, uploadID)
if err != nil {
// Session might already be deleted or never existed
glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err)
return nil
}
// Delete any uploaded chunks from volume servers
for _, chunk := range session.Chunks {
if chunk.FileId != "" {
fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
{FileId: chunk.FileId},
})
}
}
// Delete the session directory
sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil {
return fmt.Errorf("delete session directory: %w", err)
}
glog.V(2).Infof("Deleted TUS session %s", uploadID)
return nil
}
// completeTusUpload assembles all chunks and creates the final file
func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error {
if session.Offset != session.Size {
return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
}
// Assemble file chunks in order
var fileChunks []*filer_pb.FileChunk
var offset int64 = 0
for _, chunk := range session.Chunks {
fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId)
if fidErr != nil {
glog.Warningf("Invalid file ID %s: %v", chunk.FileId, fidErr)
continue
}
fileChunk := &filer_pb.FileChunk{
FileId: chunk.FileId,
Offset: offset,
Size: uint64(chunk.Size),
ModifiedTsNs: chunk.UploadAt,
Fid: fid,
}
fileChunks = append(fileChunks, fileChunk)
offset += chunk.Size
}
// Determine content type from metadata
contentType := ""
if session.Metadata != nil {
if ct, ok := session.Metadata["content-type"]; ok {
contentType = ct
}
}
// Create the final file entry
targetPath := util.FullPath(session.TargetPath)
entry := &filer.Entry{
FullPath: targetPath,
Attr: filer.Attr{
Mode: 0644,
Crtime: session.CreatedAt,
Mtime: time.Now(),
Uid: OS_UID,
Gid: OS_GID,
Mime: contentType,
},
Chunks: fileChunks,
}
// Ensure parent directory exists
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
return fmt.Errorf("create final file entry: %w", err)
}
// Delete the session (but keep the chunks since they're now part of the final file)
sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID))
if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil {
glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err)
}
glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d",
session.ID, session.TargetPath, session.Size, len(fileChunks))
return nil
}
Loading…
Cancel
Save