Browse Source
Merge pull request #224 from tnextday/feature/chunked-file-support
Merge pull request #224 from tnextday/feature/chunked-file-support
Feature/chunked file supportpull/228/head
Chris Lu
9 years ago
14 changed files with 616 additions and 117 deletions
-
74.gitignore
-
25Makefile
-
213go/operation/chunked_file.go
-
2go/operation/compress.go
-
16go/operation/delete_content.go
-
48go/operation/submit.go
-
16go/storage/needle.go
-
9go/storage/needle_read_write.go
-
1go/topology/volume_location_list.go
-
23go/util/http_util.go
-
94go/weed/download.go
-
2go/weed/weed_server/common.go
-
146go/weed/weed_server/volume_server_handlers_read.go
-
64go/weed/weed_server/volume_server_handlers_write.go
@ -1,3 +1,77 @@ |
|||
go/weed/.goxc* |
|||
tags |
|||
*.swp |
|||
### OSX template |
|||
.DS_Store |
|||
.AppleDouble |
|||
.LSOverride |
|||
|
|||
# Icon must end with two \r |
|||
Icon |
|||
|
|||
# Thumbnails |
|||
._* |
|||
|
|||
# Files that might appear in the root of a volume |
|||
.DocumentRevisions-V100 |
|||
.fseventsd |
|||
.Spotlight-V100 |
|||
.TemporaryItems |
|||
.Trashes |
|||
.VolumeIcon.icns |
|||
|
|||
# Directories potentially created on remote AFP share |
|||
.AppleDB |
|||
.AppleDesktop |
|||
Network Trash Folder |
|||
Temporary Items |
|||
.apdisk |
|||
### JetBrains template |
|||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio |
|||
|
|||
*.iml |
|||
|
|||
## Directory-based project format: |
|||
.idea/ |
|||
# if you remove the above rule, at least ignore the following: |
|||
|
|||
# User-specific stuff: |
|||
# .idea/workspace.xml |
|||
# .idea/tasks.xml |
|||
# .idea/dictionaries |
|||
|
|||
# Sensitive or high-churn files: |
|||
# .idea/dataSources.ids |
|||
# .idea/dataSources.xml |
|||
# .idea/sqlDataSources.xml |
|||
# .idea/dynamic.xml |
|||
# .idea/uiDesigner.xml |
|||
|
|||
# Gradle: |
|||
# .idea/gradle.xml |
|||
# .idea/libraries |
|||
|
|||
# Mongo Explorer plugin: |
|||
# .idea/mongoSettings.xml |
|||
|
|||
## File-based project format: |
|||
*.ipr |
|||
*.iws |
|||
|
|||
## Plugin-specific files: |
|||
|
|||
# IntelliJ |
|||
/out/ |
|||
|
|||
# mpeltonen/sbt-idea plugin |
|||
.idea_modules/ |
|||
|
|||
# JIRA plugin |
|||
atlassian-ide-plugin.xml |
|||
|
|||
# Crashlytics plugin (for Android Studio and IntelliJ) |
|||
com_crashlytics_export_strings.xml |
|||
crashlytics.properties |
|||
crashlytics-build.properties |
|||
|
|||
test_data |
@ -1,11 +1,22 @@ |
|||
BINARY = weed |
|||
|
|||
.clean: |
|||
go clean -i -v ./go/weed/ |
|||
GO_FLAGS = #-v |
|||
SOURCE_DIR = ./go/weed/ |
|||
|
|||
.deps: |
|||
go get -d ./go/weed/ |
|||
all: build |
|||
|
|||
.build: .deps |
|||
go build -v ./go/weed/ |
|||
.PHONY : clean deps build linux |
|||
|
|||
all: .build |
|||
clean: |
|||
go clean -i $(GO_FLAGS) $(SOURCE_DIR) |
|||
rm -f $(BINARY) |
|||
|
|||
deps: |
|||
go get $(GO_FLAGS) -d $(SOURCE_DIR) |
|||
|
|||
build: deps |
|||
go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR) |
|||
|
|||
linux: deps |
|||
mkdir -p linux |
|||
GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR) |
@ -0,0 +1,213 @@ |
|||
package operation |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"sort" |
|||
|
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
var ( |
|||
// when the remote server does not allow range requests (Accept-Ranges was not set)
|
|||
ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server") |
|||
// ErrInvalidRange is returned by Read when trying to read past the end of the file
|
|||
ErrInvalidRange = errors.New("Invalid range") |
|||
) |
|||
|
|||
type ChunkInfo struct { |
|||
Fid string `json:"fid"` |
|||
Offset int64 `json:"offset"` |
|||
Size int64 `json:"size"` |
|||
} |
|||
|
|||
type ChunkList []*ChunkInfo |
|||
|
|||
type ChunkManifest struct { |
|||
Name string `json:"name,omitempty"` |
|||
Mime string `json:"mime,omitempty"` |
|||
Size int64 `json:"size,omitempty"` |
|||
Chunks ChunkList `json:"chunks,omitempty"` |
|||
} |
|||
|
|||
// seekable chunked file reader
|
|||
type ChunkedFileReader struct { |
|||
Manifest *ChunkManifest |
|||
Master string |
|||
pos int64 |
|||
pr *io.PipeReader |
|||
pw *io.PipeWriter |
|||
mutex sync.Mutex |
|||
} |
|||
|
|||
func (s ChunkList) Len() int { return len(s) } |
|||
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } |
|||
func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
|||
|
|||
func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { |
|||
if isGzipped { |
|||
var err error |
|||
if buffer, err = UnGzipData(buffer); err != nil { |
|||
return nil, err |
|||
} |
|||
} |
|||
cm := ChunkManifest{} |
|||
if e := json.Unmarshal(buffer, &cm); e != nil { |
|||
return nil, e |
|||
} |
|||
sort.Sort(cm.Chunks) |
|||
return &cm, nil |
|||
} |
|||
|
|||
func (cm *ChunkManifest) Marshal() ([]byte, error) { |
|||
return json.Marshal(cm) |
|||
} |
|||
|
|||
func (cm *ChunkManifest) DeleteChunks(master string) error { |
|||
deleteError := 0 |
|||
for _, ci := range cm.Chunks { |
|||
if e := DeleteFile(master, ci.Fid, ""); e != nil { |
|||
deleteError++ |
|||
glog.V(0).Infof("Delete %s error: %s, master: %s", ci.Fid, e.Error(), master) |
|||
} |
|||
} |
|||
if deleteError > 0 { |
|||
return errors.New("Not all chunks deleted.") |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) { |
|||
req, err := http.NewRequest("GET", fileUrl, nil) |
|||
if err != nil { |
|||
return written, err |
|||
} |
|||
if offset > 0 { |
|||
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) |
|||
} |
|||
|
|||
resp, err := util.Do(req) |
|||
if err != nil { |
|||
return written, err |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
switch resp.StatusCode { |
|||
case http.StatusRequestedRangeNotSatisfiable: |
|||
return written, ErrInvalidRange |
|||
case http.StatusOK: |
|||
if offset > 0 { |
|||
return written, ErrRangeRequestsNotSupported |
|||
} |
|||
case http.StatusPartialContent: |
|||
break |
|||
default: |
|||
return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl) |
|||
|
|||
} |
|||
return io.Copy(w, resp.Body) |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { |
|||
var err error |
|||
switch whence { |
|||
case 0: |
|||
case 1: |
|||
offset += cf.pos |
|||
case 2: |
|||
offset = cf.Manifest.Size - offset |
|||
} |
|||
if offset > cf.Manifest.Size { |
|||
err = ErrInvalidRange |
|||
} |
|||
if cf.pos != offset { |
|||
cf.Close() |
|||
} |
|||
cf.pos = offset |
|||
return cf.pos, err |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { |
|||
cm := cf.Manifest |
|||
chunkIndex := -1 |
|||
chunkStartOffset := int64(0) |
|||
for i, ci := range cm.Chunks { |
|||
if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size { |
|||
chunkIndex = i |
|||
chunkStartOffset = cf.pos - ci.Offset |
|||
break |
|||
} |
|||
} |
|||
if chunkIndex < 0 { |
|||
return n, ErrInvalidRange |
|||
} |
|||
for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { |
|||
ci := cm.Chunks[chunkIndex] |
|||
// if we need read date from local volume server first?
|
|||
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) |
|||
if lookupError != nil { |
|||
return n, lookupError |
|||
} |
|||
if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil { |
|||
return n, e |
|||
} else { |
|||
n += wn |
|||
cf.pos += wn |
|||
} |
|||
|
|||
chunkStartOffset = 0 |
|||
} |
|||
return n, nil |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) { |
|||
cf.Seek(off, 0) |
|||
return cf.Read(p) |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) Read(p []byte) (int, error) { |
|||
return cf.getPipeReader().Read(p) |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) Close() (e error) { |
|||
cf.mutex.Lock() |
|||
defer cf.mutex.Unlock() |
|||
return cf.closePipe() |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) closePipe() (e error) { |
|||
if cf.pr != nil { |
|||
if err := cf.pr.Close(); err != nil { |
|||
e = err |
|||
} |
|||
} |
|||
cf.pr = nil |
|||
if cf.pw != nil { |
|||
if err := cf.pw.Close(); err != nil { |
|||
e = err |
|||
} |
|||
} |
|||
cf.pw = nil |
|||
return e |
|||
} |
|||
|
|||
func (cf *ChunkedFileReader) getPipeReader() io.Reader { |
|||
cf.mutex.Lock() |
|||
defer cf.mutex.Unlock() |
|||
if cf.pr != nil && cf.pw != nil { |
|||
return cf.pr |
|||
} |
|||
cf.closePipe() |
|||
cf.pr, cf.pw = io.Pipe() |
|||
go func(pw *io.PipeWriter) { |
|||
_, e := cf.WriteTo(pw) |
|||
pw.CloseWithError(e) |
|||
}(cf.pw) |
|||
return cf.pr |
|||
} |
@ -1,4 +1,4 @@ |
|||
package storage |
|||
package operation |
|||
|
|||
import ( |
|||
"bytes" |
Write
Preview
Loading…
Cancel
Save
Reference in new issue