Browse Source

Add initial version of S3 backend

This new backend currently isn't hooked up; new and existing installs
will continue to use the localfs backend.

* Rework torrent generation to be backend-dependent so we can use S3's
  existing torrent API.
* Remove the torrent test cases, which broke with this torrent rework;
  they will need to be added back later.
* Use `http.MaxBytesReader` for better max size handling.
* Allow backends to return errors in `ServeFile` if needed.
pull/156/head
mutantmonkey 6 years ago
parent
commit
e24bbd1255
  1. 37
      backends/localfs/localfs.go
  2. 170
      backends/s3/s3.go
  3. 5
      backends/storage.go
  4. 5
      fileserve.go
  5. 76
      server_test.go
  6. 60
      torrent.go
  7. 28
      torrent/torrent.go
  8. 73
      torrent_test.go
  9. 19
      upload.go

37
backends/localfs/localfs.go

@ -9,6 +9,7 @@ import (
"path"
"github.com/andreimarcu/linx-server/backends"
"github.com/andreimarcu/linx-server/torrent"
)
type LocalfsBackend struct {
@ -51,9 +52,10 @@ func (b LocalfsBackend) Open(key string) (backends.ReadSeekCloser, error) {
return os.Open(path.Join(b.basePath, key))
}
func (b LocalfsBackend) ServeFile(key string, w http.ResponseWriter, r *http.Request) {
func (b LocalfsBackend) ServeFile(key string, w http.ResponseWriter, r *http.Request) error {
filePath := path.Join(b.basePath, key)
http.ServeFile(w, r, filePath)
return nil
}
func (b LocalfsBackend) Size(key string) (int64, error) {
@ -65,6 +67,39 @@ func (b LocalfsBackend) Size(key string) (int64, error) {
return fileInfo.Size(), nil
}
// GetTorrent builds torrent metadata for a stored file by streaming it from
// disk and hashing it in TORRENT_PIECE_LENGTH-sized pieces. url becomes the
// webseed entry in the torrent's url-list.
func (b LocalfsBackend) GetTorrent(fileName string, url string) (t torrent.Torrent, err error) {
	t = torrent.Torrent{
		Encoding: "UTF-8",
		Info: torrent.TorrentInfo{
			PieceLength: torrent.TORRENT_PIECE_LENGTH,
			Name:        fileName,
		},
		UrlList: []string{url},
	}

	f, err := b.Open(fileName)
	if err != nil {
		return
	}
	defer f.Close()

	chunk := make([]byte, torrent.TORRENT_PIECE_LENGTH)
	for {
		n, readErr := f.Read(chunk)
		// Per the io.Reader contract, Read may return n > 0 together with an
		// error (including io.EOF), so consume the bytes before inspecting
		// the error — the previous version dropped a final short chunk that
		// arrived alongside EOF.
		if n > 0 {
			t.Info.Length += n
			t.Info.Pieces += string(torrent.HashPiece(chunk[:n]))
		}
		if readErr == io.EOF {
			break
		} else if readErr != nil {
			return t, readErr
		}
	}

	return
}
func (b LocalfsBackend) List() ([]string, error) {
var output []string

170
backends/s3/s3.go

@ -0,0 +1,170 @@
package s3

// NOTE(review): this file lives in backends/s3 but declared `package localfs`
// (copy-paste from the localfs backend); corrected to `package s3`.
import (
	"bytes"
	"errors"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path"

	"github.com/andreimarcu/linx-server/backends"
	"github.com/andreimarcu/linx-server/torrent"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/zeebo/bencode"
)
type S3Backend struct {
bucket string
svc *S3
}
// Delete removes the object stored at key from the bucket.
func (b S3Backend) Delete(key string) error {
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	}
	_, err := b.svc.DeleteObject(input)
	// Return the S3 API error; the previous `os.Remove(path.Join(...))`
	// return was a leftover from the localfs backend — it both discarded the
	// DeleteObject error and touched the local filesystem.
	return err
}
// Exists reports whether an object with the given key is present in the
// bucket. Any HeadObject error (including a not-found response) is passed
// through to the caller alongside false.
func (b S3Backend) Exists(key string) (bool, error) {
	_, err := b.svc.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	})
	return err == nil, err
}
// Get downloads the object at key and returns its entire contents in memory.
func (b S3Backend) Get(key string) ([]byte, error) {
	result, err := b.svc.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return []byte{}, err
	}
	defer result.Body.Close()

	return ioutil.ReadAll(result.Body)
}
// Put streams r into the bucket at key via the s3manager uploader and
// returns the number of bytes stored.
func (b S3Backend) Put(key string, r io.Reader) (int64, error) {
	uploader := s3manager.NewUploaderWithClient(b.svc)
	input := &s3manager.UploadInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
		Body:   r,
	}
	// The UploadOutput carries no size information, so discard it (the
	// original bound it to an unused `result`, which does not compile) and
	// ask S3 for the stored object's size instead of returning a -1 sentinel.
	if _, err := uploader.Upload(input); err != nil {
		return 0, err
	}
	return b.Size(key)
}
// s3ReadSeeker adapts an in-memory copy of an S3 object to
// backends.ReadSeekCloser (bytes.Reader supplies Read and Seek).
type s3ReadSeeker struct {
	*bytes.Reader
}

