Browse Source

Address code review comments

- Sort chunks by offset before assembling final file
- Use chunk.Offset directly instead of recalculating
- Return error on invalid file ID instead of skipping
- Require Content-Length header for PATCH requests
- Use fs.option.Cipher for encryption setting
- Detect MIME type from data using http.DetectContentType
- Fix concurrency group for push events in workflow
- Use os.Interrupt instead of Kill for graceful shutdown in tests
feature/tus-protocol
chrislu 4 days ago
parent
commit
f45918c3ea
  1. 2
      .github/workflows/tus-tests.yml
  2. 6
      test/tus/tus_integration_test.go
  3. 13
      weed/server/filer_server_tus_handlers.go
  4. 13
      weed/server/filer_server_tus_session.go

2
.github/workflows/tus-tests.yml

@ -15,7 +15,7 @@ on:
- 'test/tus/**' - 'test/tus/**'
concurrency: concurrency:
group: ${{ github.head_ref }}/tus-tests
group: ${{ github.head_ref || github.ref }}/tus-tests
cancel-in-progress: true cancel-in-progress: true
permissions: permissions:

6
test/tus/tus_integration_test.go

@ -36,15 +36,15 @@ type TestCluster struct {
func (c *TestCluster) Stop() { func (c *TestCluster) Stop() {
if c.filerCmd != nil && c.filerCmd.Process != nil { if c.filerCmd != nil && c.filerCmd.Process != nil {
c.filerCmd.Process.Kill()
c.filerCmd.Process.Signal(os.Interrupt)
c.filerCmd.Wait() c.filerCmd.Wait()
} }
if c.volumeCmd != nil && c.volumeCmd.Process != nil { if c.volumeCmd != nil && c.volumeCmd.Process != nil {
c.volumeCmd.Process.Kill()
c.volumeCmd.Process.Signal(os.Interrupt)
c.volumeCmd.Wait() c.volumeCmd.Wait()
} }
if c.masterCmd != nil && c.masterCmd.Process != nil { if c.masterCmd != nil && c.masterCmd.Process != nil {
c.masterCmd.Process.Kill()
c.masterCmd.Process.Signal(os.Interrupt)
c.masterCmd.Wait() c.masterCmd.Wait()
} }
} }

13
weed/server/filer_server_tus_handlers.go

@ -222,6 +222,12 @@ func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, u
return return
} }
// TUS requires Content-Length header for PATCH requests
if r.ContentLength < 0 {
http.Error(w, "Content-Length header required", http.StatusBadRequest)
return
}
// Write data // Write data
bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength) bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength)
if err != nil { if err != nil {
@ -309,12 +315,15 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
return 0, fmt.Errorf("create uploader: %w", uploaderErr) return 0, fmt.Errorf("create uploader: %w", uploaderErr)
} }
// Detect MIME type from data
mimeType := http.DetectContentType(buf.Bytes())
uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{ uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{
UploadUrl: urlLocation, UploadUrl: urlLocation,
Filename: "", Filename: "",
Cipher: false,
Cipher: fs.option.Cipher,
IsInputCompressed: false, IsInputCompressed: false,
MimeType: "",
MimeType: mimeType,
PairMap: nil, PairMap: nil,
Jwt: auth, Jwt: auth,
}) })

13
weed/server/filer_server_tus_session.go

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"sort"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
@ -191,26 +192,28 @@ func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSessio
return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size) return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
} }
// Sort chunks by offset to ensure correct order
sort.Slice(session.Chunks, func(i, j int) bool {
return session.Chunks[i].Offset < session.Chunks[j].Offset
})
// Assemble file chunks in order // Assemble file chunks in order
var fileChunks []*filer_pb.FileChunk var fileChunks []*filer_pb.FileChunk
var offset int64 = 0
for _, chunk := range session.Chunks { for _, chunk := range session.Chunks {
fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId) fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId)
if fidErr != nil { if fidErr != nil {
glog.Warningf("Invalid file ID %s: %v", chunk.FileId, fidErr)
continue
return fmt.Errorf("invalid file ID %s at offset %d: %w", chunk.FileId, chunk.Offset, fidErr)
} }
fileChunk := &filer_pb.FileChunk{ fileChunk := &filer_pb.FileChunk{
FileId: chunk.FileId, FileId: chunk.FileId,
Offset: offset,
Offset: chunk.Offset,
Size: uint64(chunk.Size), Size: uint64(chunk.Size),
ModifiedTsNs: chunk.UploadAt, ModifiedTsNs: chunk.UploadAt,
Fid: fid, Fid: fid,
} }
fileChunks = append(fileChunks, fileChunk) fileChunks = append(fileChunks, fileChunk)
offset += chunk.Size
} }
// Determine content type from metadata // Determine content type from metadata

Loading…
Cancel
Save