From 263e891da0d9aae1618e48b4da5df6bfd9802f80 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 24 Oct 2025 17:09:58 -0700 Subject: [PATCH] Clients to volume server requires JWT tokens for all read operations (#7376) * [Admin UI] Login not possible due to securecookie error * avoid 404 favicon * Update weed/admin/dash/auth_middleware.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * address comments * avoid variable over shadowing * log session save error * When jwt.signing.read.key is enabled in security.toml, the volume server requires JWT tokens for all read operations. * reuse fileId * refactor --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- weed/filer/filechunk_manifest.go | 5 ++- weed/filer/stream.go | 30 +++++++++++++- .../replication/repl_util/replication_util.go | 4 +- weed/s3api/s3api_key_rotation.go | 10 +++-- weed/s3api/s3api_object_handlers_copy.go | 40 +++++++++++-------- weed/util/http/http_global_client_util.go | 8 +--- 6 files changed, 66 insertions(+), 31 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 80a741cf5..b04244669 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -109,7 +109,8 @@ func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileI glog.ErrorfCtx(ctx, "operation LookupFileId %s failed, err: %v", fileId, err) return err } - err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) + jwt := JwtForVolumeServer(fileId) + err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, jwt, cipherKey, isGzipped, true, 0, 0) if err != nil { return err } @@ -150,7 +151,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin retriedCnt++ var localProcessed int var writeErr error - shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { // Check for context cancellation during data processing select { case <-ctx.Done(): diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 87280d6b0..b2ee00555 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" @@ -26,6 +27,30 @@ var getLookupFileIdBackoffSchedule = []time.Duration{ 1800 * time.Millisecond, } +var ( + jwtSigningReadKey security.SigningKey + jwtSigningReadKeyExpires int + loadJwtConfigOnce sync.Once +) + +func loadJwtConfig() { + v := util.GetViper() + jwtSigningReadKey = security.SigningKey(v.GetString("jwt.signing.read.key")) + jwtSigningReadKeyExpires = v.GetInt("jwt.signing.read.expires_after_seconds") + if jwtSigningReadKeyExpires == 0 { + jwtSigningReadKeyExpires = 60 + } +} + +// JwtForVolumeServer generates a JWT token for volume server read operations if jwt.signing.read is configured +func JwtForVolumeServer(fileId string) string { + loadJwtConfigOnce.Do(loadJwtConfig) + if len(jwtSigningReadKey) == 0 { + return "" + } + return string(security.GenJwtForVolumeServer(jwtSigningReadKey, jwtSigningReadKeyExpires, fileId)) +} + func HasData(entry *filer_pb.Entry) bool { if len(entry.Content) > 0 { @@ -152,7 +177,7 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien } func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size) + streamFn, err := PrepareStreamContent(masterClient, JwtForVolumeServer, chunks, offset, size) if err != nil { return err } @@ -351,8 +376,9 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { } var buffer bytes.Buffer var shouldRetry bool + jwt := JwtForVolumeServer(chunkView.FileId) for _, urlString := range urlStrings { - shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go index 57c206e3e..c9812382c 100644 --- a/weed/replication/repl_util/replication_util.go +++ b/weed/replication/repl_util/replication_util.go @@ -2,6 +2,7 @@ package repl_util import ( "context" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/replication/source" @@ -20,9 +21,10 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS var writeErr error var shouldRetry bool + jwt := filer.JwtForVolumeServer(chunk.FileId) for _, fileUrl := range fileUrls { - shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, jwt, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/s3api/s3api_key_rotation.go b/weed/s3api/s3api_key_rotation.go index 499505678..050a2826c 100644 --- a/weed/s3api/s3api_key_rotation.go +++ b/weed/s3api/s3api_key_rotation.go @@ -175,13 +175,14 @@ func (s3a *S3ApiServer) rotateSSECChunk(chunk *filer_pb.FileChunk, sourceKey, de } // Get source chunk data - srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString()) + fileId := chunk.GetFileIdString() + srcUrl, err := s3a.lookupVolumeUrl(fileId) if err != nil { return nil, fmt.Errorf("lookup source volume: %w", err) } // Download encrypted data - encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } @@ -243,13 +244,14 @@ func (s3a *S3ApiServer) rotateSSEKMSChunk(chunk *filer_pb.FileChunk, srcKeyID, d } // Get source chunk data - srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString()) + fileId := chunk.GetFileIdString() + srcUrl, err := s3a.lookupVolumeUrl(fileId) if err != nil { return nil, fmt.Errorf("lookup source volume: %w", err) } // Download data (this would be encrypted with the old KMS key) - chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 65de55d1e..f04522ca6 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -734,7 +734,8 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -745,7 +746,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin } // Download and upload the chunk - chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } @@ -763,7 +764,8 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer dstChunk := s3a.createDestinationChunk(rangeChunk, rangeChunk.Offset, rangeChunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(originalChunk.GetFileIdString(), dstPath) + fileId := originalChunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -779,7 +781,7 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer offsetInChunk := overlapStart - chunkStart // Download and upload the chunk portion - chunkData, err := s3a.downloadChunkData(srcUrl, offsetInChunk, int64(rangeChunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size)) if err != nil { return nil, fmt.Errorf("download chunk range data: %w", err) } @@ -1096,9 +1098,10 @@ func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb } // downloadChunkData downloads chunk data from the source URL -func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]byte, error) { +func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64) ([]byte, error) { + jwt := filer.JwtForVolumeServer(fileId) var chunkData []byte - shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, nil, false, false, offset, int(size), func(data []byte) { + shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, int(size), func(data []byte) { chunkData = append(chunkData, data...) }) if err != nil { @@ -1218,7 +1221,8 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -1229,7 +1233,7 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -1315,7 +1319,8 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, nil, err } @@ -1326,7 +1331,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -1537,7 +1542,8 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -1548,7 +1554,7 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -1834,7 +1840,8 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -1845,7 +1852,7 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -2052,7 +2059,8 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) // Prepare chunk copy (assign new volume and get source URL) - assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + fileId := chunk.GetFileIdString() + assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath) if err != nil { return nil, err } @@ -2063,7 +2071,7 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun } // Download chunk data - chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 64a1640ce..38f129365 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -305,11 +305,7 @@ func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCom return n, err } -func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { - return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) -} - -func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { +func ReadUrlAsStream(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } @@ -509,7 +505,7 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } - shouldRetry, err = ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { // Check for context cancellation during data processing select { case <-ctx.Done():