@ -9,7 +9,6 @@ import (
"io"
"io"
"net/http"
"net/http"
"net/url"
"net/url"
"path/filepath"
"strconv"
"strconv"
"strings"
"strings"
"time"
"time"
@ -126,79 +125,57 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML ( w , r , response )
writeSuccessResponseXML ( w , r , response )
}
}
func ( s3a * S3ApiServer ) listFilerEntries ( bucket string , originalPrefix string , maxKeys int , m arker string , delimiter string ) ( response ListBucketResult , err error ) {
func ( s3a * S3ApiServer ) listFilerEntries ( bucket string , originalPrefix string , maxKeys int , originalM arker string , delimiter string ) ( response ListBucketResult , err error ) {
// convert full path prefix into directory name and prefix for entry name
// convert full path prefix into directory name and prefix for entry name
reqDir , prefix := filepath . Split ( originalPrefix )
if strings . HasPrefix ( reqDir , "/" ) {
reqDir = reqDir [ 1 : ]
}
requestDir , prefix , marker := normalizePrefixMarker ( originalPrefix , originalMarker )
bucketPrefix := fmt . Sprintf ( "%s/%s/" , s3a . option . BucketsPath , bucket )
bucketPrefix := fmt . Sprintf ( "%s/%s/" , s3a . option . BucketsPath , bucket )
bucketPrefixLen := len ( bucketPrefix )
reqDir = fmt . Sprintf ( "%s%s" , bucketPrefix , reqDir )
if strings . HasSuffix ( reqDir , "/" ) {
reqDir = strings . TrimSuffix ( reqDir , "/" )
reqDir := bucketPrefix [ : len ( bucketPrefix ) - 1 ]
if requestDir != "" {
reqDir = fmt . Sprintf ( "%s%s" , bucketPrefix , requestDir )
}
}
var contents [ ] ListEntry
var contents [ ] ListEntry
var commonPrefixes [ ] PrefixEntry
var commonPrefixes [ ] PrefixEntry
var isTruncated bool
var doErr error
var doErr error
var nextMarker string
var nextMarker string
cursor := & ListingCursor {
maxKeys : maxKeys ,
}
// check filer
// check filer
err = s3a . WithFilerClient ( false , func ( client filer_pb . SeaweedFilerClient ) error {
err = s3a . WithFilerClient ( false , func ( client filer_pb . SeaweedFilerClient ) error {
_ , isTruncated , nextMarker , doErr = s3a . doListFilerEntries ( client , reqDir , prefix , maxKeys , marker , delimiter , false , false , bucketPrefixLen , func ( dir string , entry * filer_pb . Entry ) {
nextMarker , doErr = s3a . doListFilerEntries ( client , reqDir , prefix , cursor , marker , delimiter , false , func ( dir string , entry * filer_pb . Entry ) {
if entry . IsDirectory {
if entry . IsDirectory {
if delimiter == "/" {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
if delimiter == "/" { // A response can contain CommonPrefixes only if you specify a delimiter.
commonPrefixes = append ( commonPrefixes , PrefixEntry {
commonPrefixes = append ( commonPrefixes , PrefixEntry {
Prefix : fmt . Sprintf ( "%s/%s/" , dir , entry . Name ) [ bucketPrefixLen : ] ,
Prefix : fmt . Sprintf ( "%s/%s/" , dir , entry . Name ) [ len ( bucketPrefix ) : ] ,
} )
} )
}
if ! ( entry . IsDirectoryKeyObject ( ) && strings . HasSuffix ( entry . Name , "/" ) ) {
return
}
}
storageClass := "STANDARD"
if v , ok := entry . Extended [ s3_constants . AmzStorageClass ] ; ok {
storageClass = string ( v )
}
//All of the keys (up to 1,000) rolled up into a common prefix count as a single return when calculating the number of returns.
cursor . maxKeys --
} else if entry . IsDirectoryKeyObject ( ) {
contents = append ( contents , ListEntry {
contents = append ( contents , ListEntry {
Key : fmt . Sprintf ( "%s/%s" , dir , entry . Name ) [ bucketPrefixLen : ] ,
Key : fmt . Sprintf ( "%s/%s/" , dir , entry . Name ) [ len ( bucketPrefix ) : ] ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) . UTC ( ) ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) . UTC ( ) ,
ETag : "\"" + filer . ETag ( entry ) + "\"" ,
ETag : "\"" + filer . ETag ( entry ) + "\"" ,
Size : int64 ( filer . FileSize ( entry ) ) ,
Owner : CanonicalUser {
Owner : CanonicalUser {
ID : fmt . Sprintf ( "%x" , entry . Attributes . Uid ) ,
ID : fmt . Sprintf ( "%x" , entry . Attributes . Uid ) ,
DisplayName : entry . Attributes . UserName ,
DisplayName : entry . Attributes . UserName ,
} ,
} ,
StorageClass : StorageClass ( storageClass ) ,
} )
StorageClass : "STANDARD" ,
} )
} )
glog . V ( 4 ) . Infof ( "end doListFilerEntries isTruncated:%v nextMarker:%v reqDir: %v prefix: %v" , isTruncated , nextMarker , reqDir , prefix )
if doErr != nil {
return doErr
}
if ! isTruncated {
nextMarker = ""
cursor . maxKeys --
}
}
if len ( contents ) == 0 && len ( commonPrefixes ) == 0 && maxKeys > 0 {
if strings . HasSuffix ( originalPrefix , "/" ) && prefix == "" {
reqDir , prefix = filepath . Split ( strings . TrimSuffix ( reqDir , "/" ) )
reqDir = strings . TrimSuffix ( reqDir , "/" )
}
_ , _ , _ , doErr = s3a . doListFilerEntries ( client , reqDir , prefix , 1 , prefix , delimiter , true , false , bucketPrefixLen , func ( dir string , entry * filer_pb . Entry ) {
if entry . IsDirectoryKeyObject ( ) && entry . Name == prefix {
} else {
storageClass := "STANDARD"
storageClass := "STANDARD"
if v , ok := entry . Extended [ s3_constants . AmzStorageClass ] ; ok {
if v , ok := entry . Extended [ s3_constants . AmzStorageClass ] ; ok {
storageClass = string ( v )
storageClass = string ( v )
}
}
contents = append ( contents , ListEntry {
contents = append ( contents , ListEntry {
Key : fmt . Sprintf ( "%s/%s/ " , dir , entry . Name ) [ bucketPrefixLen : ] ,
Key : fmt . Sprintf ( "%s/%s" , dir , entry . Name ) [ len ( bucketPrefix ) : ] ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) . UTC ( ) ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) . UTC ( ) ,
ETag : "\"" + fmt . Sprintf ( "%x" , entry . Attributes . Md5 ) + "\"" ,
ETag : "\"" + filer . ETag ( entry ) + "\"" ,
Size : int64 ( filer . FileSize ( entry ) ) ,
Size : int64 ( filer . FileSize ( entry ) ) ,
Owner : CanonicalUser {
Owner : CanonicalUser {
ID : fmt . Sprintf ( "%x" , entry . Attributes . Uid ) ,
ID : fmt . Sprintf ( "%x" , entry . Attributes . Uid ) ,
@ -206,25 +183,29 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
} ,
} ,
StorageClass : StorageClass ( storageClass ) ,
StorageClass : StorageClass ( storageClass ) ,
} )
} )
cursor . maxKeys --
}
}
} )
} )
if doErr != nil {
if doErr != nil {
return doErr
return doErr
}
}
}
if len ( nextMarker ) > 0 {
nextMarker = nextMarker [ bucketPrefixLen : ]
if ! cursor . isTruncated {
nextMarker = ""
} else {
if requestDir != "" {
nextMarker = requestDir + "/" + nextMarker
}
}
}
response = ListBucketResult {
response = ListBucketResult {
Name : bucket ,
Name : bucket ,
Prefix : originalPrefix ,
Prefix : originalPrefix ,
Marker : m arker,
Marker : originalM arker,
NextMarker : nextMarker ,
NextMarker : nextMarker ,
MaxKeys : maxKeys ,
MaxKeys : maxKeys ,
Delimiter : delimiter ,
Delimiter : delimiter ,
IsTruncated : isTruncated ,
IsTruncated : cursor . isTruncated ,
Contents : contents ,
Contents : contents ,
CommonPrefixes : commonPrefixes ,
CommonPrefixes : commonPrefixes ,
}
}
@ -235,7 +216,64 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
return
return
}
}
func ( s3a * S3ApiServer ) doListFilerEntries ( client filer_pb . SeaweedFilerClient , dir , prefix string , maxKeys int , marker , delimiter string , inclusiveStartFrom bool , subEntries bool , bucketPrefixLen int , eachEntryFn func ( dir string , entry * filer_pb . Entry ) ) ( counter int , isTruncated bool , nextMarker string , err error ) {
type ListingCursor struct {
maxKeys int
isTruncated bool
}
// the prefix and marker may be in different directories
// normalizePrefixMarker ensures the prefix and marker both starts from the same directory
func normalizePrefixMarker ( prefix , marker string ) ( alignedDir , alignedPrefix , alignedMarker string ) {
// alignedDir should not end with "/"
// alignedDir, alignedPrefix, alignedMarker should only have "/" in middle
prefix = strings . TrimLeft ( prefix , "/" )
marker = strings . TrimLeft ( marker , "/" )
if prefix == "" {
return "" , "" , marker
}
if marker == "" {
alignedDir , alignedPrefix = toDirAndName ( prefix )
return
}
if ! strings . HasPrefix ( marker , prefix ) {
// something wrong
return "" , prefix , marker
}
if strings . HasPrefix ( marker , prefix + "/" ) {
alignedDir = prefix
alignedPrefix = ""
alignedMarker = marker [ len ( alignedDir ) + 1 : ]
return
}
alignedDir , alignedPrefix = toDirAndName ( prefix )
if alignedDir != "" {
alignedMarker = marker [ len ( alignedDir ) + 1 : ]
} else {
alignedMarker = marker
}
return
}
func toDirAndName ( dirAndName string ) ( dir , name string ) {
sepIndex := strings . LastIndex ( dirAndName , "/" )
if sepIndex >= 0 {
dir , name = dirAndName [ 0 : sepIndex ] , dirAndName [ sepIndex + 1 : ]
} else {
name = dirAndName
}
return
}
func toParentAndDescendants ( dirAndName string ) ( dir , name string ) {
sepIndex := strings . Index ( dirAndName , "/" )
if sepIndex >= 0 {
dir , name = dirAndName [ 0 : sepIndex ] , dirAndName [ sepIndex + 1 : ]
} else {
name = dirAndName
}
return
}
func ( s3a * S3ApiServer ) doListFilerEntries ( client filer_pb . SeaweedFilerClient , dir , prefix string , cursor * ListingCursor , marker , delimiter string , inclusiveStartFrom bool , eachEntryFn func ( dir string , entry * filer_pb . Entry ) ) ( nextMarker string , err error ) {
// invariants
// invariants
// prefix and marker should be under dir, marker may contain "/"
// prefix and marker should be under dir, marker may contain "/"
// maxKeys should be updated for each recursion
// maxKeys should be updated for each recursion
@ -243,37 +281,23 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
if prefix == "/" && delimiter == "/" {
if prefix == "/" && delimiter == "/" {
return
return
}
}
if maxKeys <= 0 {
if cursor . maxKeys <= 0 {
return
return
}
}
if strings . Contains ( marker , "/" ) {
if strings . Contains ( marker , "/" ) {
if strings . HasSuffix ( marker , "/" ) {
marker = strings . TrimSuffix ( marker , "/" )
}
sepIndex := strings . Index ( marker , "/" )
if sepIndex != - 1 {
subPrefix , subMarker := marker [ 0 : sepIndex ] , marker [ sepIndex + 1 : ]
var subDir string
if len ( dir ) > bucketPrefixLen && dir [ bucketPrefixLen : ] == subPrefix {
subDir = dir
} else {
subDir = fmt . Sprintf ( "%s/%s" , dir , subPrefix )
}
subCounter , subIsTruncated , subNextMarker , subErr := s3a . doListFilerEntries ( client , subDir , "" , maxKeys , subMarker , delimiter , false , false , bucketPrefixLen , eachEntryFn )
subDir , subMarker := toParentAndDescendants ( marker )
// println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker)
subNextMarker , subErr := s3a . doListFilerEntries ( client , dir + "/" + subDir , "" , cursor , subMarker , delimiter , false , eachEntryFn )
if subErr != nil {
if subErr != nil {
err = subErr
err = subErr
return
return
}
}
counter += subCounter
isTruncated = isTruncated || subIsTruncated
maxKeys -= subCounter
nextMarker = subNextMarker
nextMarker = subDir + "/" + subNextMarker
// finished processing this sub directory
// finished processing this sub directory
marker = subPrefix
}
marker = subDir
}
}
if maxKeys <= 0 {
if cursor . maxKeys <= 0 {
return
return
}
}
@ -281,7 +305,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
request := & filer_pb . ListEntriesRequest {
request := & filer_pb . ListEntriesRequest {
Directory : dir ,
Directory : dir ,
Prefix : prefix ,
Prefix : prefix ,
Limit : uint32 ( maxKeys + 2 ) , // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder
Limit : uint32 ( cursor . maxKeys + 2 ) , // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder
StartFromFileName : marker ,
StartFromFileName : marker ,
InclusiveStartFrom : inclusiveStartFrom ,
InclusiveStartFrom : inclusiveStartFrom ,
}
}
@ -304,38 +328,31 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
return
}
}
}
}
if counter >= maxKeys {
isTruncated = true
if cursor . maxKeys <= 0 {
cursor . isTruncated = true
return
return
}
}
entry := resp . Entry
entry := resp . Entry
nextMarker = dir + "/" + entry . Name
nextMarker = entry . Name
if entry . IsDirectory {
if entry . IsDirectory {
// println("ListEntries", dir, "dir:", entry.Name)
// println("ListEntries", dir, "dir:", entry.Name)
if entry . Name == s3_constants . MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys
if entry . Name == s3_constants . MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys
continue
continue
}
}
if delimiter == " " {
if delimiter != "/ " {
eachEntryFn ( dir , entry )
eachEntryFn ( dir , entry )
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
subCounter , subIsTruncated , subNextMarker , subErr := s3a . doListFilerEntries ( client , dir + "/" + entry . Name , "" , maxKeys - counter , "" , delimiter , false , true , bucketPrefixLen , eachEntryFn )
subNextMarker , subErr := s3a . doListFilerEntries ( client , dir + "/" + entry . Name , "" , cursor , "" , delimiter , false , eachEntryFn )
if subErr != nil {
if subErr != nil {
err = fmt . Errorf ( "doListFilerEntries2: %v" , subErr )
err = fmt . Errorf ( "doListFilerEntries2: %v" , subErr )
return
return
}
}
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
if subCounter == 0 && entry . IsDirectoryKeyObject ( ) {
entry . Name += "/"
eachEntryFn ( dir , entry )
counter ++
}
counter += subCounter
nextMarker = subNextMarker
if subIsTruncated {
isTruncated = true
// println("doListFilerEntries2 dir", dir+"/"+entry.Name, "subNextMarker", subNextMarker)
nextMarker = entry . Name + "/" + subNextMarker
if cursor . isTruncated {
return
return
}
}
} else if delimiter == "/" {
// println("doListFilerEntries2 nextMarker", nextMarker)
} else {
var isEmpty bool
var isEmpty bool
if ! s3a . option . AllowEmptyFolder && ! entry . IsDirectoryKeyObject ( ) {
if ! s3a . option . AllowEmptyFolder && ! entry . IsDirectoryKeyObject ( ) {
if isEmpty , err = s3a . ensureDirectoryAllEmpty ( client , dir , entry . Name ) ; err != nil {
if isEmpty , err = s3a . ensureDirectoryAllEmpty ( client , dir , entry . Name ) ; err != nil {
@ -343,15 +360,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
}
}
}
}
if ! isEmpty {
if ! isEmpty {
nextMarker += "/"
eachEntryFn ( dir , entry )
eachEntryFn ( dir , entry )
counter ++
}
}
}
}
} else if ! ( delimiter == "/" && subEntries ) {
// println("ListEntries", dir, "file:", entry.Name)
} else {
eachEntryFn ( dir , entry )
eachEntryFn ( dir , entry )
counter ++
// println("ListEntries", dir, "file:", entry.Name, "maxKeys", cursor.maxKeys)
}
}
}
}
return
return