You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

379 lines
11 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/chrislusf/seaweedfs/weed/filer"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
  17. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  18. )
  19. type ListBucketResultV2 struct {
  20. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
  21. Name string `xml:"Name"`
  22. Prefix string `xml:"Prefix"`
  23. MaxKeys int `xml:"MaxKeys"`
  24. Delimiter string `xml:"Delimiter,omitempty"`
  25. IsTruncated bool `xml:"IsTruncated"`
  26. Contents []ListEntry `xml:"Contents,omitempty"`
  27. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  28. ContinuationToken string `xml:"ContinuationToken,omitempty"`
  29. NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
  30. KeyCount int `xml:"KeyCount"`
  31. StartAfter string `xml:"StartAfter,omitempty"`
  32. }
  33. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  34. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  35. // collect parameters
  36. bucket, _ := getBucketAndObject(r)
  37. originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
  38. if maxKeys < 0 {
  39. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  40. return
  41. }
  42. if delimiter != "" && delimiter != "/" {
  43. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  44. return
  45. }
  46. marker := continuationToken
  47. if continuationToken == "" {
  48. marker = startAfter
  49. }
  50. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  51. if err != nil {
  52. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  53. return
  54. }
  55. if len(response.Contents) == 0 {
  56. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  57. writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
  58. return
  59. }
  60. }
  61. responseV2 := &ListBucketResultV2{
  62. XMLName: response.XMLName,
  63. Name: response.Name,
  64. CommonPrefixes: response.CommonPrefixes,
  65. Contents: response.Contents,
  66. ContinuationToken: continuationToken,
  67. Delimiter: response.Delimiter,
  68. IsTruncated: response.IsTruncated,
  69. KeyCount: len(response.Contents) + len(response.CommonPrefixes),
  70. MaxKeys: response.MaxKeys,
  71. NextContinuationToken: response.NextMarker,
  72. Prefix: response.Prefix,
  73. StartAfter: startAfter,
  74. }
  75. writeSuccessResponseXML(w, encodeResponse(responseV2))
  76. }
  77. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  78. // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
  79. // collect parameters
  80. bucket, _ := getBucketAndObject(r)
  81. originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
  82. if maxKeys < 0 {
  83. writeErrorResponse(w, s3err.ErrInvalidMaxKeys, r.URL)
  84. return
  85. }
  86. if delimiter != "" && delimiter != "/" {
  87. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  88. return
  89. }
  90. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  91. if err != nil {
  92. writeErrorResponse(w, s3err.ErrInternalError, r.URL)
  93. return
  94. }
  95. if len(response.Contents) == 0 {
  96. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  97. writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
  98. return
  99. }
  100. }
  101. writeSuccessResponseXML(w, encodeResponse(response))
  102. }
  103. func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
  104. // convert full path prefix into directory name and prefix for entry name
  105. reqDir, prefix := filepath.Split(originalPrefix)
  106. if strings.HasPrefix(reqDir, "/") {
  107. reqDir = reqDir[1:]
  108. }
  109. bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
  110. reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
  111. if strings.HasSuffix(reqDir, "/") {
  112. // remove trailing "/"
  113. reqDir = reqDir[:len(reqDir)-1]
  114. }
  115. var contents []ListEntry
  116. var commonPrefixes []PrefixEntry
  117. var isTruncated bool
  118. var doErr error
  119. var nextMarker string
  120. // check filer
  121. err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  122. _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
  123. if entry.IsDirectory {
  124. if delimiter == "/" {
  125. commonPrefixes = append(commonPrefixes, PrefixEntry{
  126. Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  127. })
  128. }
  129. } else {
  130. storageClass := "STANDARD"
  131. if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok {
  132. storageClass = string(v)
  133. }
  134. contents = append(contents, ListEntry{
  135. Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
  136. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  137. ETag: "\"" + filer.ETag(entry) + "\"",
  138. Size: int64(filer.FileSize(entry)),
  139. Owner: CanonicalUser{
  140. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  141. DisplayName: entry.Attributes.UserName,
  142. },
  143. StorageClass: StorageClass(storageClass),
  144. })
  145. }
  146. })
  147. if doErr != nil {
  148. return doErr
  149. }
  150. if !isTruncated {
  151. nextMarker = ""
  152. }
  153. response = ListBucketResult{
  154. Name: bucket,
  155. Prefix: originalPrefix,
  156. Marker: marker,
  157. NextMarker: nextMarker,
  158. MaxKeys: maxKeys,
  159. Delimiter: delimiter,
  160. IsTruncated: isTruncated,
  161. Contents: contents,
  162. CommonPrefixes: commonPrefixes,
  163. }
  164. return nil
  165. })
  166. return
  167. }
  168. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
  169. // invariants
  170. // prefix and marker should be under dir, marker may contain "/"
  171. // maxKeys should be updated for each recursion
  172. if prefix == "/" && delimiter == "/" {
  173. return
  174. }
  175. if maxKeys <= 0 {
  176. return
  177. }
  178. if strings.Contains(marker, "/") {
  179. sepIndex := strings.Index(marker, "/")
  180. subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
  181. // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
  182. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
  183. if subErr != nil {
  184. err = subErr
  185. return
  186. }
  187. isTruncated = isTruncated || subIsTruncated
  188. maxKeys -= subCounter
  189. nextMarker = subDir + "/" + subNextMarker
  190. // finished processing this sub directory
  191. marker = subDir
  192. }
  193. // now marker is also a direct child of dir
  194. request := &filer_pb.ListEntriesRequest{
  195. Directory: dir,
  196. Prefix: prefix,
  197. Limit: uint32(maxKeys + 1),
  198. StartFromFileName: marker,
  199. InclusiveStartFrom: false,
  200. }
  201. ctx, cancel := context.WithCancel(context.Background())
  202. defer cancel()
  203. stream, listErr := client.ListEntries(ctx, request)
  204. if listErr != nil {
  205. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  206. return
  207. }
  208. for {
  209. resp, recvErr := stream.Recv()
  210. if recvErr != nil {
  211. if recvErr == io.EOF {
  212. break
  213. } else {
  214. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  215. return
  216. }
  217. }
  218. if counter >= maxKeys {
  219. isTruncated = true
  220. return
  221. }
  222. entry := resp.Entry
  223. nextMarker = entry.Name
  224. if entry.IsDirectory {
  225. // println("ListEntries", dir, "dir:", entry.Name)
  226. if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
  227. if delimiter != "/" {
  228. eachEntryFn(dir, entry)
  229. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
  230. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
  231. if subErr != nil {
  232. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  233. return
  234. }
  235. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
  236. counter += subCounter
  237. nextMarker = entry.Name + "/" + subNextMarker
  238. if subIsTruncated {
  239. isTruncated = true
  240. return
  241. }
  242. } else {
  243. var isEmpty bool
  244. if !s3a.option.AllowEmptyFolder {
  245. if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
  246. glog.Errorf("check empty folder %s: %v", dir, err)
  247. }
  248. }
  249. if !isEmpty {
  250. eachEntryFn(dir, entry)
  251. counter++
  252. }
  253. }
  254. }
  255. } else {
  256. // println("ListEntries", dir, "file:", entry.Name)
  257. eachEntryFn(dir, entry)
  258. counter++
  259. }
  260. }
  261. return
  262. }
  263. func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
  264. prefix = values.Get("prefix")
  265. token = values.Get("continuation-token")
  266. startAfter = values.Get("start-after")
  267. delimiter = values.Get("delimiter")
  268. if values.Get("max-keys") != "" {
  269. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  270. } else {
  271. maxkeys = maxObjectListSizeLimit
  272. }
  273. fetchOwner = values.Get("fetch-owner") == "true"
  274. return
  275. }
  276. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, maxkeys int) {
  277. prefix = values.Get("prefix")
  278. marker = values.Get("marker")
  279. delimiter = values.Get("delimiter")
  280. if values.Get("max-keys") != "" {
  281. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  282. } else {
  283. maxkeys = maxObjectListSizeLimit
  284. }
  285. return
  286. }
  287. func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) {
  288. // println("+ isDirectoryAllEmpty", dir, name)
  289. glog.V(4).Infof("+ isEmpty %s/%s", parentDir, name)
  290. defer glog.V(4).Infof("- isEmpty %s/%s %v", parentDir, name, isEmpty)
  291. var fileCounter int
  292. var subDirs []string
  293. currentDir := parentDir + "/" + name
  294. var startFrom string
  295. var isExhausted bool
  296. var foundEntry bool
  297. for fileCounter == 0 && !isExhausted && err == nil {
  298. err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  299. foundEntry = true
  300. if entry.IsDirectory {
  301. subDirs = append(subDirs, entry.Name)
  302. } else {
  303. fileCounter++
  304. }
  305. startFrom = entry.Name
  306. isExhausted = isExhausted || isLast
  307. glog.V(4).Infof(" * %s/%s isLast: %t", currentDir, startFrom, isLast)
  308. return nil
  309. }, startFrom, false, 8)
  310. if !foundEntry {
  311. break
  312. }
  313. }
  314. if err != nil {
  315. return false, err
  316. }
  317. if fileCounter > 0 {
  318. return false, nil
  319. }
  320. for _, subDir := range subDirs {
  321. isSubEmpty, subErr := s3a.isDirectoryAllEmpty(filerClient, currentDir, subDir)
  322. if subErr != nil {
  323. return false, subErr
  324. }
  325. if !isSubEmpty {
  326. return false, nil
  327. }
  328. }
  329. glog.V(1).Infof("deleting empty folder %s", currentDir)
  330. if err = doDeleteEntry(filerClient, parentDir, name, true, true); err != nil {
  331. return
  332. }
  333. return true, nil
  334. }