From 5c8e6014baabe84cde25eea00ea75d0be5f0aa8f Mon Sep 17 00:00:00 2001 From: Henco Appel Date: Mon, 8 Apr 2024 15:27:00 +0100 Subject: [PATCH] fix: filer authenticate with with volume server (#5480) --- weed/filer/filechunk_manifest.go | 6 ++-- weed/filer/stream.go | 17 +++++++--- weed/server/filer_server.go | 10 ++++++ weed/server/filer_server_handlers_proxy.go | 21 ++++++++++++ weed/server/filer_server_handlers_read.go | 7 +++- weed/util/http_util.go | 37 +++++++++++++--------- 6 files changed, 74 insertions(+), 24 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 60a5c538b..7ea2f0353 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return err } - err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) if err != nil { return err } @@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { +func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { var shouldRetry bool var totalWritten int @@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe for _, urlString := range urlStrings { var localProcessed int var writeErr error - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 2686fd833..23a853b9a 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R type DoStreamContent func(writer io.Writer) error -func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { - return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0) +func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { + return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0) } -func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { +type VolumeServerJwtFunction func(fileId string) string + +func noJwtFunc(string) string { + return "" +} + +func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks)) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) @@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() - err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + jwt := jwtFunc(chunkView.FileId) + err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) offset += int64(chunkView.ViewSize) remaining -= int64(chunkView.ViewSize) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) @@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc } func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size) + streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size) if err != nil { return err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 356761f30..9880afee0 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -91,6 +91,7 @@ type FilerServer struct { secret security.SigningKey filer *filer.Filer filerGuard *security.Guard + volumeGuard *security.Guard grpcDialOption grpc.DialOption // metrics read from the master @@ -113,6 +114,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + volumeSigningKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + + volumeReadSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") + v.SetDefault("cors.allowed_origins.values", "*") allowedOrigins := v.GetString("cors.allowed_origins.values") @@ -145,6 +154,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Cipher = option.Cipher // we do not support IP whitelist right now fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec) fs.checkWithMaster() diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index db46f00b3..e04994569 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -2,6 +2,7 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" @@ -20,6 +21,26 @@ func init() { }} } +func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { + encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite) + + if encodedJwt == "" { + return + } + + r.Header.Set("Authorization", "BEARER "+string(encodedJwt)) +} + +func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string { + var encodedJwt security.EncodedJwt + if isWrite { + encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId) + } else { + encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId) + } + return string(encodedJwt) +} + func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) { urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index c139060e4..83411051d 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -15,6 +15,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -261,7 +262,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to prepare stream content %s: %v", r.URL, err) @@ -277,3 +278,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) }, nil }) } + +func (fs *FilerServer) maybeGetVolumeReadJwtAuthorizationToken(fileId string) string { + return string(security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)) +} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index d1505f673..7b3ac4bc4 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -53,11 +53,15 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout func Get(url string) ([]byte, bool, error) { + return GetAuthenticated(url, "") +} +func GetAuthenticated(url, jwt string) ([]byte, bool, error) { request, err := http.NewRequest("GET", url, nil) if err != nil { return nil, true, err } + maybeAddAuth(request, jwt) request.Header.Add("Accept-Encoding", "gzip") response, err := client.Do(request) @@ -101,11 +105,15 @@ func Head(url string) (http.Header, error) { return r.Header, nil } -func Delete(url string, jwt string) error { - req, err := http.NewRequest("DELETE", url, nil) +func maybeAddAuth(req *http.Request, jwt string) { if jwt != "" { req.Header.Set("Authorization", "BEARER "+string(jwt)) } +} + +func Delete(url string, jwt string) error { + req, err := http.NewRequest("DELETE", url, nil) + maybeAddAuth(req, jwt) if err != nil { return err } @@ -133,9 +141,7 @@ func Delete(url string, jwt string) error { func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { req, err := http.NewRequest("DELETE", url, nil) - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } + maybeAddAuth(req, jwt) if err != nil { return } @@ -193,9 +199,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head return "", nil, nil, err } - if len(jwt) > 0 { - req.Header.Set("Authorization", "BEARER "+jwt) - } + maybeAddAuth(req, jwt) response, err := client.Do(req) if err != nil { @@ -229,7 +233,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC if cipherKey != nil { var n int - _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { n = copy(buf, data) }) return int64(n), err @@ -298,11 +302,16 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC } func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) +} + +func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { - return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } req, err := http.NewRequest("GET", fileUrl, nil) + maybeAddAuth(req, jwt) if err != nil { return false, err } @@ -354,8 +363,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is } -func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { - encryptedData, retryable, err := Get(fileUrl) +func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { + encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) } @@ -392,9 +401,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt req.Header.Add("Accept-Encoding", "gzip") } - if len(jwt) > 0 { - req.Header.Set("Authorization", "BEARER "+jwt) - } + maybeAddAuth(req, jwt) r, err := client.Do(req) if err != nil {