Browse Source

Use filerGroup for s3 buckets collection prefix (#4465)

* Use filerGroup for s3 buckets collection prefix

* Fix templates

* Remove flags

* Remove s3CollectionPrefix
pull/4484/head
SmsS4 2 years ago
committed by GitHub
parent
commit
17e91d2917
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      k8s/charts/seaweedfs/templates/filer-statefulset.yaml
  2. 1
      k8s/charts/seaweedfs/values.yaml
  3. 3
      weed/command/s3.go
  4. 7
      weed/s3api/filer_util.go
  5. 6
      weed/s3api/s3api_bucket_handlers.go
  6. 4
      weed/s3api/s3api_object_copy_handlers.go
  7. 10
      weed/s3api/s3api_object_handlers.go
  8. 2
      weed/s3api/s3api_object_handlers_postpolicy.go
  9. 2
      weed/s3api/s3api_object_multipart_handlers.go
  10. 1
      weed/s3api/s3api_server.go
  11. 2
      weed/shell/command_s3_bucket_delete.go
  12. 2
      weed/shell/command_s3_bucket_list.go
  13. 2
      weed/shell/command_s3_bucket_quota_check.go
  14. 7
      weed/shell/commands.go

3
k8s/charts/seaweedfs/templates/filer-statefulset.yaml

@@ -148,6 +148,9 @@ spec:
-encryptVolumeData \ -encryptVolumeData \
{{- end }} {{- end }}
-ip=${POD_IP} \ -ip=${POD_IP} \
{{- if .Values.filer.filerGroup}}
-filerGroup={{ .Values.filer.filerGroup}} \
{{- end }}
{{- if .Values.filer.s3.enabled }} {{- if .Values.filer.s3.enabled }}
-s3 \ -s3 \
-s3.port={{ .Values.filer.s3.port }} \ -s3.port={{ .Values.filer.s3.port }} \

1
k8s/charts/seaweedfs/values.yaml

@@ -273,6 +273,7 @@ filer:
grpcPort: 18888 grpcPort: 18888
metricsPort: 9327 metricsPort: 9327
loggingOverrideLevel: null loggingOverrideLevel: null
filerGroup: ""
# replication type is XYZ: # replication type is XYZ:
# X number of replica in other data centers # X number of replica in other data centers
# Y number of replica in other racks in the same data center # Y number of replica in other racks in the same data center

3
weed/command/s3.go

@@ -155,6 +155,7 @@ func (s3opt *S3Options) startS3Server() bool {
filerAddress := pb.ServerAddress(*s3opt.filer) filerAddress := pb.ServerAddress(*s3opt.filer)
filerBucketsPath := "/buckets" filerBucketsPath := "/buckets"
filerGroup := ""
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
@@ -169,6 +170,7 @@ func (s3opt *S3Options) startS3Server() bool {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
} }
filerBucketsPath = resp.DirBuckets filerBucketsPath = resp.DirBuckets
filerGroup = resp.FilerGroup
metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec) metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath) glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
return nil return nil
@@ -200,6 +202,7 @@ func (s3opt *S3Options) startS3Server() bool {
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: localFilerSocket, LocalFilerSocket: localFilerSocket,
DataCenter: *s3opt.dataCenter, DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,
}) })
if s3ApiServer_err != nil { if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

7
weed/s3api/filer_util.go

@@ -107,6 +107,13 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
return err return err
} }
// getCollectionName derives the volume-collection name for a bucket.
// When the server was started with a filer group, the collection is
// prefixed with "<filerGroup>_" so buckets belonging to different filer
// groups do not collide; with no filer group configured, the bucket
// name itself is the collection name.
func (s3a *S3ApiServer) getCollectionName(bucket string) string {
	prefix := s3a.option.FilerGroup
	if prefix == "" {
		return bucket
	}
	return fmt.Sprintf("%s_%s", prefix, bucket)
}
func objectKey(key *string) *string { func objectKey(key *string) *string {
if strings.HasPrefix(*key, "/") { if strings.HasPrefix(*key, "/") {
t := (*key)[1:] t := (*key)[1:]

6
weed/s3api/s3api_bucket_handlers.go

@@ -104,7 +104,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
return fmt.Errorf("list collections: %v", err) return fmt.Errorf("list collections: %v", err)
} else { } else {
for _, c := range resp.Collections { for _, c := range resp.Collections {
if bucket == c.Name {
if s3a.getCollectionName(bucket) == c.Name {
errCode = s3err.ErrBucketAlreadyExists errCode = s3err.ErrBucketAlreadyExists
break break
} }
@@ -174,7 +174,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
// delete collection // delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
Collection: bucket,
Collection: s3a.getCollectionName(bucket),
} }
glog.V(1).Infof("delete collection: %v", deleteCollectionRequest) glog.V(1).Infof("delete collection: %v", deleteCollectionRequest)
@@ -304,7 +304,7 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return return
} }
ttls := fc.GetCollectionTtls(bucket)
ttls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
if len(ttls) == 0 { if len(ttls) == 0 {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
return return

4
weed/s3api/s3api_object_copy_handlers.go

@@ -100,7 +100,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
} }
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination)
etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination, dstBucket)
if errCode != s3err.ErrNone { if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)
@@ -185,7 +185,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination)
etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination, dstBucket)
if errCode != s3err.ErrNone { if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)

