|
@ -23,6 +23,7 @@ type FilePart struct { |
|
|
ModTime int64 //in seconds
|
|
|
ModTime int64 //in seconds
|
|
|
Replication string |
|
|
Replication string |
|
|
Collection string |
|
|
Collection string |
|
|
|
|
|
DataCenter string |
|
|
Ttl string |
|
|
Ttl string |
|
|
Server string //this comes from assign result
|
|
|
Server string //this comes from assign result
|
|
|
Fid string //this comes from assign result, but customizable
|
|
|
Fid string //this comes from assign result, but customizable
|
|
@ -37,7 +38,7 @@ type SubmitResult struct { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func SubmitFiles(master string, files []FilePart, |
|
|
func SubmitFiles(master string, files []FilePart, |
|
|
replication string, collection string, ttl string, maxMB int, |
|
|
|
|
|
|
|
|
replication string, collection string, dataCenter string, ttl string, maxMB int, |
|
|
secret security.Secret, |
|
|
secret security.Secret, |
|
|
) ([]SubmitResult, error) { |
|
|
) ([]SubmitResult, error) { |
|
|
results := make([]SubmitResult, len(files)) |
|
|
results := make([]SubmitResult, len(files)) |
|
@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart, |
|
|
Count: uint64(len(files)), |
|
|
Count: uint64(len(files)), |
|
|
Replication: replication, |
|
|
Replication: replication, |
|
|
Collection: collection, |
|
|
Collection: collection, |
|
|
|
|
|
DataCenter: dataCenter, |
|
|
Ttl: ttl, |
|
|
Ttl: ttl, |
|
|
} |
|
|
} |
|
|
ret, err := Assign(master, ar) |
|
|
ret, err := Assign(master, ar) |
|
@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart, |
|
|
file.Server = ret.Url |
|
|
file.Server = ret.Url |
|
|
file.Replication = replication |
|
|
file.Replication = replication |
|
|
file.Collection = collection |
|
|
file.Collection = collection |
|
|
|
|
|
file.DataCenter = dataCenter |
|
|
results[index].Size, err = file.Upload(maxMB, master, secret) |
|
|
results[index].Size, err = file.Upload(maxMB, master, secret) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
results[index].Error = err.Error() |
|
|
results[index].Error = err.Error() |
|
@ -129,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret |
|
|
Chunks: make([]*ChunkInfo, 0, chunks), |
|
|
Chunks: make([]*ChunkInfo, 0, chunks), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var ret *AssignResult |
|
|
|
|
|
var id string |
|
|
|
|
|
if fi.DataCenter != "" { |
|
|
|
|
|
ar := &VolumeAssignRequest{ |
|
|
|
|
|
Count: uint64(chunks), |
|
|
|
|
|
Replication: fi.Replication, |
|
|
|
|
|
Collection: fi.Collection, |
|
|
|
|
|
Ttl: fi.Ttl, |
|
|
|
|
|
} |
|
|
|
|
|
ret, err = Assign(master, ar) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
for i := int64(0); i < chunks; i++ { |
|
|
for i := int64(0); i < chunks; i++ { |
|
|
id, count, e := upload_one_chunk( |
|
|
|
|
|
|
|
|
if fi.DataCenter == "" { |
|
|
|
|
|
ar := &VolumeAssignRequest{ |
|
|
|
|
|
Count: 1, |
|
|
|
|
|
Replication: fi.Replication, |
|
|
|
|
|
Collection: fi.Collection, |
|
|
|
|
|
Ttl: fi.Ttl, |
|
|
|
|
|
} |
|
|
|
|
|
ret, err = Assign(master, ar) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
// delete all uploaded chunks
|
|
|
|
|
|
cm.DeleteChunks(master) |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
id = ret.Fid |
|
|
|
|
|
} else { |
|
|
|
|
|
id = ret.Fid |
|
|
|
|
|
if i > 0 { |
|
|
|
|
|
id += "_" + strconv.FormatInt(i, 10) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
fileUrl := "http://" + ret.Url + "/" + id |
|
|
|
|
|
count, e := upload_one_chunk( |
|
|
baseName+"-"+strconv.FormatInt(i+1, 10), |
|
|
baseName+"-"+strconv.FormatInt(i+1, 10), |
|
|
io.LimitReader(fi.Reader, chunkSize), |
|
|
io.LimitReader(fi.Reader, chunkSize), |
|
|
master, fi.Replication, fi.Collection, fi.Ttl, |
|
|
|
|
|
|
|
|
master, fileUrl, |
|
|
jwt) |
|
|
jwt) |
|
|
if e != nil { |
|
|
if e != nil { |
|
|
// delete all uploaded chunks
|
|
|
// delete all uploaded chunks
|
|
@ -165,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func upload_one_chunk(filename string, reader io.Reader, master, |
|
|
func upload_one_chunk(filename string, reader io.Reader, master, |
|
|
replication string, collection string, ttl string, jwt security.EncodedJwt, |
|
|
|
|
|
) (fid string, size uint32, e error) { |
|
|
|
|
|
ar := &VolumeAssignRequest{ |
|
|
|
|
|
Count: 1, |
|
|
|
|
|
Replication: replication, |
|
|
|
|
|
Collection: collection, |
|
|
|
|
|
Ttl: ttl, |
|
|
|
|
|
} |
|
|
|
|
|
ret, err := Assign(master, ar) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return "", 0, err |
|
|
|
|
|
} |
|
|
|
|
|
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid |
|
|
|
|
|
|
|
|
fileUrl string, jwt security.EncodedJwt, |
|
|
|
|
|
) (size uint32, e error) { |
|
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") |
|
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") |
|
|
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, |
|
|
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, |
|
|
"application/octet-stream", nil, jwt) |
|
|
"application/octet-stream", nil, jwt) |
|
|
if uploadError != nil { |
|
|
if uploadError != nil { |
|
|
return fid, 0, uploadError |
|
|
|
|
|
|
|
|
return 0, uploadError |
|
|
} |
|
|
} |
|
|
return fid, uploadResult.Size, nil |
|
|
|
|
|
|
|
|
return uploadResult.Size, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { |
|
|
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { |
|
|