Contains the Concourse pipeline definition for building a line-server container
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
4.6 KiB

  1. package s3
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "os"
  6. "strconv"
  7. "time"
  8. "github.com/andreimarcu/linx-server/backends"
  9. "github.com/andreimarcu/linx-server/helpers"
  10. "github.com/andreimarcu/linx-server/torrent"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/aws/session"
  13. "github.com/aws/aws-sdk-go/service/s3"
  14. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  15. "github.com/zeebo/bencode"
  16. )
  17. type S3Backend struct {
  18. bucket string
  19. svc *s3.S3
  20. }
  21. func (b S3Backend) Delete(key string) error {
  22. input := &s3.DeleteObjectInput{
  23. Bucket: aws.String(b.bucket),
  24. Key: aws.String(key),
  25. }
  26. _, err := b.svc.DeleteObject(input)
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. func (b S3Backend) Exists(key string) (bool, error) {
  33. input := &s3.HeadObjectInput{
  34. Bucket: aws.String(b.bucket),
  35. Key: aws.String(key),
  36. }
  37. _, err := b.svc.HeadObject(input)
  38. return err == nil, err
  39. }
  40. func (b S3Backend) Head(key string) (metadata backends.Metadata, err error) {
  41. input := &s3.HeadObjectInput{
  42. Bucket: aws.String(b.bucket),
  43. Key: aws.String(key),
  44. }
  45. result, err := b.svc.HeadObject(input)
  46. if err != nil {
  47. return
  48. }
  49. metadata, err = unmapMetadata(result.Metadata)
  50. return
  51. }
  52. func (b S3Backend) Get(key string) (metadata backends.Metadata, r io.ReadCloser, err error) {
  53. input := &s3.GetObjectInput{
  54. Bucket: aws.String(b.bucket),
  55. Key: aws.String(key),
  56. }
  57. result, err := b.svc.GetObject(input)
  58. if err != nil {
  59. return
  60. }
  61. metadata, err = unmapMetadata(result.Metadata)
  62. r = result.Body
  63. return
  64. }
  65. func mapMetadata(m backends.Metadata) map[string]*string {
  66. return map[string]*string{
  67. "Expiry": aws.String(strconv.FormatInt(m.Expiry.Unix(), 10)),
  68. "Delete_key": aws.String(m.DeleteKey),
  69. "Size": aws.String(strconv.FormatInt(m.Size, 10)),
  70. "Mimetype": aws.String(m.Mimetype),
  71. "Sha256sum": aws.String(m.Sha256sum),
  72. }
  73. }
  74. func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
  75. expiry, err := strconv.ParseInt(aws.StringValue(input["Expiry"]), 10, 64)
  76. if err != nil {
  77. return
  78. }
  79. m.Expiry = time.Unix(expiry, 0)
  80. m.Size, err = strconv.ParseInt(aws.StringValue(input["Size"]), 10, 64)
  81. if err != nil {
  82. return
  83. }
  84. m.DeleteKey = aws.StringValue(input["Delete_key"])
  85. m.Mimetype = aws.StringValue(input["Mimetype"])
  86. m.Sha256sum = aws.StringValue(input["Sha256sum"])
  87. return
  88. }
  89. func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
  90. tmpDst, err := ioutil.TempFile("", "linx-server-upload")
  91. if err != nil {
  92. return
  93. }
  94. defer tmpDst.Close()
  95. defer os.Remove(tmpDst.Name())
  96. bytes, err := io.Copy(tmpDst, r)
  97. if bytes == 0 {
  98. return m, backends.FileEmptyError
  99. } else if err != nil {
  100. return m, err
  101. }
  102. m.Expiry = expiry
  103. m.DeleteKey = deleteKey
  104. m.Size = bytes
  105. m.Mimetype, _ = helpers.DetectMime(tmpDst)
  106. m.Sha256sum, _ = helpers.Sha256sum(tmpDst)
  107. // XXX: we may not be able to write this to AWS easily
  108. //m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, tmpDst)
  109. uploader := s3manager.NewUploaderWithClient(b.svc)
  110. input := &s3manager.UploadInput{
  111. Bucket: aws.String(b.bucket),
  112. Key: aws.String(key),
  113. Body: tmpDst,
  114. Metadata: mapMetadata(m),
  115. }
  116. _, err = uploader.Upload(input)
  117. if err != nil {
  118. return
  119. }
  120. return
  121. }
  122. func (b S3Backend) Size(key string) (int64, error) {
  123. input := &s3.HeadObjectInput{
  124. Bucket: aws.String(b.bucket),
  125. Key: aws.String(key),
  126. }
  127. result, err := b.svc.HeadObject(input)
  128. if err != nil {
  129. return 0, err
  130. }
  131. return *result.ContentLength, nil
  132. }
  133. func (b S3Backend) GetTorrent(fileName string, url string) (t torrent.Torrent, err error) {
  134. input := &s3.GetObjectTorrentInput{
  135. Bucket: aws.String(b.bucket),
  136. Key: aws.String(fileName),
  137. }
  138. result, err := b.svc.GetObjectTorrent(input)
  139. if err != nil {
  140. return
  141. }
  142. defer result.Body.Close()
  143. data, err := ioutil.ReadAll(result.Body)
  144. if err != nil {
  145. return
  146. }
  147. err = bencode.DecodeBytes(data, &t)
  148. if err != nil {
  149. return
  150. }
  151. t.Info.Name = fileName
  152. t.UrlList = []string{url}
  153. return
  154. }
  155. func (b S3Backend) List() ([]string, error) {
  156. var output []string
  157. input := &s3.ListObjectsInput{
  158. Bucket: aws.String(b.bucket),
  159. }
  160. results, err := b.svc.ListObjects(input)
  161. if err != nil {
  162. return nil, err
  163. }
  164. for _, object := range results.Contents {
  165. output = append(output, *object.Key)
  166. }
  167. return output, nil
  168. }
  169. func NewS3Backend(bucket string, region string, endpoint string) S3Backend {
  170. awsConfig := &aws.Config{}
  171. if region != "" {
  172. awsConfig.Region = aws.String(region)
  173. }
  174. if endpoint != "" {
  175. awsConfig.Endpoint = aws.String(endpoint)
  176. }
  177. sess := session.Must(session.NewSession(awsConfig))
  178. svc := s3.New(sess)
  179. return S3Backend{bucket: bucket, svc: svc}
  180. }