You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

295 lines
8.8 KiB

7 years ago
7 years ago
4 years ago
7 years ago
7 years ago
7 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/chrislusf/seaweedfs/weed/filer"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. )
  17. type ListBucketResultV2 struct {
  18. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
  19. Name string `xml:"Name"`
  20. Prefix string `xml:"Prefix"`
  21. MaxKeys int `xml:"MaxKeys"`
  22. Delimiter string `xml:"Delimiter,omitempty"`
  23. IsTruncated bool `xml:"IsTruncated"`
  24. Contents []ListEntry `xml:"Contents,omitempty"`
  25. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  26. ContinuationToken string `xml:"ContinuationToken,omitempty"`
  27. NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
  28. KeyCount int `xml:"KeyCount"`
  29. StartAfter string `xml:"StartAfter,omitempty"`
  30. }
  31. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  32. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  33. // collect parameters
  34. bucket, _ := getBucketAndObject(r)
  35. originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
  36. if maxKeys < 0 {
  37. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  38. return
  39. }
  40. if delimiter != "" && delimiter != "/" {
  41. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  42. return
  43. }
  44. marker := continuationToken
  45. if continuationToken == "" {
  46. marker = startAfter
  47. }
  48. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  49. if err != nil {
  50. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  51. return
  52. }
  53. responseV2 := &ListBucketResultV2{
  54. XMLName: response.XMLName,
  55. Name: response.Name,
  56. CommonPrefixes: response.CommonPrefixes,
  57. Contents: response.Contents,
  58. ContinuationToken: continuationToken,
  59. Delimiter: response.Delimiter,
  60. IsTruncated: response.IsTruncated,
  61. KeyCount: len(response.Contents),
  62. MaxKeys: response.MaxKeys,
  63. NextContinuationToken: response.NextMarker,
  64. Prefix: response.Prefix,
  65. StartAfter: startAfter,
  66. }
  67. writeSuccessResponseXML(w, encodeResponse(responseV2))
  68. }
  69. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  70. // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
  71. // collect parameters
  72. bucket, _ := getBucketAndObject(r)
  73. originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
  74. if maxKeys < 0 {
  75. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  76. return
  77. }
  78. if delimiter != "" && delimiter != "/" {
  79. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  80. return
  81. }
  82. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  83. if err != nil {
  84. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  85. return
  86. }
  87. writeSuccessResponseXML(w, encodeResponse(response))
  88. }
  89. func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
  90. // convert full path prefix into directory name and prefix for entry name
  91. reqDir, prefix := filepath.Split(originalPrefix)
  92. if strings.HasPrefix(reqDir, "/") {
  93. reqDir = reqDir[1:]
  94. }
  95. bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
  96. reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
  97. if strings.HasSuffix(reqDir, "/") {
  98. // remove trailing "/"
  99. reqDir = reqDir[:len(reqDir)-1]
  100. }
  101. var contents []ListEntry
  102. var commonPrefixes []PrefixEntry
  103. var isTruncated bool
  104. var doErr error
  105. var nextMarker string
  106. // check filer
  107. err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  108. _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
  109. if entry.IsDirectory {
  110. if delimiter == "/" {
  111. commonPrefixes = append(commonPrefixes, PrefixEntry{
  112. Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  113. })
  114. }
  115. } else {
  116. contents = append(contents, ListEntry{
  117. Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
  118. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  119. ETag: "\"" + filer.ETag(entry) + "\"",
  120. Size: int64(filer.FileSize(entry)),
  121. Owner: CanonicalUser{
  122. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  123. DisplayName: entry.Attributes.UserName,
  124. },
  125. StorageClass: "STANDARD",
  126. })
  127. }
  128. })
  129. if doErr != nil {
  130. return doErr
  131. }
  132. if !isTruncated {
  133. nextMarker = ""
  134. }
  135. response = ListBucketResult{
  136. Name: bucket,
  137. Prefix: originalPrefix,
  138. Marker: marker,
  139. NextMarker: nextMarker,
  140. MaxKeys: maxKeys,
  141. Delimiter: delimiter,
  142. IsTruncated: isTruncated,
  143. Contents: contents,
  144. CommonPrefixes: commonPrefixes,
  145. }
  146. return nil
  147. })
  148. return
  149. }
  150. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
  151. // invariants
  152. // prefix and marker should be under dir, marker may contain "/"
  153. // maxKeys should be updated for each recursion
  154. if prefix == "/" && delimiter == "/" {
  155. return
  156. }
  157. if maxKeys <= 0 {
  158. return
  159. }
  160. if strings.Contains(marker, "/") {
  161. sepIndex := strings.Index(marker, "/")
  162. subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
  163. // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
  164. subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
  165. if subErr != nil {
  166. err = subErr
  167. return
  168. }
  169. maxKeys -= subCounter
  170. nextMarker = subDir + "/" + subNextMarker
  171. counter += subCounter
  172. // finished processing this sub directory
  173. marker = subDir
  174. }
  175. // now marker is also a direct child of dir
  176. request := &filer_pb.ListEntriesRequest{
  177. Directory: dir,
  178. Prefix: prefix,
  179. Limit: uint32(maxKeys + 1),
  180. StartFromFileName: marker,
  181. InclusiveStartFrom: false,
  182. }
  183. ctx, cancel := context.WithCancel(context.Background())
  184. defer cancel()
  185. stream, listErr := client.ListEntries(ctx, request)
  186. if listErr != nil {
  187. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  188. return
  189. }
  190. for {
  191. resp, recvErr := stream.Recv()
  192. if recvErr != nil {
  193. if recvErr == io.EOF {
  194. break
  195. } else {
  196. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  197. return
  198. }
  199. }
  200. if counter >= maxKeys {
  201. isTruncated = true
  202. return
  203. }
  204. entry := resp.Entry
  205. nextMarker = entry.Name
  206. if entry.IsDirectory {
  207. // println("ListEntries", dir, "dir:", entry.Name)
  208. if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
  209. eachEntryFn(dir, entry)
  210. if delimiter != "/" {
  211. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
  212. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
  213. if subErr != nil {
  214. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  215. return
  216. }
  217. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
  218. counter += subCounter
  219. nextMarker = entry.Name + "/" + subNextMarker
  220. if subIsTruncated {
  221. isTruncated = true
  222. return
  223. }
  224. } else {
  225. counter++
  226. }
  227. }
  228. } else {
  229. // println("ListEntries", dir, "file:", entry.Name)
  230. eachEntryFn(dir, entry)
  231. counter++
  232. }
  233. }
  234. return
  235. }
  236. func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
  237. prefix = values.Get("prefix")
  238. token = values.Get("continuation-token")
  239. startAfter = values.Get("start-after")
  240. delimiter = values.Get("delimiter")
  241. if values.Get("max-keys") != "" {
  242. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  243. } else {
  244. maxkeys = maxObjectListSizeLimit
  245. }
  246. fetchOwner = values.Get("fetch-owner") == "true"
  247. return
  248. }
  249. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, maxkeys int) {
  250. prefix = values.Get("prefix")
  251. marker = values.Get("marker")
  252. delimiter = values.Get("delimiter")
  253. if values.Get("max-keys") != "" {
  254. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  255. } else {
  256. maxkeys = maxObjectListSizeLimit
  257. }
  258. return
  259. }