// Close is a no-op: the object data is held in memory, not in a live stream.
func (s3ReadSeeker) Close() error { return nil }

// Open returns a seekable handle to the object at key. GetObjectOutput.Body
// is only an io.ReadCloser — it cannot satisfy ReadSeekCloser directly — so
// the object is buffered in memory first. NOTE(review): this is costly for
// large objects; confirm expected object sizes before relying on it.
func (b S3Backend) Open(key string) (backends.ReadSeekCloser, error) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	}
	result, err := b.svc.GetObject(input)
	if err != nil {
		return nil, err
	}
	defer result.Body.Close()

	data, err := ioutil.ReadAll(result.Body)
	if err != nil {
		return nil, err
	}
	return s3ReadSeeker{bytes.NewReader(data)}, nil
}
// ServeFile writes the object at key to the HTTP response, honoring range
// and conditional requests via http.ServeContent.
func (b S3Backend) ServeFile(key string, w http.ResponseWriter, r *http.Request) error {
	// Signature fixed to return error: the body already returned err/nil,
	// and the StorageBackend interface requires `ServeFile(...) error`.
	input := &s3.GetObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	}
	result, err := b.svc.GetObject(input)
	if err != nil {
		return err
	}
	defer result.Body.Close()

	// http.ServeContent needs an io.ReadSeeker; the S3 body is only an
	// io.ReadCloser, so buffer the object in memory before serving it.
	data, err := ioutil.ReadAll(result.Body)
	if err != nil {
		return err
	}
	http.ServeContent(w, r, key, *result.LastModified, bytes.NewReader(data))
	return nil
}
// Size returns the object's ContentLength as reported by a HEAD request,
// without downloading the body.
func (b S3Backend) Size(key string) (int64, error) {
	result, err := b.svc.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return 0, err
	}
	return *result.ContentLength, nil
}
// GetTorrent fetches the torrent that S3 itself generates for the object
// (GetObjectTorrent API), then rewrites the display name and the webseed
// url-list so the torrent points at this server instead of S3 directly.
func (b S3Backend) GetTorrent(fileName string, url string) (t torrent.Torrent, err error) {
	result, err := b.svc.GetObjectTorrent(&s3.GetObjectTorrentInput{
		Bucket: aws.String(b.bucket),
		Key:    aws.String(fileName),
	})
	if err != nil {
		return
	}
	defer result.Body.Close()

	var data []byte
	if data, err = ioutil.ReadAll(result.Body); err != nil {
		return
	}
	if err = bencode.DecodeBytes(data, &t); err != nil {
		return
	}

	t.Info.Name = fileName
	t.UrlList = []string{url}
	return
}
// List returns the keys of the objects currently in the bucket.
func (b S3Backend) List() ([]string, error) {
	var output []string
	input := &s3.ListObjectsInput{
		// Fixed: the SDK struct field is exported `Bucket`; the lowercase
		// `bucket:` literal key does not compile.
		Bucket: aws.String(b.bucket),
	}
	// NOTE(review): ListObjects returns at most 1000 keys per call; switch
	// to ListObjectsPages if buckets are expected to grow beyond that.
	results, err := b.svc.ListObjects(input)
	if err != nil {
		return nil, err
	}
	for _, object := range results.Contents {
		output = append(output, *object.Key)
	}
	return output, nil
}
// NewS3Backend builds an S3Backend for the given bucket. region and endpoint
// are optional: empty strings leave the SDK defaults in place (a custom
// endpoint is useful for S3-compatible services such as MinIO).
func NewS3Backend(bucket string, region string, endpoint string) S3Backend {
	awsConfig := &aws.Config{}
	if region != "" {
		awsConfig.Region = aws.String(region)
	}
	if endpoint != "" {
		awsConfig.Endpoint = aws.String(endpoint)
	}

	sess := session.Must(session.NewSession(awsConfig))
	return S3Backend{
		bucket: bucket,
		svc:    s3.New(sess),
	}
}

