Contains the Concourse pipeline definition for building a linx-server container.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
4.5 KiB

  1. package s3
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "os"
  6. "path"
  7. "strconv"
  8. "time"
  9. "github.com/andreimarcu/linx-server/backends"
  10. "github.com/andreimarcu/linx-server/helpers"
  11. "github.com/andreimarcu/linx-server/torrent"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/session"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  16. "github.com/zeebo/bencode"
  17. )
  18. type S3Backend struct {
  19. bucket string
  20. svc *s3.S3
  21. }
  22. func (b S3Backend) Delete(key string) error {
  23. input := &s3.DeleteObjectInput{
  24. Bucket: aws.String(b.bucket),
  25. Key: aws.String(key),
  26. }
  27. _, err := b.svc.DeleteObject(input)
  28. if err != nil {
  29. return err
  30. }
  31. return os.Remove(path.Join(b.bucket, key))
  32. }
  33. func (b S3Backend) Exists(key string) (bool, error) {
  34. input := &s3.HeadObjectInput{
  35. Bucket: aws.String(b.bucket),
  36. Key: aws.String(key),
  37. }
  38. _, err := b.svc.HeadObject(input)
  39. return err == nil, err
  40. }
  41. func (b S3Backend) Head(key string) (metadata backends.Metadata, err error) {
  42. input := &s3.HeadObjectInput{
  43. Bucket: aws.String(b.bucket),
  44. Key: aws.String(key),
  45. }
  46. result, err := b.svc.HeadObject(input)
  47. if err != nil {
  48. return
  49. }
  50. metadata, err = unmapMetadata(result.Metadata)
  51. return
  52. }
  53. func (b S3Backend) Get(key string) (metadata backends.Metadata, r io.ReadCloser, err error) {
  54. input := &s3.GetObjectInput{
  55. Bucket: aws.String(b.bucket),
  56. Key: aws.String(key),
  57. }
  58. result, err := b.svc.GetObject(input)
  59. if err != nil {
  60. return
  61. }
  62. metadata, err = unmapMetadata(result.Metadata)
  63. r = result.Body
  64. return
  65. }
  66. func mapMetadata(m backends.Metadata) map[string]*string {
  67. return map[string]*string{
  68. "expiry": aws.String(strconv.FormatInt(m.Expiry.Unix(), 10)),
  69. "delete_key": aws.String(m.DeleteKey),
  70. "size": aws.String(strconv.FormatInt(m.Size, 10)),
  71. "mimetype": aws.String(m.Mimetype),
  72. "sha256sum": aws.String(m.Sha256sum),
  73. }
  74. }
  75. func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
  76. expiry, err := strconv.ParseInt(*input["expiry"], 10, 64)
  77. if err != nil {
  78. return
  79. }
  80. m.Expiry = time.Unix(expiry, 0)
  81. m.Size, err = strconv.ParseInt(*input["size"], 10, 64)
  82. if err != nil {
  83. return
  84. }
  85. m.DeleteKey = *input["delete_key"]
  86. m.Mimetype = *input["mimetype"]
  87. m.Sha256sum = *input["sha256sum"]
  88. return
  89. }
  90. func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
  91. tmpDst, err := ioutil.TempFile("", "linx-server-upload")
  92. if err != nil {
  93. return
  94. }
  95. defer tmpDst.Close()
  96. defer os.Remove(tmpDst.Name())
  97. bytes, err := io.Copy(tmpDst, r)
  98. if bytes == 0 {
  99. return m, backends.FileEmptyError
  100. } else if err != nil {
  101. return m, err
  102. }
  103. m.Expiry = expiry
  104. m.DeleteKey = deleteKey
  105. m.Size = bytes
  106. m.Mimetype, _ = helpers.DetectMime(tmpDst)
  107. m.Sha256sum, _ = helpers.Sha256sum(tmpDst)
  108. // XXX: we may not be able to write this to AWS easily
  109. //m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, tmpDst)
  110. uploader := s3manager.NewUploaderWithClient(b.svc)
  111. input := &s3manager.UploadInput{
  112. Bucket: aws.String(b.bucket),
  113. Key: aws.String(key),
  114. Body: tmpDst,
  115. Metadata: mapMetadata(m),
  116. }
  117. _, err = uploader.Upload(input)
  118. if err != nil {
  119. return
  120. }
  121. return
  122. }
  123. func (b S3Backend) Size(key string) (int64, error) {
  124. input := &s3.HeadObjectInput{
  125. Bucket: aws.String(b.bucket),
  126. Key: aws.String(key),
  127. }
  128. result, err := b.svc.HeadObject(input)
  129. if err != nil {
  130. return 0, err
  131. }
  132. return *result.ContentLength, nil
  133. }
  134. func (b S3Backend) GetTorrent(fileName string, url string) (t torrent.Torrent, err error) {
  135. input := &s3.GetObjectTorrentInput{
  136. Bucket: aws.String(b.bucket),
  137. Key: aws.String(fileName),
  138. }
  139. result, err := b.svc.GetObjectTorrent(input)
  140. if err != nil {
  141. return
  142. }
  143. defer result.Body.Close()
  144. data, err := ioutil.ReadAll(result.Body)
  145. if err != nil {
  146. return
  147. }
  148. err = bencode.DecodeBytes(data, &t)
  149. if err != nil {
  150. return
  151. }
  152. t.Info.Name = fileName
  153. t.UrlList = []string{url}
  154. return
  155. }
  156. func (b S3Backend) List() ([]string, error) {
  157. var output []string
  158. input := &s3.ListObjectsInput{
  159. Bucket: aws.String(b.bucket),
  160. }
  161. results, err := b.svc.ListObjects(input)
  162. if err != nil {
  163. return nil, err
  164. }
  165. for _, object := range results.Contents {
  166. output = append(output, *object.Key)
  167. }
  168. return output, nil
  169. }
  170. func NewS3Backend(bucket string, region string, endpoint string) S3Backend {
  171. awsConfig := &aws.Config{}
  172. if region != "" {
  173. awsConfig.Region = aws.String(region)
  174. }
  175. if endpoint != "" {
  176. awsConfig.Endpoint = aws.String(endpoint)
  177. }
  178. sess := session.Must(session.NewSession(awsConfig))
  179. svc := s3.New(sess)
  180. return S3Backend{bucket: bucket, svc: svc}
  181. }