You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

294 lines
8.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "path/filepath"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/filer"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. )
  // ListBucketResultV2 is the XML body written by ListObjectsV2Handler.
  // It is marshalled as a ListBucketResult element in the
  // http://s3.amazonaws.com/doc/2006-03-01/ namespace; the field order below is
  // the order in which the struct declares its XML elements.
  type ListBucketResultV2 struct {
  	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
  	// Name is the bucket that was listed.
  	Name string `xml:"Name"`
  	// Prefix echoes the prefix the listing was restricted to.
  	Prefix string `xml:"Prefix"`
  	// MaxKeys echoes the page size that was applied to the listing.
  	MaxKeys int `xml:"MaxKeys"`
  	// Delimiter echoes the grouping delimiter; omitted from the XML when empty.
  	Delimiter string `xml:"Delimiter,omitempty"`
  	// IsTruncated reports whether more entries remain after this page.
  	IsTruncated bool `xml:"IsTruncated"`
  	// Contents holds one element per listed object; omitted when empty.
  	Contents []ListEntry `xml:"Contents,omitempty"`
  	// CommonPrefixes holds one element per rolled-up prefix; omitted when empty.
  	CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  	// ContinuationToken echoes the token supplied by the request, if any.
  	ContinuationToken string `xml:"ContinuationToken,omitempty"`
  	// NextContinuationToken is the token to send to fetch the next page;
  	// omitted when empty.
  	NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
  	// KeyCount is the number of entries reported in this response.
  	KeyCount int `xml:"KeyCount"`
  	// StartAfter echoes the start-after parameter of the request, if any.
  	StartAfter string `xml:"StartAfter,omitempty"`
  }
  30. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  31. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  32. // collect parameters
  33. bucket, _ := getBucketAndObject(r)
  34. originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
  35. if maxKeys < 0 {
  36. writeErrorResponse(w, ErrInvalidMaxKeys, r.URL)
  37. return
  38. }
  39. if delimiter != "" && delimiter != "/" {
  40. writeErrorResponse(w, ErrNotImplemented, r.URL)
  41. return
  42. }
  43. marker := continuationToken
  44. if continuationToken == "" {
  45. marker = startAfter
  46. }
  47. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  48. if err != nil {
  49. writeErrorResponse(w, ErrInternalError, r.URL)
  50. return
  51. }
  52. responseV2 := &ListBucketResultV2{
  53. XMLName: response.XMLName,
  54. Name: response.Name,
  55. CommonPrefixes: response.CommonPrefixes,
  56. Contents: response.Contents,
  57. ContinuationToken: continuationToken,
  58. Delimiter: response.Delimiter,
  59. IsTruncated: response.IsTruncated,
  60. KeyCount: len(response.Contents),
  61. MaxKeys: response.MaxKeys,
  62. NextContinuationToken: response.NextMarker,
  63. Prefix: response.Prefix,
  64. StartAfter: startAfter,
  65. }
  66. writeSuccessResponseXML(w, encodeResponse(responseV2))
  67. }
  68. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  69. // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
  70. // collect parameters
  71. bucket, _ := getBucketAndObject(r)
  72. originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
  73. if maxKeys < 0 {
  74. writeErrorResponse(w, ErrInvalidMaxKeys, r.URL)
  75. return
  76. }
  77. if delimiter != "" && delimiter != "/" {
  78. writeErrorResponse(w, ErrNotImplemented, r.URL)
  79. return
  80. }
  81. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  82. if err != nil {
  83. writeErrorResponse(w, ErrInternalError, r.URL)
  84. return
  85. }
  86. writeSuccessResponseXML(w, encodeResponse(response))
  87. }
  88. func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
  89. // convert full path prefix into directory name and prefix for entry name
  90. reqDir, prefix := filepath.Split(originalPrefix)
  91. if strings.HasPrefix(reqDir, "/") {
  92. reqDir = reqDir[1:]
  93. }
  94. bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
  95. reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
  96. if strings.HasSuffix(reqDir, "/") {
  97. // remove trailing "/"
  98. reqDir = reqDir[:len(reqDir)-1]
  99. }
  100. var contents []ListEntry
  101. var commonPrefixes []PrefixEntry
  102. var isTruncated bool
  103. var doErr error
  104. var nextMarker string
  105. // check filer
  106. err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  107. _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
  108. if entry.IsDirectory {
  109. if delimiter == "/" {
  110. commonPrefixes = append(commonPrefixes, PrefixEntry{
  111. Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  112. })
  113. }
  114. } else {
  115. contents = append(contents, ListEntry{
  116. Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
  117. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  118. ETag: "\"" + filer.ETag(entry) + "\"",
  119. Size: int64(filer.FileSize(entry)),
  120. Owner: CanonicalUser{
  121. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  122. DisplayName: entry.Attributes.UserName,
  123. },
  124. StorageClass: "STANDARD",
  125. })
  126. }
  127. })
  128. if doErr != nil {
  129. return doErr
  130. }
  131. if !isTruncated {
  132. nextMarker = ""
  133. }
  134. response = ListBucketResult{
  135. Name: bucket,
  136. Prefix: originalPrefix,
  137. Marker: marker,
  138. NextMarker: nextMarker,
  139. MaxKeys: maxKeys,
  140. Delimiter: delimiter,
  141. IsTruncated: isTruncated,
  142. Contents: contents,
  143. CommonPrefixes: commonPrefixes,
  144. }
  145. return nil
  146. })
  147. return
  148. }
  149. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
  150. // invariants
  151. // prefix and marker should be under dir, marker may contain "/"
  152. // maxKeys should be updated for each recursion
  153. if prefix == "/" && delimiter == "/" {
  154. return
  155. }
  156. if maxKeys <= 0 {
  157. return
  158. }
  159. if strings.Contains(marker, "/") {
  160. sepIndex := strings.Index(marker, "/")
  161. subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
  162. // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
  163. subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
  164. if subErr != nil {
  165. err = subErr
  166. return
  167. }
  168. maxKeys -= subCounter
  169. nextMarker = subDir + "/" + subNextMarker
  170. counter += subCounter
  171. // finished processing this sub directory
  172. marker = subDir
  173. }
  174. // now marker is also a direct child of dir
  175. request := &filer_pb.ListEntriesRequest{
  176. Directory: dir,
  177. Prefix: prefix,
  178. Limit: uint32(maxKeys + 1),
  179. StartFromFileName: marker,
  180. InclusiveStartFrom: false,
  181. }
  182. ctx, cancel := context.WithCancel(context.Background())
  183. defer cancel()
  184. stream, listErr := client.ListEntries(ctx, request)
  185. if listErr != nil {
  186. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  187. return
  188. }
  189. for {
  190. resp, recvErr := stream.Recv()
  191. if recvErr != nil {
  192. if recvErr == io.EOF {
  193. break
  194. } else {
  195. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  196. return
  197. }
  198. }
  199. if counter >= maxKeys {
  200. isTruncated = true
  201. return
  202. }
  203. entry := resp.Entry
  204. nextMarker = entry.Name
  205. if entry.IsDirectory {
  206. // println("ListEntries", dir, "dir:", entry.Name)
  207. if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
  208. eachEntryFn(dir, entry)
  209. if delimiter != "/" {
  210. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
  211. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
  212. if subErr != nil {
  213. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  214. return
  215. }
  216. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
  217. counter += subCounter
  218. nextMarker = entry.Name + "/" + subNextMarker
  219. if subIsTruncated {
  220. isTruncated = true
  221. return
  222. }
  223. } else {
  224. counter++
  225. }
  226. }
  227. } else {
  228. // println("ListEntries", dir, "file:", entry.Name)
  229. eachEntryFn(dir, entry)
  230. counter++
  231. }
  232. }
  233. return
  234. }
  235. func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
  236. prefix = values.Get("prefix")
  237. token = values.Get("continuation-token")
  238. startAfter = values.Get("start-after")
  239. delimiter = values.Get("delimiter")
  240. if values.Get("max-keys") != "" {
  241. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  242. } else {
  243. maxkeys = maxObjectListSizeLimit
  244. }
  245. fetchOwner = values.Get("fetch-owner") == "true"
  246. return
  247. }
  248. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, maxkeys int) {
  249. prefix = values.Get("prefix")
  250. marker = values.Get("marker")
  251. delimiter = values.Get("delimiter")
  252. if values.Get("max-keys") != "" {
  253. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  254. } else {
  255. maxkeys = maxObjectListSizeLimit
  256. }
  257. return
  258. }