Contains the Concourse pipeline definition for building a line-server container
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
4.6 KiB

  1. package s3
  2. import (
  3. "errors"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "strconv"
  9. "time"
  10. "github.com/andreimarcu/linx-server/backends"
  11. "github.com/andreimarcu/linx-server/helpers"
  12. "github.com/andreimarcu/linx-server/torrent"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3"
  16. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  17. "github.com/zeebo/bencode"
  18. )
  19. type S3Backend struct {
  20. bucket string
  21. svc *s3.S3
  22. }
  23. func (b S3Backend) Delete(key string) error {
  24. input := &s3.DeleteObjectInput{
  25. Bucket: aws.String(b.bucket),
  26. Key: aws.String(key),
  27. }
  28. _, err := b.svc.DeleteObject(input)
  29. if err != nil {
  30. return err
  31. }
  32. return os.Remove(path.Join(b.bucket, key))
  33. }
  34. func (b S3Backend) Exists(key string) (bool, error) {
  35. input := &s3.HeadObjectInput{
  36. Bucket: aws.String(b.bucket),
  37. Key: aws.String(key),
  38. }
  39. _, err := b.svc.HeadObject(input)
  40. return err == nil, err
  41. }
  42. func (b S3Backend) Head(key string) (metadata backends.Metadata, err error) {
  43. input := &s3.HeadObjectInput{
  44. Bucket: aws.String(b.bucket),
  45. Key: aws.String(key),
  46. }
  47. result, err := b.svc.HeadObject(input)
  48. if err != nil {
  49. return
  50. }
  51. metadata, err = unmapMetadata(result.Metadata)
  52. return
  53. }
  54. func (b S3Backend) Get(key string) (metadata backends.Metadata, r io.ReadCloser, err error) {
  55. input := &s3.GetObjectInput{
  56. Bucket: aws.String(b.bucket),
  57. Key: aws.String(key),
  58. }
  59. result, err := b.svc.GetObject(input)
  60. if err != nil {
  61. return
  62. }
  63. metadata, err = unmapMetadata(result.Metadata)
  64. r = result.Body
  65. return
  66. }
  67. func mapMetadata(m backends.Metadata) map[string]*string {
  68. return map[string]*string{
  69. "expiry": aws.String(strconv.FormatInt(m.Expiry.Unix(), 10)),
  70. "delete_key": aws.String(m.DeleteKey),
  71. "size": aws.String(strconv.FormatInt(m.Size, 10)),
  72. "mimetype": aws.String(m.Mimetype),
  73. "sha256sum": aws.String(m.Sha256sum),
  74. }
  75. }
  76. func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
  77. expiry, err := strconv.ParseInt(*input["expiry"], 10, 64)
  78. if err != nil {
  79. return
  80. }
  81. m.Expiry = time.Unix(expiry, 0)
  82. m.Size, err = strconv.ParseInt(*input["size"], 10, 64)
  83. if err != nil {
  84. return
  85. }
  86. m.DeleteKey = *input["delete_key"]
  87. m.Mimetype = *input["mimetype"]
  88. m.Sha256sum = *input["sha256sum"]
  89. return
  90. }
  91. func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
  92. tmpDst, err := ioutil.TempFile("", "linx-server-upload")
  93. if err != nil {
  94. return
  95. }
  96. defer tmpDst.Close()
  97. defer os.Remove(tmpDst.Name())
  98. bytes, err := io.Copy(tmpDst, r)
  99. if bytes == 0 {
  100. return m, errors.New("Empty file")
  101. } else if err != nil {
  102. return m, err
  103. }
  104. m.Expiry = expiry
  105. m.DeleteKey = deleteKey
  106. m.Size = bytes
  107. m.Mimetype, _ = helpers.DetectMime(tmpDst)
  108. m.Sha256sum, _ = helpers.Sha256sum(tmpDst)
  109. // XXX: we may not be able to write this to AWS easily
  110. //m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, tmpDst)
  111. uploader := s3manager.NewUploaderWithClient(b.svc)
  112. input := &s3manager.UploadInput{
  113. Bucket: aws.String(b.bucket),
  114. Key: aws.String(key),
  115. Body: tmpDst,
  116. Metadata: mapMetadata(m),
  117. }
  118. _, err = uploader.Upload(input)
  119. if err != nil {
  120. return
  121. }
  122. return
  123. }
  124. func (b S3Backend) Size(key string) (int64, error) {
  125. input := &s3.HeadObjectInput{
  126. Bucket: aws.String(b.bucket),
  127. Key: aws.String(key),
  128. }
  129. result, err := b.svc.HeadObject(input)
  130. if err != nil {
  131. return 0, err
  132. }
  133. return *result.ContentLength, nil
  134. }
  135. func (b S3Backend) GetTorrent(fileName string, url string) (t torrent.Torrent, err error) {
  136. input := &s3.GetObjectTorrentInput{
  137. Bucket: aws.String(b.bucket),
  138. Key: aws.String(fileName),
  139. }
  140. result, err := b.svc.GetObjectTorrent(input)
  141. if err != nil {
  142. return
  143. }
  144. defer result.Body.Close()
  145. data, err := ioutil.ReadAll(result.Body)
  146. if err != nil {
  147. return
  148. }
  149. err = bencode.DecodeBytes(data, &t)
  150. if err != nil {
  151. return
  152. }
  153. t.Info.Name = fileName
  154. t.UrlList = []string{url}
  155. return
  156. }
  157. func (b S3Backend) List() ([]string, error) {
  158. var output []string
  159. input := &s3.ListObjectsInput{
  160. Bucket: aws.String(b.bucket),
  161. }
  162. results, err := b.svc.ListObjects(input)
  163. if err != nil {
  164. return nil, err
  165. }
  166. for _, object := range results.Contents {
  167. output = append(output, *object.Key)
  168. }
  169. return output, nil
  170. }
  171. func NewS3Backend(bucket string, region string, endpoint string) S3Backend {
  172. awsConfig := &aws.Config{}
  173. if region != "" {
  174. awsConfig.Region = aws.String(region)
  175. }
  176. if endpoint != "" {
  177. awsConfig.Endpoint = aws.String(endpoint)
  178. }
  179. sess := session.Must(session.NewSession(awsConfig))
  180. svc := s3.New(sess)
  181. return S3Backend{bucket: bucket, svc: svc}
  182. }