10
weed/s3api/s3api_object_handlers.go

@@ -115,7 +115,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = mimeDetect(r, dataReader) dataReader = mimeDetect(r, dataReader)
} }
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
if errCode != s3err.ErrNone { if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)
@@ -457,7 +457,7 @@ func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (s
return statusCode return statusCode
} }
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string) (etag string, code s3err.ErrorCode) {
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string) (etag string, code s3err.ErrorCode) {
hash := md5.New() hash := md5.New()
var body = io.TeeReader(dataReader, hash) var body = io.TeeReader(dataReader, hash)
@@ -474,6 +474,12 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination) proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
} }
if s3a.option.FilerGroup != "" {
query := proxyReq.URL.Query()
query.Add("collection", s3a.getCollectionName(bucket))
proxyReq.URL.RawQuery = query.Encode()
}
for header, values := range r.Header { for header, values := range r.Header {
for _, value := range values { for _, value := range values {
proxyReq.Header.Add(header, value) proxyReq.Header.Add(header, value)

2
weed/s3api/s3api_object_handlers_postpolicy.go

@@ -115,7 +115,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object)) uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object))
etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "")
etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket)
if errCode != s3err.ErrNone { if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)

2
weed/s3api/s3api_object_multipart_handlers.go

@@ -255,7 +255,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
} }
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket)
if errCode != s3err.ErrNone { if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)
return return

1
weed/s3api/s3api_server.go

@@ -31,6 +31,7 @@ type S3ApiServerOption struct {
AllowDeleteBucketNotEmpty bool AllowDeleteBucketNotEmpty bool
LocalFilerSocket string LocalFilerSocket string
DataCenter string DataCenter string
FilerGroup string
} }
type S3ApiServer struct { type S3ApiServer struct {

2
weed/shell/command_s3_bucket_delete.go

@@ -54,7 +54,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
// delete the collection directly first // delete the collection directly first
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *bucketName,
Name: getCollectionName(commandEnv, *bucketName),
}) })
return err return err
}) })

2
weed/shell/command_s3_bucket_list.go

@@ -57,7 +57,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
if !entry.IsDirectory { if !entry.IsDirectory {
return nil return nil
} }
collection := entry.Name
collection := getCollectionName(commandEnv, entry.Name)
var collectionSize, fileCount float64 var collectionSize, fileCount float64
if collectionInfo, found := collectionInfos[collection]; found { if collectionInfo, found := collectionInfos[collection]; found {
collectionSize = collectionInfo.Size collectionSize = collectionInfo.Size

2
weed/shell/command_s3_bucket_quota_check.go

@@ -65,7 +65,7 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv,
if !entry.IsDirectory { if !entry.IsDirectory {
return nil return nil
} }
collection := entry.Name
collection := getCollectionName(commandEnv, entry.Name)
var collectionSize float64 var collectionSize float64
if collectionInfo, found := collectionInfos[collection]; found { if collectionInfo, found := collectionInfos[collection]; found {
collectionSize = collectionInfo.Size collectionSize = collectionInfo.Size

7
weed/shell/commands.go

@@ -184,3 +184,10 @@ func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.Serv
) )
return return
} }
// getCollectionName maps a bucket name to its volume-collection name for
// the weed shell commands, applying the "<filerGroup>_" prefix when the
// shell is connected to a filer group.
// NOTE(review): this dereferences commandEnv.option.FilerGroup without a
// nil check — presumably the flag is always registered before shell
// commands run; confirm against CommandEnv initialization.
func getCollectionName(commandEnv *CommandEnv, bucket string) string {
	filerGroup := *commandEnv.option.FilerGroup
	if filerGroup == "" {
		return bucket
	}
	return fmt.Sprintf("%s_%s", filerGroup, bucket)
}
Loading…
Cancel
Save