Browse Source

Merge branch 'feature/chunked-file-support' into develop

pull/279/head
tnextday 10 years ago
parent
commit
46babd7847
  1. 8
      go/operation/chunked_file.go
  2. 2
      go/operation/submit.go
  3. 5
      go/util/http_util.go
  4. 94
      go/weed/download.go
  5. 219
      go/weed/weed_server/volume_server_handlers_read.go

8
go/operation/chunked_file.go

@ -65,7 +65,7 @@ func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
return &cm, nil return &cm, nil
} }
func (cm *ChunkManifest) GetData() ([]byte, error) {
func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm) return json.Marshal(cm)
} }
@ -74,7 +74,7 @@ func (cm *ChunkManifest) DeleteChunks(master string) error {
for _, ci := range cm.Chunks { for _, ci := range cm.Chunks {
if e := DeleteFile(master, ci.Fid, ""); e != nil { if e := DeleteFile(master, ci.Fid, ""); e != nil {
deleteError++ deleteError++
glog.V(0).Infoln("delete error:", e, ci.Fid)
glog.V(0).Infof("Delete %s error: %s, master: %s", ci.Fid, e.Error(), master)
} }
} }
if deleteError > 0 { if deleteError > 0 {
@ -83,10 +83,6 @@ func (cm *ChunkManifest) DeleteChunks(master string) error {
return nil return nil
} }
//func (cm *ChunkManifest) StoredHelper() error {
// return nil
//}
func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) { func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
req, err := http.NewRequest("GET", fileUrl, nil) req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil { if err != nil {

2
go/operation/submit.go

@ -180,7 +180,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
} }
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
buf, e := manifest.GetData()
buf, e := manifest.Marshal()
if e != nil { if e != nil {
return e return e
} }

5
go/util/http_util.go

@ -136,12 +136,11 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
return readFn(r.Body) return readFn(r.Body)
} }
func DownloadUrl(fileUrl string) (filename string, content []byte, e error) {
func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
response, err := client.Get(fileUrl) response, err := client.Get(fileUrl)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
defer response.Body.Close()
contentDisposition := response.Header["Content-Disposition"] contentDisposition := response.Header["Content-Disposition"]
if len(contentDisposition) > 0 { if len(contentDisposition) > 0 {
if strings.HasPrefix(contentDisposition[0], "filename=") { if strings.HasPrefix(contentDisposition[0], "filename=") {
@ -149,7 +148,7 @@ func DownloadUrl(fileUrl string) (filename string, content []byte, e error) {
filename = strings.Trim(filename, "\"") filename = strings.Trim(filename, "\"")
} }
} }
content, e = ioutil.ReadAll(response.Body)
rc = response.Body
return return
} }

94
go/weed/download.go

