Browse Source

delete replications, untested yet

pull/2/head
Chris Lu 12 years ago
parent
commit
4b3676a54b
  1. 67
      weed-fs/src/cmd/weed/volume.go
  2. 14
      weed-fs/src/pkg/operation/delete_content.go
  3. 64
      weed-fs/src/pkg/operation/upload_content.go

67
weed-fs/src/cmd/weed/volume.go

@ -135,27 +135,11 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
ret := store.Write(volumeId, needle) ret := store.Write(volumeId, needle)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" { if r.FormValue("type") != "standard" {
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
sendFunc := func(background bool) {
postContentFunc := func(location operation.Location) bool {
operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
return true
}
for _, location := range lookupResult.Locations {
if location.Url != (*ip+":"+strconv.Itoa(*vport)) {
if background {
go postContentFunc(location)
} else {
postContentFunc(location)
}
}
}
}
waitTime, err := strconv.Atoi(r.FormValue("wait"))
sendFunc(err == nil && waitTime > 0)
} else {
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
}
waitTime, err := strconv.Atoi(r.FormValue("wait"))
distributedOperation(volumeId, func(location operation.Location) bool {
operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
return true
}, err == nil && waitTime > 0)
} }
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
} }
@ -191,7 +175,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
n.Size = 0 n.Size = 0
store.Delete(volumeId, n)
ret := store.Delete(volumeId, n)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" {
waitTime, err := strconv.Atoi(r.FormValue("wait"))
distributedOperation(volumeId, func(location operation.Location) bool {
operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
return true
}, err == nil && waitTime > 0)
}
w.WriteHeader(http.StatusCreated)
}
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = uint32(count) m["size"] = uint32(count)
writeJson(w, r, m) writeJson(w, r, m)
@ -217,6 +213,27 @@ func parseURLPath(path string) (vid, fid, ext string) {
return return
} }
type distributedFunction func(location operation.Location) bool
func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) {
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
sendFunc := func(background bool) {
for _, location := range lookupResult.Locations {
if location.Url != (*ip + ":" + strconv.Itoa(*vport)) {
if background {
go op(location)
} else {
op(location)
}
}
}
}
sendFunc(wait)
} else {
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
}
}
func runVolume(cmd *Command, args []string) bool { func runVolume(cmd *Command, args []string) bool {
fileInfo, err := os.Stat(*volumeFolder) fileInfo, err := os.Stat(*volumeFolder)
//TODO: now default to 1G, this value should come from server? //TODO: now default to 1G, this value should come from server?
@ -228,9 +245,9 @@ func runVolume(cmd *Command, args []string) bool {
} }
perm := fileInfo.Mode().Perm() perm := fileInfo.Mode().Perm()
log.Println("Volume Folder permission:", perm) log.Println("Volume Folder permission:", perm)
if *publicUrl == "" { if *publicUrl == "" {
*publicUrl = *ip + ":" + strconv.Itoa(*vport)
*publicUrl = *ip + ":" + strconv.Itoa(*vport)
} }
store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
@ -247,7 +264,7 @@ func runVolume(cmd *Command, args []string) bool {
}() }()
log.Println("store joined at", *masterNode) log.Println("store joined at", *masterNode)
log.Println("Start storage service at http://"+*ip+":"+strconv.Itoa(*vport))
log.Println("Start storage service at http://" + *ip + ":" + strconv.Itoa(*vport))
e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil) e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil)
if e != nil { if e != nil {
log.Fatalf("Fail to start:%s", e.Error()) log.Fatalf("Fail to start:%s", e.Error())

14
weed-fs/src/pkg/operation/delete_content.go

@ -0,0 +1,14 @@
package operation
import (
"net/http"
)
func Delete(url string) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return err
}
_, err = http.DefaultClient.Do(req)
return err
}

64
weed-fs/src/pkg/operation/upload_content.go

@ -1,42 +1,42 @@
package operation package operation
import ( import (
"bytes"
"encoding/json"
"mime/multipart"
"net/http"
_ "fmt"
"io"
"io/ioutil"
"bytes"
"encoding/json"
_ "fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
) )
type UploadResult struct { type UploadResult struct {
Size int
Size int
} }
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
println("uploading to", uploadUrl)
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
file_writer, err := body_writer.CreateFormFile("file", filename)
io.Copy(file_writer, reader)
content_type := body_writer.FormDataContentType()
body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
return nil, err
}
defer resp.Body.Close()
resp_body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var ret UploadResult
println("upload response to", uploadUrl, resp_body)
err = json.Unmarshal(resp_body, &ret)
if err != nil {
panic(err.Error())
}
//fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return &ret, nil
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
file_writer, err := body_writer.CreateFormFile("file", filename)
io.Copy(file_writer, reader)
content_type := body_writer.FormDataContentType()
body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
println("uploading to", uploadUrl)
return nil, err
}
defer resp.Body.Close()
resp_body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var ret UploadResult
err = json.Unmarshal(resp_body, &ret)
if err != nil {
println("upload response to", uploadUrl, resp_body)
panic(err.Error())
}
//fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return &ret, nil
} }
Loading…
Cancel
Save