You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

300 lines
9.0 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "path/filepath"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/filer"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
  16. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  17. )
  18. type ListBucketResultV2 struct {
  19. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
  20. Name string `xml:"Name"`
  21. Prefix string `xml:"Prefix"`
  22. MaxKeys int `xml:"MaxKeys"`
  23. Delimiter string `xml:"Delimiter,omitempty"`
  24. IsTruncated bool `xml:"IsTruncated"`
  25. Contents []ListEntry `xml:"Contents,omitempty"`
  26. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  27. ContinuationToken string `xml:"ContinuationToken,omitempty"`
  28. NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
  29. KeyCount int `xml:"KeyCount"`
  30. StartAfter string `xml:"StartAfter,omitempty"`
  31. }
  32. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  33. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  34. // collect parameters
  35. bucket, _ := getBucketAndObject(r)
  36. originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
  37. if maxKeys < 0 {
  38. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  39. return
  40. }
  41. if delimiter != "" && delimiter != "/" {
  42. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  43. return
  44. }
  45. marker := continuationToken
  46. if continuationToken == "" {
  47. marker = startAfter
  48. }
  49. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  50. if err != nil {
  51. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  52. return
  53. }
  54. responseV2 := &ListBucketResultV2{
  55. XMLName: response.XMLName,
  56. Name: response.Name,
  57. CommonPrefixes: response.CommonPrefixes,
  58. Contents: response.Contents,
  59. ContinuationToken: continuationToken,
  60. Delimiter: response.Delimiter,
  61. IsTruncated: response.IsTruncated,
  62. KeyCount: len(response.Contents),
  63. MaxKeys: response.MaxKeys,
  64. NextContinuationToken: response.NextMarker,
  65. Prefix: response.Prefix,
  66. StartAfter: startAfter,
  67. }
  68. writeSuccessResponseXML(w, encodeResponse(responseV2))
  69. }
  70. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  71. // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
  72. // collect parameters
  73. bucket, _ := getBucketAndObject(r)
  74. originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
  75. if maxKeys < 0 {
  76. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  77. return
  78. }
  79. if delimiter != "" && delimiter != "/" {
  80. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  81. return
  82. }
  83. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  84. if err != nil {
  85. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  86. return
  87. }
  88. writeSuccessResponseXML(w, encodeResponse(response))
  89. }
  90. func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
  91. // convert full path prefix into directory name and prefix for entry name
  92. reqDir, prefix := filepath.Split(originalPrefix)
  93. if strings.HasPrefix(reqDir, "/") {
  94. reqDir = reqDir[1:]
  95. }
  96. bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
  97. reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
  98. if strings.HasSuffix(reqDir, "/") {
  99. // remove trailing "/"
  100. reqDir = reqDir[:len(reqDir)-1]
  101. }
  102. var contents []ListEntry
  103. var commonPrefixes []PrefixEntry
  104. var isTruncated bool
  105. var doErr error
  106. var nextMarker string
  107. // check filer
  108. err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  109. _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
  110. if entry.IsDirectory {
  111. if delimiter == "/" {
  112. commonPrefixes = append(commonPrefixes, PrefixEntry{
  113. Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  114. })
  115. }
  116. } else {
  117. storageClass := "STANDARD"
  118. if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok {
  119. storageClass = string(v)
  120. }
  121. contents = append(contents, ListEntry{
  122. Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
  123. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  124. ETag: "\"" + filer.ETag(entry) + "\"",
  125. Size: int64(filer.FileSize(entry)),
  126. Owner: CanonicalUser{
  127. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  128. DisplayName: entry.Attributes.UserName,
  129. },
  130. StorageClass: StorageClass(storageClass),
  131. })
  132. }
  133. })
  134. if doErr != nil {
  135. return doErr
  136. }
  137. if !isTruncated {
  138. nextMarker = ""
  139. }
  140. response = ListBucketResult{
  141. Name: bucket,
  142. Prefix: originalPrefix,
  143. Marker: marker,
  144. NextMarker: nextMarker,
  145. MaxKeys: maxKeys,
  146. Delimiter: delimiter,
  147. IsTruncated: isTruncated,
  148. Contents: contents,
  149. CommonPrefixes: commonPrefixes,
  150. }
  151. return nil
  152. })
  153. return
  154. }
  155. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
  156. // invariants
  157. // prefix and marker should be under dir, marker may contain "/"
  158. // maxKeys should be updated for each recursion
  159. if prefix == "/" && delimiter == "/" {
  160. return
  161. }
  162. if maxKeys <= 0 {
  163. return
  164. }
  165. if strings.Contains(marker, "/") {
  166. sepIndex := strings.Index(marker, "/")
  167. subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
  168. // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
  169. subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
  170. if subErr != nil {
  171. err = subErr
  172. return
  173. }
  174. maxKeys -= subCounter
  175. nextMarker = subDir + "/" + subNextMarker
  176. counter += subCounter
  177. // finished processing this sub directory
  178. marker = subDir
  179. }
  180. // now marker is also a direct child of dir
  181. request := &filer_pb.ListEntriesRequest{
  182. Directory: dir,
  183. Prefix: prefix,
  184. Limit: uint32(maxKeys + 1),
  185. StartFromFileName: marker,
  186. InclusiveStartFrom: false,
  187. }
  188. ctx, cancel := context.WithCancel(context.Background())
  189. defer cancel()
  190. stream, listErr := client.ListEntries(ctx, request)
  191. if listErr != nil {
  192. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  193. return
  194. }
  195. for {
  196. resp, recvErr := stream.Recv()
  197. if recvErr != nil {
  198. if recvErr == io.EOF {
  199. break
  200. } else {
  201. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  202. return
  203. }
  204. }
  205. if counter >= maxKeys {
  206. isTruncated = true
  207. return
  208. }
  209. entry := resp.Entry
  210. nextMarker = entry.Name
  211. if entry.IsDirectory {
  212. // println("ListEntries", dir, "dir:", entry.Name)
  213. if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
  214. eachEntryFn(dir, entry)
  215. if delimiter != "/" {
  216. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
  217. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
  218. if subErr != nil {
  219. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  220. return
  221. }
  222. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
  223. counter += subCounter
  224. nextMarker = entry.Name + "/" + subNextMarker
  225. if subIsTruncated {
  226. isTruncated = true
  227. return
  228. }
  229. } else {
  230. counter++
  231. }
  232. }
  233. } else {
  234. // println("ListEntries", dir, "file:", entry.Name)
  235. eachEntryFn(dir, entry)
  236. counter++
  237. }
  238. }
  239. return
  240. }
  241. func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
  242. prefix = values.Get("prefix")
  243. token = values.Get("continuation-token")
  244. startAfter = values.Get("start-after")
  245. delimiter = values.Get("delimiter")
  246. if values.Get("max-keys") != "" {
  247. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  248. } else {
  249. maxkeys = maxObjectListSizeLimit
  250. }
  251. fetchOwner = values.Get("fetch-owner") == "true"
  252. return
  253. }
  254. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, maxkeys int) {
  255. prefix = values.Get("prefix")
  256. marker = values.Get("marker")
  257. delimiter = values.Get("delimiter")
  258. if values.Get("max-keys") != "" {
  259. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  260. } else {
  261. maxkeys = maxObjectListSizeLimit
  262. }
  263. return
  264. }