diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index a6680cbb8..4941d3c86 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -17,12 +17,15 @@ type AssignResult struct { Error string `json:"error"` } -func Assign(server string, count int, replication string) (*AssignResult, error) { +func Assign(server string, count int, replication string, collection string) (*AssignResult, error) { values := make(url.Values) values.Add("count", strconv.Itoa(count)) if replication != "" { values.Add("replication", replication) } + if collection != "" { + values.Add("collection", collection) + } jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) glog.V(2).Info("assign result :", string(jsonBlob)) if err != nil { diff --git a/go/operation/submit.go b/go/operation/submit.go index 814c64957..693b90db1 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -12,14 +12,16 @@ import ( ) type FilePart struct { - Reader io.Reader - FileName string - FileSize int64 - IsGzipped bool - MimeType string - ModTime int64 //in seconds - Server string //this comes from assign result - Fid string //this comes from assign result, but customizable + Reader io.Reader + FileName string + FileSize int64 + IsGzipped bool + MimeType string + ModTime int64 //in seconds + Replication string + Collection string + Server string //this comes from assign result + Fid string //this comes from assign result, but customizable } type SubmitResult struct { @@ -30,12 +32,12 @@ type SubmitResult struct { Error string `json:"error"` } -func SubmitFiles(master string, files []FilePart, replication string, maxMB int) ([]SubmitResult, error) { +func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName } - ret, err := Assign(master, len(files), replication) + ret, err := Assign(master, len(files), replication, collection) if err != nil { for index, _ := range files { results[index].Error = err.Error() @@ -48,7 +50,9 @@ func SubmitFiles(master string, files []FilePart, replication string, maxMB int) file.Fid = file.Fid + "_" + strconv.Itoa(index) } file.Server = ret.PublicUrl - results[index].Size, err = file.Upload(maxMB, master, replication) + file.Replication = replication + file.Collection = collection + results[index].Size, err = file.Upload(maxMB, master) if err != nil { results[index].Error = err.Error() } @@ -95,7 +99,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, err error) { +func (fi FilePart) Upload(maxMB int, master string) (retSize int, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -108,7 +112,7 @@ func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, e chunks := fi.FileSize/chunkSize + 1 fids := make([]string, 0) for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, replication) + id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection) if e != nil { return 0, e } @@ -126,8 +130,8 @@ func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, e return } -func upload_one_chunk(filename string, reader io.Reader, master, replication string) (fid string, size int, e error) { - ret, err := Assign(master, 1, replication) +func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size int, e error) { + ret, err := Assign(master, 1, replication, collection) if err != nil { return "", 0, err } diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index f9a775c32..2fc83e578 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math" + "math/rand" "os" "strings" "sync" @@ -15,14 +16,16 @@ import ( ) type BenchmarkOptions struct { - server *string - concurrency *int - numberOfFiles *int - fileSize *int - idListFile *string - write *bool - read *bool - vid2server map[string]string //cache for vid locations + server *string + concurrency *int + numberOfFiles *int + fileSize *int + idListFile *string + write *bool + read *bool + sequentialRead *bool + collection *string + vid2server map[string]string //cache for vid locations } var ( @@ -39,6 +42,8 @@ func init() { b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids") b.write = cmdBenchmark.Flag.Bool("write", true, "enable write") b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") + b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") + b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") } var cmdBenchmark = &Command{ @@ -120,9 +125,9 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { if id, ok := <-idChan; ok { start := time.Now() fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)} - if assignResult, err := operation.Assign(*b.server, 1, ""); err == nil { - fp.Server, fp.Fid = assignResult.PublicUrl, assignResult.Fid - fp.Upload(0, *b.server, "") + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { + fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection + fp.Upload(0, *b.server) writeStats.addSample(time.Now().Sub(start)) fileIdLineChan <- fp.Fid s.transferred += int64(*b.fileSize) @@ -212,13 +217,28 @@ func readFileIds(fileName string, fileIdLineChan chan string) { defer file.Close() r := bufio.NewReader(file) - for { - if line, err := Readln(r); err == nil { - fileIdLineChan <- string(line) - } else { - break + if *b.sequentialRead { + for { + if line, err := Readln(r); err == nil { + fileIdLineChan <- string(line) + } else { + break + } + } + } else { + lines := make([]string, 0, *b.numberOfFiles) + for { + if line, err := Readln(r); err == nil { + lines = append(lines, string(line)) + } else { + break + } + } + for i := 0; i < *b.numberOfFiles; i++ { + fileIdLineChan <- lines[rand.Intn(len(lines))] } } + close(fileIdLineChan) } diff --git a/go/weed/upload.go b/go/weed/upload.go index 42e753981..b59313a2a 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -10,6 +10,7 @@ import ( var ( uploadReplication *string + uploadCollection *string uploadDir *string include *string maxMB *int @@ -21,7 +22,8 @@ func init() { server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") uploadDir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.") include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") - uploadReplication = cmdUpload.Flag.String("replication", "", "replication type(000,001,010,100,110,200)") + uploadReplication = cmdUpload.Flag.String("replication", "", "replication type") + uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") } @@ -65,7 +67,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB) + results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -82,7 +84,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *maxMB) + results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index d0d52f488..d4492e719 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -94,7 +94,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("assigning file id for", fname) - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication")) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection")) if ae != nil { writeJsonError(w, r, ae) return