5
backends/storage.go

@ -3,6 +3,8 @@ package backends
import (
"io"
"net/http"
"github.com/andreimarcu/linx-server/torrent"
)
type ReadSeekCloser interface {
@ -18,8 +20,9 @@ type StorageBackend interface {
Get(key string) ([]byte, error)
Put(key string, r io.Reader) (int64, error)
Open(key string) (ReadSeekCloser, error)
ServeFile(key string, w http.ResponseWriter, r *http.Request)
ServeFile(key string, w http.ResponseWriter, r *http.Request) error
Size(key string) (int64, error)
GetTorrent(fileName string, url string) (torrent.Torrent, error)
}
type MetaStorageBackend interface {

5
fileserve.go

@ -41,7 +41,10 @@ func fileServeHandler(c web.C, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Etag", metadata.Sha256sum)
w.Header().Set("Cache-Control", "max-age=0")
fileBackend.ServeFile(fileName, w, r)
err = fileBackend.ServeFile(fileName, w, r)
if err != nil {
oopsHandler(c, w, r, RespAUTO, err.Error())
}
}
func staticHandler(c web.C, w http.ResponseWriter, r *http.Request) {

76
server_test.go

@ -653,6 +653,45 @@ func TestPostEmptyUpload(t *testing.T) {
}
}
// TestPostTooLargeUpload verifies that a multipart POST whose body exceeds
// Config.maxSize is rejected with a 500 and the MaxBytesReader message.
func TestPostTooLargeUpload(t *testing.T) {
	mux := setup()
	oldMaxSize := Config.maxSize
	// Restore via defer so a t.Fatal below cannot leak the tiny limit into
	// later tests (the original only restored it on the success path).
	defer func() { Config.maxSize = oldMaxSize }()
	Config.maxSize = 2

	w := httptest.NewRecorder()
	filename := generateBarename() + ".txt"

	var b bytes.Buffer
	mw := multipart.NewWriter(&b)
	fw, err := mw.CreateFormFile("file", filename)
	if err != nil {
		t.Fatal(err)
	}
	fw.Write([]byte("test content"))
	mw.Close()

	req, err := http.NewRequest("POST", "/upload/", &b)
	// Check the error before touching req: NewRequest returns a nil request
	// on failure, and the original set headers first (nil dereference).
	if err != nil {
		t.Fatal(err)
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())
	req.Header.Set("Referer", Config.siteURL)

	mux.ServeHTTP(w, req)

	if w.Code != 500 {
		t.Log(w.Body.String())
		t.Fatalf("Status code is not 500, but %d", w.Code)
	}
	if !strings.Contains(w.Body.String(), "request body too large") {
		t.Fatal("Response did not contain 'request body too large'")
	}
}
func TestPostEmptyJSONUpload(t *testing.T) {
mux := setup()
w := httptest.NewRecorder()
@ -768,9 +807,44 @@ func TestPutEmptyUpload(t *testing.T) {
mux.ServeHTTP(w, req)
if w.Code != 500 {
t.Log(w.Body.String())
t.Fatalf("Status code is not 500, but %d", w.Code)
}
if !strings.Contains(w.Body.String(), "Empty file") {
t.Fatal("Response doesn't contain'Empty file'")
t.Fatal("Response did not contain 'Empty file'")
}
}
// TestPutTooLargeUpload verifies that a PUT body exceeding Config.maxSize is
// rejected with a 500 and the MaxBytesReader message.
func TestPutTooLargeUpload(t *testing.T) {
	mux := setup()
	oldMaxSize := Config.maxSize
	// Restore via defer so a t.Fatal below cannot leak the tiny limit into
	// later tests (the original only restored it on the success path).
	defer func() { Config.maxSize = oldMaxSize }()
	Config.maxSize = 2

	w := httptest.NewRecorder()
	filename := generateBarename() + ".file"

	req, err := http.NewRequest("PUT", "/upload/"+filename, strings.NewReader("File too big"))
	if err != nil {
		t.Fatal(err)
	}
	req.Header.Set("Linx-Randomize", "yes")

	mux.ServeHTTP(w, req)

	if w.Code != 500 {
		t.Log(w.Body.String())
		t.Fatalf("Status code is not 500, but %d", w.Code)
	}
	if !strings.Contains(w.Body.String(), "request body too large") {
		t.Fatal("Response did not contain 'request body too large'")
	}
}
func TestPutJSONUpload(t *testing.T) {

60
torrent.go

@ -2,9 +2,7 @@ package main
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"net/http"
"time"
@ -13,54 +11,15 @@ import (
"github.com/zenazn/goji/web"
)
const (
TORRENT_PIECE_LENGTH = 262144
)
type TorrentInfo struct {
PieceLength int `bencode:"piece length"`
Pieces string `bencode:"pieces"`
Name string `bencode:"name"`
Length int `bencode:"length"`
}
type Torrent struct {
Encoding string `bencode:"encoding"`
Info TorrentInfo `bencode:"info"`
UrlList []string `bencode:"url-list"`
}
func hashPiece(piece []byte) []byte {
h := sha1.New()
h.Write(piece)
return h.Sum(nil)
}
func createTorrent(fileName string, f io.ReadCloser, r *http.Request) ([]byte, error) {
chunk := make([]byte, TORRENT_PIECE_LENGTH)
torrent := Torrent{
Encoding: "UTF-8",
Info: TorrentInfo{
PieceLength: TORRENT_PIECE_LENGTH,
Name: fileName,
},
UrlList: []string{fmt.Sprintf("%sselif/%s", getSiteURL(r), fileName)},
}
func createTorrent(fileName string, r *http.Request) ([]byte, error) {
url := fmt.Sprintf("%sselif/%s", getSiteURL(r), fileName)
for {
n, err := f.Read(chunk)
if err == io.EOF {
break
} else if err != nil {
t, err := fileBackend.GetTorrent(fileName, url)
if err != nil {
return []byte{}, err
}
torrent.Info.Length += n
torrent.Info.Pieces += string(hashPiece(chunk[:n]))
}
data, err := bencode.EncodeBytes(&torrent)
data, err := bencode.EncodeBytes(&t)
if err != nil {
return []byte{}, err
}
@ -80,14 +39,7 @@ func fileTorrentHandler(c web.C, w http.ResponseWriter, r *http.Request) {
return
}
f, err := fileBackend.Open(fileName)
if err != nil {
oopsHandler(c, w, r, RespHTML, "Could not create torrent.")
return
}
defer f.Close()
encoded, err := createTorrent(fileName, f, r)
encoded, err := createTorrent(fileName, r)
if err != nil {
oopsHandler(c, w, r, RespHTML, "Could not create torrent.")
return

28
torrent/torrent.go

@ -0,0 +1,28 @@
package torrent
import (
"crypto/sha1"
)
const (
	// TORRENT_PIECE_LENGTH is the piece size in bytes (256 KiB) used when
	// splitting a file into hashed torrent pieces.
	TORRENT_PIECE_LENGTH = 262144
)

// TorrentInfo is the bencoded "info" dictionary of a torrent file.
type TorrentInfo struct {
	PieceLength int    `bencode:"piece length"` // bytes per piece
	Pieces      string `bencode:"pieces"`       // concatenated 20-byte SHA-1 piece hashes
	Name        string `bencode:"name"`         // suggested file name
	Length      int    `bencode:"length"`       // total file length in bytes
}

// Torrent is the top-level bencoded torrent structure; UrlList carries
// webseed URLs (BEP 19 style "url-list").
type Torrent struct {
	Encoding string      `bencode:"encoding"`
	Info     TorrentInfo `bencode:"info"`
	UrlList  []string    `bencode:"url-list"`
}
// HashPiece returns the 20-byte SHA-1 digest of a single torrent piece.
func HashPiece(piece []byte) []byte {
	sum := sha1.Sum(piece)
	return sum[:]
}

73
torrent_test.go

@ -1,73 +0,0 @@
package main
import (
"fmt"
"os"
"testing"
"github.com/zeebo/bencode"
)
func TestCreateTorrent(t *testing.T) {
fileName := "server.go"
var decoded Torrent
f, err := os.Open("server.go")
if err != nil {
t.Fatal(err)
}
defer f.Close()
encoded, err := createTorrent(fileName, f, nil)
if err != nil {
t.Fatal(err)
}
bencode.DecodeBytes(encoded, &decoded)
if decoded.Encoding != "UTF-8" {
t.Fatalf("Encoding was %s, expected UTF-8", decoded.Encoding)
}
if decoded.Info.Name != "server.go" {
t.Fatalf("Name was %s, expected server.go", decoded.Info.Name)
}
if decoded.Info.PieceLength <= 0 {
t.Fatal("Expected a piece length, got none")
}
if len(decoded.Info.Pieces) <= 0 {
t.Fatal("Expected at least one piece, got none")
}
if decoded.Info.Length <= 0 {
t.Fatal("Length was less than or equal to 0, expected more")
}
tracker := fmt.Sprintf("%sselif/%s", Config.siteURL, fileName)
if decoded.UrlList[0] != tracker {
t.Fatalf("First entry in URL list was %s, expected %s", decoded.UrlList[0], tracker)
}
}
func TestCreateTorrentWithImage(t *testing.T) {
var decoded Torrent
f, err := os.Open("static/images/404.jpg")
if err != nil {
t.Fatal(err)
}
defer f.Close()
encoded, err := createTorrent("test.jpg", f, nil)
if err != nil {
t.Fatal(err)
}
bencode.DecodeBytes(encoded, &decoded)
if decoded.Info.Pieces != "r\x01\x80j\x99\x84\n\xd3dZ;1NX\xec;\x9d$+f" {
t.Fatal("Torrent pieces did not match expected pieces for image")
}
}

19
upload.go

@ -70,7 +70,7 @@ func uploadPostHandler(c web.C, w http.ResponseWriter, r *http.Request) {
upReq.randomBarename = true
}
upReq.expiry = parseExpiry(r.Form.Get("expires"))
upReq.src = file
upReq.src = http.MaxBytesReader(w, file, Config.maxSize)
upReq.filename = headers.Filename
} else {
if r.FormValue("content") == "" {
@ -82,7 +82,13 @@ func uploadPostHandler(c web.C, w http.ResponseWriter, r *http.Request) {
extension = "txt"
}
upReq.src = strings.NewReader(r.FormValue("content"))
content := r.FormValue("content")
if int64(len(content)) > Config.maxSize {
oopsHandler(c, w, r, RespJSON, "Content length exceeds max size")
return
}
upReq.src = strings.NewReader(content)
upReq.expiry = parseExpiry(r.FormValue("expires"))
upReq.filename = r.FormValue("filename") + "." + extension
}
@ -115,7 +121,7 @@ func uploadPutHandler(c web.C, w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
upReq.filename = c.URLParams["name"]
upReq.src = r.Body
upReq.src = http.MaxBytesReader(w, r.Body, Config.maxSize)
upload, err := processUpload(upReq)
@ -162,7 +168,7 @@ func uploadRemote(c web.C, w http.ResponseWriter, r *http.Request) {
}
upReq.filename = filepath.Base(grabUrl.Path)
upReq.src = resp.Body
upReq.src = http.MaxBytesReader(w, resp.Body, Config.maxSize)
upReq.deletionKey = r.FormValue("deletekey")
upReq.randomBarename = r.FormValue("randomize") == "yes"
upReq.expiry = parseExpiry(r.FormValue("expiry"))
@ -267,12 +273,9 @@ func processUpload(upReq UploadRequest) (upload Upload, err error) {
fileExpiry = time.Now().Add(upReq.expiry)
}
bytes, err := fileBackend.Put(upload.Filename, io.MultiReader(bytes.NewReader(header), upReq.src))
_, err = fileBackend.Put(upload.Filename, io.MultiReader(bytes.NewReader(header), upReq.src))
if err != nil {
return upload, err
} else if bytes > Config.maxSize {
fileBackend.Delete(upload.Filename)
return upload, errors.New("File too large")
}
upload.Metadata, err = generateMetadata(upload.Filename, fileExpiry, upReq.deletionKey)

Loading…
Cancel
Save