Contains the Concourse pipeline definition for building a line-server container
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
5.0 KiB

  1. package s3
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "os"
  6. "strconv"
  7. "time"
  8. "github.com/andreimarcu/linx-server/backends"
  9. "github.com/andreimarcu/linx-server/helpers"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/aws/awserr"
  12. "github.com/aws/aws-sdk-go/aws/session"
  13. "github.com/aws/aws-sdk-go/service/s3"
  14. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  15. )
  16. type S3Backend struct {
  17. bucket string
  18. svc *s3.S3
  19. }
  20. func (b S3Backend) Delete(key string) error {
  21. _, err := b.svc.DeleteObject(&s3.DeleteObjectInput{
  22. Bucket: aws.String(b.bucket),
  23. Key: aws.String(key),
  24. })
  25. if err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. func (b S3Backend) Exists(key string) (bool, error) {
  31. _, err := b.svc.HeadObject(&s3.HeadObjectInput{
  32. Bucket: aws.String(b.bucket),
  33. Key: aws.String(key),
  34. })
  35. return err == nil, err
  36. }
  37. func (b S3Backend) Head(key string) (metadata backends.Metadata, err error) {
  38. var result *s3.HeadObjectOutput
  39. result, err = b.svc.HeadObject(&s3.HeadObjectInput{
  40. Bucket: aws.String(b.bucket),
  41. Key: aws.String(key),
  42. })
  43. if err != nil {
  44. if aerr, ok := err.(awserr.Error); ok {
  45. if aerr.Code() == s3.ErrCodeNoSuchKey || aerr.Code() == "NotFound" {
  46. err = backends.NotFoundErr
  47. }
  48. }
  49. return
  50. }
  51. metadata, err = unmapMetadata(result.Metadata)
  52. return
  53. }
  54. func (b S3Backend) Get(key string) (metadata backends.Metadata, r io.ReadCloser, err error) {
  55. var result *s3.GetObjectOutput
  56. result, err = b.svc.GetObject(&s3.GetObjectInput{
  57. Bucket: aws.String(b.bucket),
  58. Key: aws.String(key),
  59. })
  60. if err != nil {
  61. if aerr, ok := err.(awserr.Error); ok {
  62. if aerr.Code() == s3.ErrCodeNoSuchKey || aerr.Code() == "NotFound" {
  63. err = backends.NotFoundErr
  64. }
  65. }
  66. return
  67. }
  68. metadata, err = unmapMetadata(result.Metadata)
  69. r = result.Body
  70. return
  71. }
  72. func mapMetadata(m backends.Metadata) map[string]*string {
  73. return map[string]*string{
  74. "Expiry": aws.String(strconv.FormatInt(m.Expiry.Unix(), 10)),
  75. "Deletekey": aws.String(m.DeleteKey),
  76. "Size": aws.String(strconv.FormatInt(m.Size, 10)),
  77. "Mimetype": aws.String(m.Mimetype),
  78. "Sha256sum": aws.String(m.Sha256sum),
  79. }
  80. }
  81. func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
  82. expiry, err := strconv.ParseInt(aws.StringValue(input["Expiry"]), 10, 64)
  83. if err != nil {
  84. return m, err
  85. }
  86. m.Expiry = time.Unix(expiry, 0)
  87. m.Size, err = strconv.ParseInt(aws.StringValue(input["Size"]), 10, 64)
  88. if err != nil {
  89. return
  90. }
  91. m.DeleteKey = aws.StringValue(input["Deletekey"])
  92. m.Mimetype = aws.StringValue(input["Mimetype"])
  93. m.Sha256sum = aws.StringValue(input["Sha256sum"])
  94. return
  95. }
  96. func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
  97. tmpDst, err := ioutil.TempFile("", "linx-server-upload")
  98. if err != nil {
  99. return m, err
  100. }
  101. defer tmpDst.Close()
  102. defer os.Remove(tmpDst.Name())
  103. bytes, err := io.Copy(tmpDst, r)
  104. if bytes == 0 {
  105. return m, backends.FileEmptyError
  106. } else if err != nil {
  107. return m, err
  108. }
  109. _, err = tmpDst.Seek(0, 0)
  110. if err != nil {
  111. return m, err
  112. }
  113. m, err = helpers.GenerateMetadata(tmpDst)
  114. if err != nil {
  115. return
  116. }
  117. m.Expiry = expiry
  118. m.DeleteKey = deleteKey
  119. // XXX: we may not be able to write this to AWS easily
  120. //m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, tmpDst)
  121. _, err = tmpDst.Seek(0, 0)
  122. if err != nil {
  123. return m, err
  124. }
  125. uploader := s3manager.NewUploaderWithClient(b.svc)
  126. input := &s3manager.UploadInput{
  127. Bucket: aws.String(b.bucket),
  128. Key: aws.String(key),
  129. Body: tmpDst,
  130. Metadata: mapMetadata(m),
  131. }
  132. _, err = uploader.Upload(input)
  133. if err != nil {
  134. return
  135. }
  136. return
  137. }
  138. func (b S3Backend) PutMetadata(key string, m backends.Metadata) (err error) {
  139. _, err = b.svc.CopyObject(&s3.CopyObjectInput{
  140. Bucket: aws.String(b.bucket),
  141. Key: aws.String(key),
  142. CopySource: aws.String("/" + b.bucket + "/" + key),
  143. Metadata: mapMetadata(m),
  144. MetadataDirective: aws.String("REPLACE"),
  145. })
  146. if err != nil {
  147. return
  148. }
  149. return
  150. }
  151. func (b S3Backend) Size(key string) (int64, error) {
  152. input := &s3.HeadObjectInput{
  153. Bucket: aws.String(b.bucket),
  154. Key: aws.String(key),
  155. }
  156. result, err := b.svc.HeadObject(input)
  157. if err != nil {
  158. return 0, err
  159. }
  160. return *result.ContentLength, nil
  161. }
  162. func (b S3Backend) List() ([]string, error) {
  163. var output []string
  164. input := &s3.ListObjectsInput{
  165. Bucket: aws.String(b.bucket),
  166. }
  167. results, err := b.svc.ListObjects(input)
  168. if err != nil {
  169. return nil, err
  170. }
  171. for _, object := range results.Contents {
  172. output = append(output, *object.Key)
  173. }
  174. return output, nil
  175. }
  176. func NewS3Backend(bucket string, region string, endpoint string, forcePathStyle bool) S3Backend {
  177. awsConfig := &aws.Config{}
  178. if region != "" {
  179. awsConfig.Region = aws.String(region)
  180. }
  181. if endpoint != "" {
  182. awsConfig.Endpoint = aws.String(endpoint)
  183. }
  184. if forcePathStyle == true {
  185. awsConfig.S3ForcePathStyle = aws.Bool(true)
  186. }
  187. sess := session.Must(session.NewSession(awsConfig))
  188. svc := s3.New(sess)
  189. return S3Backend{bucket: bucket, svc: svc}
  190. }