You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

380 lines
10 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/operation"
  6. "code.google.com/p/weed-fs/go/util"
  7. "fmt"
  8. "io"
  9. "math"
  10. "math/rand"
  11. "os"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. type BenchmarkOptions struct {
  18. server *string
  19. concurrency *int
  20. numberOfFiles *int
  21. fileSize *int
  22. idListFile *string
  23. write *bool
  24. read *bool
  25. sequentialRead *bool
  26. collection *string
  27. vid2server map[string]string //cache for vid locations
  28. }
  29. var (
  30. b BenchmarkOptions
  31. )
  32. func init() {
  33. cmdBenchmark.Run = runbenchmark // break init cycle
  34. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  35. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
  36. b.concurrency = cmdBenchmark.Flag.Int("c", 7, "number of concurrent write or read processes")
  37. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes")
  38. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  39. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  40. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  41. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  42. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  43. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  44. }
  45. var cmdBenchmark = &Command{
  46. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  47. Short: "benchmark on writing millions of files and read out",
  48. Long: `benchmark on an empty weed file system.
  49. Two tests during benchmark:
  50. 1) write lots of small files to the system
  51. 2) read the files out
  52. The file content is mostly zero, but no compression is done.
  53. By default, write 1 million files of 1KB each with 7 concurrent threads,
  54. and randomly read them out with 7 concurrent threads.
  55. You can choose to only benchmark read or write.
  56. During write, the list of uploaded file ids is stored in "-list" specified file.
  57. You can also use your own list of file ids to run read test.
  58. Write speed and read speed will be collected.
  59. The numbers are used to get a sense of the system.
  60. But usually your network or the hard drive is
  61. the real bottleneck.
  62. `,
  63. }
  64. var (
  65. wait sync.WaitGroup
  66. writeStats *stats
  67. readStats *stats
  68. )
  69. func runbenchmark(cmd *Command, args []string) bool {
  70. finishChan := make(chan bool)
  71. fileIdLineChan := make(chan string)
  72. b.vid2server = make(map[string]string)
  73. fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
  74. if *b.write {
  75. writeStats = newStats()
  76. idChan := make(chan int)
  77. wait.Add(*b.concurrency)
  78. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  79. for i := 0; i < *b.concurrency; i++ {
  80. go writeFiles(idChan, fileIdLineChan, writeStats)
  81. }
  82. writeStats.start = time.Now()
  83. go writeStats.checkProgress("Writing Benchmark", finishChan)
  84. for i := 0; i < *b.numberOfFiles; i++ {
  85. idChan <- i
  86. }
  87. close(idChan)
  88. wait.Wait()
  89. writeStats.end = time.Now()
  90. wait.Add(1)
  91. finishChan <- true
  92. finishChan <- true
  93. wait.Wait()
  94. writeStats.printStats()
  95. }
  96. if *b.read {
  97. readStats = newStats()
  98. wait.Add(*b.concurrency)
  99. go readFileIds(*b.idListFile, fileIdLineChan)
  100. readStats.start = time.Now()
  101. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  102. for i := 0; i < *b.concurrency; i++ {
  103. go readFiles(fileIdLineChan, readStats)
  104. }
  105. wait.Wait()
  106. finishChan <- true
  107. readStats.end = time.Now()
  108. readStats.printStats()
  109. }
  110. return true
  111. }
  112. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
  113. for {
  114. if id, ok := <-idChan; ok {
  115. start := time.Now()
  116. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)}
  117. if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
  118. fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
  119. fp.Upload(0, *b.server)
  120. writeStats.addSample(time.Now().Sub(start))
  121. fileIdLineChan <- fp.Fid
  122. s.transferred += int64(*b.fileSize)
  123. s.completed++
  124. if *cmdBenchmark.IsDebug {
  125. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  126. }
  127. } else {
  128. s.failed++
  129. println("writing file error:", err.Error())
  130. }
  131. } else {
  132. break
  133. }
  134. }
  135. wait.Done()
  136. }
  137. func readFiles(fileIdLineChan chan string, s *stats) {
  138. for {
  139. if fid, ok := <-fileIdLineChan; ok {
  140. if len(fid) == 0 {
  141. continue
  142. }
  143. if fid[0] == '#' {
  144. continue
  145. }
  146. if *cmdBenchmark.IsDebug {
  147. fmt.Printf("reading file %s\n", fid)
  148. }
  149. parts := strings.SplitN(fid, ",", 2)
  150. vid := parts[0]
  151. start := time.Now()
  152. if server, ok := b.vid2server[vid]; !ok {
  153. if ret, err := operation.Lookup(*b.server, vid); err == nil {
  154. if len(ret.Locations) > 0 {
  155. server = ret.Locations[0].PublicUrl
  156. b.vid2server[vid] = server
  157. }
  158. }
  159. }
  160. if server, ok := b.vid2server[vid]; ok {
  161. url := "http://" + server + "/" + fid
  162. if bytesRead, err := util.Get(url); err == nil {
  163. s.completed++
  164. s.transferred += int64(len(bytesRead))
  165. readStats.addSample(time.Now().Sub(start))
  166. } else {
  167. s.failed++
  168. println("!!!! Failed to read from ", url, " !!!!!")
  169. }
  170. } else {
  171. s.failed++
  172. println("!!!! volume id ", vid, " location not found!!!!!")
  173. }
  174. } else {
  175. break
  176. }
  177. }
  178. wait.Done()
  179. }
  180. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  181. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  182. if err != nil {
  183. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  184. }
  185. defer file.Close()
  186. for {
  187. select {
  188. case <-finishChan:
  189. wait.Done()
  190. return
  191. case line := <-fileIdLineChan:
  192. file.Write([]byte(line))
  193. file.Write([]byte("\n"))
  194. }
  195. }
  196. }
  197. func readFileIds(fileName string, fileIdLineChan chan string) {
  198. file, err := os.Open(fileName) // For read access.
  199. if err != nil {
  200. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  201. }
  202. defer file.Close()
  203. r := bufio.NewReader(file)
  204. if *b.sequentialRead {
  205. for {
  206. if line, err := Readln(r); err == nil {
  207. fileIdLineChan <- string(line)
  208. } else {
  209. break
  210. }
  211. }
  212. } else {
  213. lines := make([]string, 0, *b.numberOfFiles)
  214. for {
  215. if line, err := Readln(r); err == nil {
  216. lines = append(lines, string(line))
  217. } else {
  218. break
  219. }
  220. }
  221. for i := 0; i < *b.numberOfFiles; i++ {
  222. fileIdLineChan <- lines[rand.Intn(len(lines))]
  223. }
  224. }
  225. close(fileIdLineChan)
  226. }
  227. const (
  228. benchResolution = 10000 //0.1 microsecond
  229. benchBucket = 1000000000 / benchResolution
  230. )
  231. type stats struct {
  232. data []int
  233. completed int
  234. failed int
  235. transferred int64
  236. start time.Time
  237. end time.Time
  238. }
  239. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  240. func newStats() *stats {
  241. return &stats{data: make([]int, benchResolution)}
  242. }
  243. func (s *stats) addSample(d time.Duration) {
  244. s.data[int(d/benchBucket)]++
  245. }
  246. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  247. fmt.Printf("\n------------ %s ----------\n", testName)
  248. ticker := time.Tick(time.Second)
  249. for {
  250. select {
  251. case <-finishChan:
  252. break
  253. case <-ticker:
  254. fmt.Printf("Completed %d of %d requests, %3.1f%%\n", s.completed, *b.numberOfFiles, float64(s.completed)*100/float64(*b.numberOfFiles))
  255. }
  256. }
  257. }
  258. func (s *stats) printStats() {
  259. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  260. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  261. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  262. fmt.Printf("Complete requests: %d\n", s.completed)
  263. fmt.Printf("Failed requests: %d\n", s.failed)
  264. fmt.Printf("Total transferred: %d bytes\n", s.transferred)
  265. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
  266. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
  267. n, sum := 0, 0
  268. min, max := 10000000, 0
  269. for i := 0; i < len(s.data); i++ {
  270. n += s.data[i]
  271. sum += s.data[i] * i
  272. if s.data[i] > 0 {
  273. if min > i {
  274. min = i
  275. }
  276. if max < i {
  277. max = i
  278. }
  279. }
  280. }
  281. avg := float64(sum) / float64(n)
  282. varianceSum := 0.0
  283. for i := 0; i < len(s.data); i++ {
  284. if s.data[i] > 0 {
  285. d := float64(i) - avg
  286. varianceSum += d * d * float64(s.data[i])
  287. }
  288. }
  289. std := math.Sqrt(varianceSum / float64(n))
  290. fmt.Printf("\nConnection Times (ms)\n")
  291. fmt.Printf(" min avg max std\n")
  292. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  293. //printing percentiles
  294. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  295. percentiles := make([]int, len(percentages))
  296. for i := 0; i < len(percentages); i++ {
  297. percentiles[i] = n * percentages[i] / 100
  298. }
  299. percentiles[len(percentiles)-1] = n
  300. percentileIndex := 0
  301. currentSum := 0
  302. for i := 0; i < len(s.data); i++ {
  303. currentSum += s.data[i]
  304. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  305. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  306. percentileIndex++
  307. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  308. percentileIndex++
  309. }
  310. }
  311. }
  312. }
  313. // a fake reader to generate content to upload
  314. type FakeReader struct {
  315. id uint64 // an id number
  316. size int64 // max bytes
  317. }
  318. func (l *FakeReader) Read(p []byte) (n int, err error) {
  319. if l.size <= 0 {
  320. return 0, io.EOF
  321. }
  322. if int64(len(p)) > l.size {
  323. n = int(l.size)
  324. } else {
  325. n = len(p)
  326. }
  327. for i := 0; i < n-8; i += 8 {
  328. for s := uint(0); s < 8; s++ {
  329. p[i] = byte(l.id >> (s * 8))
  330. }
  331. }
  332. l.size -= int64(n)
  333. return
  334. }
  335. func Readln(r *bufio.Reader) ([]byte, error) {
  336. var (
  337. isPrefix bool = true
  338. err error = nil
  339. line, ln []byte
  340. )
  341. for isPrefix && err == nil {
  342. line, isPrefix, err = r.ReadLine()
  343. ln = append(ln, line...)
  344. }
  345. return ln, err
  346. }