Browse Source

in progress, trying to make benchmark working better to reuse http

connections.
pull/2/head
Chris Lu 11 years ago
parent
commit
054374c765
  1. 20
      go/util/http_util.go
  2. 60
      go/weed/benchmark.go
  3. 6
      go/weed/weed_server/master_server.go

20
go/util/http_util.go

@ -2,13 +2,26 @@ package util
import ( import (
"code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/glog"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
) )
var (
client *http.Client
Transport *http.Transport
)
func init() {
Transport = &http.Transport{
MaxIdleConnsPerHost: 1024,
}
client = &http.Client{Transport: Transport}
}
func Post(url string, values url.Values) ([]byte, error) { func Post(url string, values url.Values) ([]byte, error) {
r, err := http.PostForm(url, values)
r, err := client.PostForm(url, values)
if err != nil { if err != nil {
glog.V(0).Infoln("post to", url, err) glog.V(0).Infoln("post to", url, err)
return nil, err return nil, err
@ -23,13 +36,16 @@ func Post(url string, values url.Values) ([]byte, error) {
} }
func Get(url string) ([]byte, error) { func Get(url string) ([]byte, error) {
r, err := http.Get(url)
r, err := client.Get(url)
if err != nil { if err != nil {
glog.V(0).Infoln("getting ", url, err) glog.V(0).Infoln("getting ", url, err)
return nil, err return nil, err
} }
defer r.Body.Close() defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if r.StatusCode != 200 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
if err != nil { if err != nil {
glog.V(0).Infoln("read get result from", url, err) glog.V(0).Infoln("read get result from", url, err)
return nil, err return nil, err

60
go/weed/benchmark.go

@ -11,6 +11,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"runtime" "runtime"
"runtime/pprof"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -26,6 +27,7 @@ type BenchmarkOptions struct {
read *bool read *bool
sequentialRead *bool sequentialRead *bool
collection *string collection *string
cpuprofile *string
vid2server map[string]string //cache for vid locations vid2server map[string]string //cache for vid locations
} }
@ -45,6 +47,8 @@ func init() {
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
b.vid2server = make(map[string]string)
} }
var cmdBenchmark = &Command{ var cmdBenchmark = &Command{
@ -80,13 +84,30 @@ var (
) )
func runbenchmark(cmd *Command, args []string) bool { func runbenchmark(cmd *Command, args []string) bool {
finishChan := make(chan bool)
fileIdLineChan := make(chan string)
b.vid2server = make(map[string]string)
fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
glog.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
if *b.write { if *b.write {
bench_write()
}
if *b.read {
bench_read()
}
return true
}
func bench_write() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
writeStats = newStats() writeStats = newStats()
idChan := make(chan int) idChan := make(chan int)
wait.Add(*b.concurrency) wait.Add(*b.concurrency)
@ -105,11 +126,14 @@ func runbenchmark(cmd *Command, args []string) bool {
wait.Add(1) wait.Add(1)
finishChan <- true finishChan <- true
finishChan <- true finishChan <- true
close(finishChan)
wait.Wait() wait.Wait()
writeStats.printStats() writeStats.printStats()
}
}
if *b.read {
func bench_read() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
readStats = newStats() readStats = newStats()
wait.Add(*b.concurrency) wait.Add(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan) go readFileIds(*b.idListFile, fileIdLineChan)
@ -120,22 +144,26 @@ func runbenchmark(cmd *Command, args []string) bool {
} }
wait.Wait() wait.Wait()
finishChan <- true finishChan <- true
close(finishChan)
readStats.end = time.Now() readStats.end = time.Now()
readStats.printStats() readStats.printStats()
}
return true
} }
func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
serverLimitChan := make(map[string]chan bool)
for { for {
if id, ok := <-idChan; ok { if id, ok := <-idChan; ok {
start := time.Now() start := time.Now()
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)} fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)
}
serverLimitChan[fp.Server] <- true
fp.Upload(0, *b.server) fp.Upload(0, *b.server)
writeStats.addSample(time.Now().Sub(start)) writeStats.addSample(time.Now().Sub(start))
<-serverLimitChan[fp.Server]
fileIdLineChan <- fp.Fid fileIdLineChan <- fp.Fid
s.transferred += int64(*b.fileSize) s.transferred += int64(*b.fileSize)
s.completed++ s.completed++
@ -154,6 +182,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
} }
func readFiles(fileIdLineChan chan string, s *stats) { func readFiles(fileIdLineChan chan string, s *stats) {
serverLimitChan := make(map[string]chan bool)
masterLimitChan := make(chan bool, 7)
for { for {
if fid, ok := <-fileIdLineChan; ok { if fid, ok := <-fileIdLineChan; ok {
if len(fid) == 0 { if len(fid) == 0 {
@ -169,14 +199,20 @@ func readFiles(fileIdLineChan chan string, s *stats) {
vid := parts[0] vid := parts[0]
start := time.Now() start := time.Now()
if server, ok := b.vid2server[vid]; !ok { if server, ok := b.vid2server[vid]; !ok {
masterLimitChan <- true
if ret, err := operation.Lookup(*b.server, vid); err == nil { if ret, err := operation.Lookup(*b.server, vid); err == nil {
if len(ret.Locations) > 0 { if len(ret.Locations) > 0 {
server = ret.Locations[0].PublicUrl server = ret.Locations[0].PublicUrl
b.vid2server[vid] = server b.vid2server[vid] = server
} }
} }
<-masterLimitChan
} }
if server, ok := b.vid2server[vid]; ok { if server, ok := b.vid2server[vid]; ok {
if _, ok := serverLimitChan[server]; !ok {
serverLimitChan[server] = make(chan bool, 7)
}
serverLimitChan[server] <- true
url := "http://" + server + "/" + fid url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil { if bytesRead, err := util.Get(url); err == nil {
s.completed++ s.completed++
@ -186,6 +222,7 @@ func readFiles(fileIdLineChan chan string, s *stats) {
s.failed++ s.failed++
println("!!!! Failed to read from ", url, " !!!!!") println("!!!! Failed to read from ", url, " !!!!!")
} }
<-serverLimitChan[server]
} else { } else {
s.failed++ s.failed++
println("!!!! volume id ", vid, " location not found!!!!!") println("!!!! volume id ", vid, " location not found!!!!!")
@ -270,7 +307,12 @@ func newStats() *stats {
} }
func (s *stats) addSample(d time.Duration) { func (s *stats) addSample(d time.Duration) {
index := int(d / benchBucket)
if 0 <= index && index < len(s.data) {
s.data[int(d/benchBucket)]++ s.data[int(d/benchBucket)]++
} else {
fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
}
} }
func (s *stats) checkProgress(testName string, finishChan chan bool) { func (s *stats) checkProgress(testName string, finishChan chan bool) {

6
go/weed/weed_server/master_server.go

@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"github.com/goraft/raft" "github.com/goraft/raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"net/http" "net/http"
@ -29,6 +30,7 @@ type MasterServer struct {
vgLock sync.Mutex vgLock sync.Mutex
raftServer *RaftServer raftServer *RaftServer
bounedLeaderChan chan int
} }
func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
@ -47,6 +49,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
garbageThreshold: garbageThreshold, garbageThreshold: garbageThreshold,
whiteList: whiteList, whiteList: whiteList,
} }
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
var e error var e error
if ms.topo, e = topology.NewTopology("topo", confFile, seq, if ms.topo, e = topology.NewTopology("topo", confFile, seq,
@ -95,6 +98,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
if ms.IsLeader() { if ms.IsLeader() {
f(w, r) f(w, r)
} else { } else {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.raftServer.Leader()) targetUrl, err := url.Parse("http://" + ms.raftServer.Leader())
if err != nil { if err != nil {
writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()}) writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()})
@ -102,6 +107,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
} }
glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader()) glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader())
proxy := httputil.NewSingleHostReverseProxy(targetUrl) proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r) proxy.ServeHTTP(w, r)
} }
} }

Loading…
Cancel
Save