@ -3,9 +3,11 @@ package main
import ( import (
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path" "path"
"io/ioutil"
"strings" "strings"
"github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/operation"
@ -43,50 +45,76 @@ var cmdDownload = &Command{
func runDownload(cmd *Command, args []string) bool { func runDownload(cmd *Command, args []string) bool {
for _, fid := range args { for _, fid := range args {
filename, content, e := fetchFileId(*d.server, fid)
if e != nil {
fmt.Println("Fetch Error:", e)
continue
if e := downloadToFile(*d.server, fid, *d.dir); e != nil {
fmt.Println("Download Error: ", fid, e)
} }
if filename == "" {
filename = fid
}
return true
}
func downloadToFile(server, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
if lookupError != nil {
return lookupError
}
filename, rc, err := util.DownloadUrl(fileUrl)
if err != nil {
return err
}
defer rc.Close()
if filename == "" {
filename = fileId
}
isFileList := false
if strings.HasSuffix(filename, "-list") {
// old command compatible
isFileList = true
filename = filename[0 : len(filename)-len("-list")]
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
return err
}
defer f.Close()
if isFileList {
content, err := ioutil.ReadAll(rc)
if err != nil {
return err
} }
if strings.HasSuffix(filename, "-list") {
filename = filename[0 : len(filename)-len("-list")]
fids := strings.Split(string(content), "\n")
f, err := os.OpenFile(path.Join(*d.dir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
fmt.Println("File Creation Error:", e)
continue
fids := strings.Split(string(content), "\n")
for _, partId := range fids {
var n int
_, part, err := fetchContent(*d.server, partId)
if err == nil {
n, err = f.Write(part)
} }
defer f.Close()
for _, partId := range fids {
var n int
_, part, err := fetchFileId(*d.server, partId)
if err == nil {
n, err = f.Write(part)
}
if err == nil && n < len(part) {
err = io.ErrShortWrite
}
if err != nil {
fmt.Println("File Write Error:", err)
break
}
if err == nil && n < len(part) {
err = io.ErrShortWrite
} }
} else {
ioutil.WriteFile(path.Join(*d.dir, filename), content, os.ModePerm)
if err != nil {
return err
}
}
} else {
if _, err = io.Copy(f, rc); err != nil {
return err
} }
} }
return true
return nil
} }
func fetchFileId(server string, fileId string) (filename string, content []byte, e error) {
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(server, fileId) fileUrl, lookupError := operation.LookupFileId(server, fileId)
if lookupError != nil { if lookupError != nil {
return "", nil, lookupError return "", nil, lookupError
} }
filename, content, e = util.DownloadUrl(fileUrl)
var rc io.ReadCloser
if filename, rc, e = util.DownloadUrl(fileUrl); e != nil {
return "", nil, e
}
content, e = ioutil.ReadAll(rc)
rc.Close()
return return
} }

219
go/weed/weed_server/volume_server_handlers_read.go

@ -9,6 +9,10 @@ import (
"strings" "strings"
"time" "time"
"path"
"bytes"
"github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/images"
"github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/operation"
@ -88,34 +92,25 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if n.NameSize > 0 && filename == "" { if n.NameSize > 0 && filename == "" {
filename = string(n.Name) filename = string(n.Name)
dotIndex := strings.LastIndex(filename, ".")
if dotIndex > 0 {
ext = filename[dotIndex:]
if ext == "" {
ext = path.Ext(filename)
} }
} }
mtype := "" mtype := ""
if ext != "" {
mtype = mime.TypeByExtension(ext)
}
if n.MimeSize > 0 { if n.MimeSize > 0 {
mt := string(n.Mime) mt := string(n.Mime)
if !strings.HasPrefix(mt, "application/octet-stream") { if !strings.HasPrefix(mt, "application/octet-stream") {
mtype = mt mtype = mt
} }
} }
if mtype != "" {
w.Header().Set("Content-Type", mtype)
}
if filename != "" {
w.Header().Set("Content-Disposition", "filename=\""+fileNameEscaper.Replace(filename)+"\"")
}
if ext != ".gz" { if ext != ".gz" {
if n.IsGzipped() { if n.IsGzipped() {
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip") w.Header().Set("Content-Encoding", "gzip")
} else { } else {
if n.Data, err = operation.UnGzipData(n.Data); err != nil { if n.Data, err = operation.UnGzipData(n.Data); err != nil {
glog.V(0).Infoln("lookup error:", err, r.URL.Path)
glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
} }
} }
} }
@ -131,94 +126,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
n.Data, _, _ = images.Resized(ext, n.Data, width, height) n.Data, _, _ = images.Resized(ext, n.Data, width, height)
} }
w.Header().Set("Accept-Ranges", "bytes")
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
return
}
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
if _, e = w.Write(n.Data); e != nil {
glog.V(4).Infoln("response write error:", e)
}
return
}
//the rest is dealing with partial content request
//mostly copy from src/pkg/net/http/fs.go
size := int64(len(n.Data))
ranges, err := parseRange(rangeReq, size)
if err != nil {
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return
}
if sumRangesSize(ranges) > size {
// The total number of bytes in all the ranges
// is larger than the size of the file by
// itself, so this is probably an attack, or a
// dumb client. Ignore the range request.
ranges = nil
return
}
if len(ranges) == 0 {
return
}
if len(ranges) == 1 {
// RFC 2616, Section 14.16:
// "When an HTTP message includes the content of a single
// range (for example, a response to a request for a
// single range, or to a request for a set of ranges
// that overlap without any holes), this content is
// transmitted with a Content-Range header, and a
// Content-Length header showing the number of bytes
// actually transferred.
// ...
// A response to a request for a single range MUST NOT
// be sent using the multipart/byteranges media type."
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(size))
w.WriteHeader(http.StatusPartialContent)
if _, e = w.Write(n.Data[ra.start : ra.start+ra.length]); e != nil {
glog.V(2).Infoln("response write error:", e)
}
return
}
// process multiple ranges
for _, ra := range ranges {
if ra.start > size {
http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return
}
if e := writeResponseContent(filename, mtype, bytes.NewReader(n.Data), w, r); e != nil {
glog.V(2).Infoln("response write error:", e)
} }
sendSize := rangesMIMESize(ranges, mtype, size)
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
sendContent := pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
if err != nil {
pw.CloseWithError(err)
return
}
if _, err = part.Write(n.Data[ra.start : ra.start+ra.length]); err != nil {
pw.CloseWithError(err)
return
}
}
mw.Close()
pw.Close()
}()
if w.Header().Get("Content-Encoding") == "" {
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
w.WriteHeader(http.StatusPartialContent)
io.CopyN(w, sendContent, sendSize)
} }
func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
@ -233,72 +143,77 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
if e != nil { if e != nil {
glog.V(0).Infoln("load chunked manifest error:", e)
glog.V(0).Infof("load chunked manifest (%s) error: %s", r.URL.Path, e.Error())
return false return false
} }
ext := ""
if fileName == "" && chunkManifest.Name != "" { if fileName == "" && chunkManifest.Name != "" {
fileName = chunkManifest.Name fileName = chunkManifest.Name
dotIndex := strings.LastIndex(fileName, ".")
if dotIndex > 0 {
ext = fileName[dotIndex:]
}
}
mtype := ""
if ext != "" {
mtype = mime.TypeByExtension(ext)
} }
mType := ""
if chunkManifest.Mime != "" { if chunkManifest.Mime != "" {
mt := chunkManifest.Mime mt := chunkManifest.Mime
if !strings.HasPrefix(mt, "application/octet-stream") { if !strings.HasPrefix(mt, "application/octet-stream") {
mtype = mt
mType = mt
} }
} }
if mtype != "" {
w.Header().Set("Content-Type", mtype)
}
if fileName != "" {
w.Header().Set("Content-Disposition", `filename="`+fileNameEscaper.Replace(fileName)+`"`)
}
w.Header().Set("X-File-Store", "chunked") w.Header().Set("X-File-Store", "chunked")
w.Header().Set("Accept-Ranges", "bytes")
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
return true
}
chunkedFileReader := &operation.ChunkedFileReader{ chunkedFileReader := &operation.ChunkedFileReader{
Manifest: chunkManifest, Manifest: chunkManifest,
Master: vs.GetMasterNode(), Master: vs.GetMasterNode(),
} }
defer chunkedFileReader.Close() defer chunkedFileReader.Close()
if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil {
glog.V(2).Infoln("response write error:", e)
}
return
}
func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error {
totalSize, e := rs.Seek(0, 2)
if mimeType == "" {
if ext := path.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
if filename != "" {
w.Header().Set("Content-Disposition", `filename="`+fileNameEscaper.Replace(filename)+`"`)
}
w.Header().Set("Accept-Ranges", "bytes")
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
rangeReq := r.Header.Get("Range") rangeReq := r.Header.Get("Range")
if rangeReq == "" { if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
if _, e = io.Copy(w, chunkedFileReader); e != nil {
glog.V(2).Infoln("response write error:", e)
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
if _, e = rs.Seek(0, 0); e != nil {
return e
} }
return true
_, e = io.Copy(w, rs)
return e
} }
//the rest is dealing with partial content request //the rest is dealing with partial content request
//mostly copy from src/pkg/net/http/fs.go //mostly copy from src/pkg/net/http/fs.go
size := chunkManifest.Size
ranges, err := parseRange(rangeReq, size)
ranges, err := parseRange(rangeReq, totalSize)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return
return nil
} }
if sumRangesSize(ranges) > size {
if sumRangesSize(ranges) > totalSize {
// The total number of bytes in all the ranges // The total number of bytes in all the ranges
// is larger than the size of the file by // is larger than the size of the file by
// itself, so this is probably an attack, or a // itself, so this is probably an attack, or a
// dumb client. Ignore the range request. // dumb client. Ignore the range request.
ranges = nil
return
return nil
} }
if len(ranges) == 0 { if len(ranges) == 0 {
return
return nil
} }
if len(ranges) == 1 { if len(ranges) == 1 {
// RFC 2616, Section 14.16: // RFC 2616, Section 14.16:
@ -314,24 +229,23 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
// be sent using the multipart/byteranges media type." // be sent using the multipart/byteranges media type."
ra := ranges[0] ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(size))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.WriteHeader(http.StatusPartialContent) w.WriteHeader(http.StatusPartialContent)
if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
glog.V(2).Infoln("chunkedFileReader Seek error:", e)
if _, e = rs.Seek(ra.start, 0); e != nil {
return e
} }
if _, e = io.CopyN(w, chunkedFileReader, ra.length); e != nil {
glog.V(2).Infoln("response write error:", e)
}
return
_, e = io.CopyN(w, rs, ra.length)
return e
} }
// process multiple ranges // process multiple ranges
for _, ra := range ranges { for _, ra := range ranges {
if ra.start > size {
if ra.start > totalSize {
http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return
return nil
} }
} }
sendSize := rangesMIMESize(ranges, mtype, size)
sendSize := rangesMIMESize(ranges, mimeType, totalSize)
pr, pw := io.Pipe() pr, pw := io.Pipe()
mw := multipart.NewWriter(pw) mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
@ -339,16 +253,17 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() { go func() {
for _, ra := range ranges { for _, ra := range ranges {
part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
if err != nil {
pw.CloseWithError(err)
part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
if e != nil {
pw.CloseWithError(e)
return return
} }
if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
glog.V(2).Infoln("response write error:", e)
if _, e = rs.Seek(ra.start, 0); e != nil {
pw.CloseWithError(e)
return
} }
if _, err = io.CopyN(part, chunkedFileReader, ra.length); err != nil {
pw.CloseWithError(err)
if _, e = io.CopyN(part, rs, ra.length); e != nil {
pw.CloseWithError(e)
return return
} }
} }
@ -359,6 +274,6 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
} }
w.WriteHeader(http.StatusPartialContent) w.WriteHeader(http.StatusPartialContent)
io.CopyN(w, sendContent, sendSize)
return
_, e = io.CopyN(w, sendContent, sendSize)
return e
} }
Loading…
Cancel
Save