From b41b7ea4d09616b42edbad87ab71ac4f2b0fa08c Mon Sep 17 00:00:00 2001 From: limd Date: Sat, 1 Aug 2020 01:08:30 +0800 Subject: [PATCH] fix spark read s3 bug (sc.binaryFiles) --- weed/s3api/s3api_object_handlers.go | 5 +++++ weed/s3api/s3api_objects_list_handlers.go | 18 +++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 9773add81..357ac9ce0 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -230,6 +230,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des resp, postErr := client.Do(proxyReq) + if resp.ContentLength == -1 { + writeErrorResponse(w, ErrNoSuchKey, r.URL) + return + } + if postErr != nil { glog.Errorf("post to filer: %v", postErr) writeErrorResponse(w, ErrInternalError, r.URL) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index e06faf213..9203c56f3 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -38,7 +38,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ marker = startAfter } - response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker) + response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter) if err != nil { writeErrorResponse(w, ErrInternalError, r.URL) @@ -66,7 +66,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ return } - response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker) + response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter) if err != nil { writeErrorResponse(w, ErrInternalError, r.URL) @@ -76,8 +76,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ writeSuccessResponseXML(w, encodeResponse(response)) } -func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string) (response ListBucketResult, err error) { - +func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { // convert full path prefix into directory name and prefix for entry name dir, prefix := filepath.Split(originalPrefix) if strings.HasPrefix(dir, "/") { @@ -125,9 +124,18 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys lastEntryName = entry.Name if entry.IsDirectory { if entry.Name != ".uploads" { + prefix = fmt.Sprintf("%s%s/", dir, entry.Name) + commonPrefixes = append(commonPrefixes, PrefixEntry{ - Prefix: fmt.Sprintf("%s%s/", dir, entry.Name), + Prefix: prefix, }) + + if delimiter != "/" { + response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter) + for _, content := range response.Contents { + contents = append(contents, content) + } + } } } else { contents = append(contents, ListEntry{