You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

421 lines
13 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
4 years ago
4 years ago
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
  8. "io"
  9. "net/http"
  10. "net/url"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  18. )
  19. type ListBucketResultV2 struct {
  20. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
  21. Name string `xml:"Name"`
  22. Prefix string `xml:"Prefix"`
  23. MaxKeys int `xml:"MaxKeys"`
  24. Delimiter string `xml:"Delimiter,omitempty"`
  25. IsTruncated bool `xml:"IsTruncated"`
  26. Contents []ListEntry `xml:"Contents,omitempty"`
  27. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  28. ContinuationToken string `xml:"ContinuationToken,omitempty"`
  29. NextContinuationToken string `xml:"NextContinuationToken,omitempty"`
  30. KeyCount int `xml:"KeyCount"`
  31. StartAfter string `xml:"StartAfter,omitempty"`
  32. }
  33. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  34. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  35. // collect parameters
  36. bucket, _ := s3_constants.GetBucketAndObject(r)
  37. glog.V(3).Infof("ListObjectsV2Handler %s", bucket)
  38. originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
  39. if maxKeys < 0 {
  40. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
  41. return
  42. }
  43. if delimiter != "" && delimiter != "/" {
  44. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  45. return
  46. }
  47. marker := continuationToken
  48. if continuationToken == "" {
  49. marker = startAfter
  50. }
  51. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  52. if err != nil {
  53. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  54. return
  55. }
  56. if len(response.Contents) == 0 {
  57. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  58. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  59. return
  60. }
  61. }
  62. responseV2 := &ListBucketResultV2{
  63. XMLName: response.XMLName,
  64. Name: response.Name,
  65. CommonPrefixes: response.CommonPrefixes,
  66. Contents: response.Contents,
  67. ContinuationToken: continuationToken,
  68. Delimiter: response.Delimiter,
  69. IsTruncated: response.IsTruncated,
  70. KeyCount: len(response.Contents) + len(response.CommonPrefixes),
  71. MaxKeys: response.MaxKeys,
  72. NextContinuationToken: response.NextMarker,
  73. Prefix: response.Prefix,
  74. StartAfter: startAfter,
  75. }
  76. writeSuccessResponseXML(w, r, responseV2)
  77. }
  78. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  79. // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
  80. // collect parameters
  81. bucket, _ := s3_constants.GetBucketAndObject(r)
  82. glog.V(3).Infof("ListObjectsV1Handler %s", bucket)
  83. originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
  84. if maxKeys < 0 {
  85. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
  86. return
  87. }
  88. if delimiter != "" && delimiter != "/" {
  89. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  90. return
  91. }
  92. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
  93. if err != nil {
  94. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  95. return
  96. }
  97. if len(response.Contents) == 0 {
  98. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  99. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  100. return
  101. }
  102. }
  103. writeSuccessResponseXML(w, r, response)
  104. }
  105. func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
  106. // convert full path prefix into directory name and prefix for entry name
  107. reqDir, prefix := filepath.Split(originalPrefix)
  108. if strings.HasPrefix(reqDir, "/") {
  109. reqDir = reqDir[1:]
  110. }
  111. bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
  112. reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
  113. if strings.HasSuffix(reqDir, "/") {
  114. reqDir = strings.TrimSuffix(reqDir, "/")
  115. }
  116. var contents []ListEntry
  117. var commonPrefixes []PrefixEntry
  118. var isTruncated bool
  119. var doErr error
  120. var nextMarker string
  121. // check filer
  122. err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  123. _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) {
  124. if entry.IsDirectory {
  125. if delimiter == "/" {
  126. commonPrefixes = append(commonPrefixes, PrefixEntry{
  127. Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  128. })
  129. }
  130. if entry.Attributes.Mime == "" || !strings.HasSuffix(entry.Name, "/") {
  131. return
  132. }
  133. }
  134. storageClass := "STANDARD"
  135. if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
  136. storageClass = string(v)
  137. }
  138. contents = append(contents, ListEntry{
  139. Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
  140. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  141. ETag: "\"" + filer.ETag(entry) + "\"",
  142. Size: int64(filer.FileSize(entry)),
  143. Owner: CanonicalUser{
  144. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  145. DisplayName: entry.Attributes.UserName,
  146. },
  147. StorageClass: StorageClass(storageClass),
  148. })
  149. })
  150. if doErr != nil {
  151. return doErr
  152. }
  153. if !isTruncated {
  154. nextMarker = ""
  155. }
  156. if len(contents) == 0 && len(commonPrefixes) == 0 && maxKeys > 0 {
  157. if strings.HasSuffix(originalPrefix, "/") && prefix == "" {
  158. reqDir, prefix = filepath.Split(strings.TrimSuffix(reqDir, "/"))
  159. reqDir = strings.TrimSuffix(reqDir, "/")
  160. }
  161. _, _, _, doErr = s3a.doListFilerEntries(client, reqDir, prefix, 1, prefix, delimiter, true, func(dir string, entry *filer_pb.Entry) {
  162. if entry.IsDirectory && entry.Attributes.Mime != "" && entry.Name == prefix {
  163. storageClass := "STANDARD"
  164. if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
  165. storageClass = string(v)
  166. }
  167. contents = append(contents, ListEntry{
  168. Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
  169. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  170. ETag: "\"" + fmt.Sprintf("%x", entry.Attributes.Md5) + "\"",
  171. Size: int64(filer.FileSize(entry)),
  172. Owner: CanonicalUser{
  173. ID: fmt.Sprintf("%x", entry.Attributes.Uid),
  174. DisplayName: entry.Attributes.UserName,
  175. },
  176. StorageClass: StorageClass(storageClass),
  177. })
  178. }
  179. })
  180. if doErr != nil {
  181. return doErr
  182. }
  183. }
  184. response = ListBucketResult{
  185. Name: bucket,
  186. Prefix: originalPrefix,
  187. Marker: marker,
  188. NextMarker: nextMarker,
  189. MaxKeys: maxKeys,
  190. Delimiter: delimiter,
  191. IsTruncated: isTruncated,
  192. Contents: contents,
  193. CommonPrefixes: commonPrefixes,
  194. }
  195. return nil
  196. })
  197. return
  198. }
  199. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
  200. // invariants
  201. // prefix and marker should be under dir, marker may contain "/"
  202. // maxKeys should be updated for each recursion
  203. if prefix == "/" && delimiter == "/" {
  204. return
  205. }
  206. if maxKeys <= 0 {
  207. return
  208. }
  209. if strings.Contains(marker, "/") {
  210. sepIndex := strings.Index(marker, "/")
  211. subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
  212. glog.V(4).Infoln("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
  213. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, false, eachEntryFn)
  214. if subErr != nil {
  215. err = subErr
  216. return
  217. }
  218. counter += subCounter
  219. isTruncated = isTruncated || subIsTruncated
  220. maxKeys -= subCounter
  221. nextMarker = subDir + "/" + subNextMarker
  222. // finished processing this sub directory
  223. marker = subDir
  224. }
  225. if maxKeys <= 0 {
  226. return
  227. }
  228. // now marker is also a direct child of dir
  229. request := &filer_pb.ListEntriesRequest{
  230. Directory: dir,
  231. Prefix: prefix,
  232. Limit: uint32(maxKeys + 1),
  233. StartFromFileName: marker,
  234. InclusiveStartFrom: inclusiveStartFrom,
  235. }
  236. ctx, cancel := context.WithCancel(context.Background())
  237. defer cancel()
  238. stream, listErr := client.ListEntries(ctx, request)
  239. if listErr != nil {
  240. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  241. return
  242. }
  243. for {
  244. resp, recvErr := stream.Recv()
  245. if recvErr != nil {
  246. if recvErr == io.EOF {
  247. break
  248. } else {
  249. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  250. return
  251. }
  252. }
  253. if counter >= maxKeys {
  254. isTruncated = true
  255. return
  256. }
  257. entry := resp.Entry
  258. nextMarker = entry.Name
  259. if entry.IsDirectory {
  260. // println("ListEntries", dir, "dir:", entry.Name)
  261. if entry.Name == ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
  262. continue
  263. }
  264. if delimiter != "/" {
  265. eachEntryFn(dir, entry)
  266. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
  267. subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, false, eachEntryFn)
  268. if subErr != nil {
  269. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  270. return
  271. }
  272. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
  273. if subCounter == 0 && entry.Attributes.Mime != "" {
  274. entry.Name += "/"
  275. eachEntryFn(dir, entry)
  276. counter++
  277. }
  278. counter += subCounter
  279. nextMarker = entry.Name + "/" + subNextMarker
  280. if subIsTruncated {
  281. isTruncated = true
  282. return
  283. }
  284. } else {
  285. var isEmpty bool
  286. if !s3a.option.AllowEmptyFolder && entry.Attributes.Mime == "" {
  287. if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
  288. glog.Errorf("check empty folder %s: %v", dir, err)
  289. }
  290. }
  291. if !isEmpty {
  292. eachEntryFn(dir, entry)
  293. counter++
  294. }
  295. }
  296. } else {
  297. // println("ListEntries", dir, "file:", entry.Name)
  298. eachEntryFn(dir, entry)
  299. counter++
  300. }
  301. }
  302. return
  303. }
  304. func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) {
  305. prefix = values.Get("prefix")
  306. token = values.Get("continuation-token")
  307. startAfter = values.Get("start-after")
  308. delimiter = values.Get("delimiter")
  309. if values.Get("max-keys") != "" {
  310. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  311. } else {
  312. maxkeys = maxObjectListSizeLimit
  313. }
  314. fetchOwner = values.Get("fetch-owner") == "true"
  315. return
  316. }
  317. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, maxkeys int) {
  318. prefix = values.Get("prefix")
  319. marker = values.Get("marker")
  320. delimiter = values.Get("delimiter")
  321. if values.Get("max-keys") != "" {
  322. maxkeys, _ = strconv.Atoi(values.Get("max-keys"))
  323. } else {
  324. maxkeys = maxObjectListSizeLimit
  325. }
  326. return
  327. }
  328. func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) {
  329. // println("+ isDirectoryAllEmpty", dir, name)
  330. glog.V(4).Infof("+ isEmpty %s/%s", parentDir, name)
  331. defer glog.V(4).Infof("- isEmpty %s/%s %v", parentDir, name, isEmpty)
  332. var fileCounter int
  333. var subDirs []string
  334. currentDir := parentDir + "/" + name
  335. var startFrom string
  336. var isExhausted bool
  337. var foundEntry bool
  338. for fileCounter == 0 && !isExhausted && err == nil {
  339. err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  340. foundEntry = true
  341. if entry.IsDirectory {
  342. subDirs = append(subDirs, entry.Name)
  343. } else {
  344. fileCounter++
  345. }
  346. startFrom = entry.Name
  347. isExhausted = isExhausted || isLast
  348. glog.V(4).Infof(" * %s/%s isLast: %t", currentDir, startFrom, isLast)
  349. return nil
  350. }, startFrom, false, 8)
  351. if !foundEntry {
  352. break
  353. }
  354. }
  355. if err != nil {
  356. return false, err
  357. }
  358. if fileCounter > 0 {
  359. return false, nil
  360. }
  361. for _, subDir := range subDirs {
  362. isSubEmpty, subErr := s3a.isDirectoryAllEmpty(filerClient, currentDir, subDir)
  363. if subErr != nil {
  364. return false, subErr
  365. }
  366. if !isSubEmpty {
  367. return false, nil
  368. }
  369. }
  370. glog.V(1).Infof("deleting empty folder %s", currentDir)
  371. if err = doDeleteEntry(filerClient, parentDir, name, true, true); err != nil {
  372. return
  373. }
  374. return true, nil
  375. }