You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

470 lines
13 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/operation"
  6. "code.google.com/p/weed-fs/go/util"
  7. "fmt"
  8. "io"
  9. "math"
  10. "math/rand"
  11. "os"
  12. "runtime"
  13. "runtime/pprof"
  14. "sort"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
// BenchmarkOptions holds the settings of the benchmark command.
// The pointer fields are bound to command-line flags in init(); vid2server
// is a plain map that readFiles fills while the read benchmark runs.
type BenchmarkOptions struct {
	server         *string           // -server: master location
	concurrency    *int              // -c: number of concurrent worker goroutines
	numberOfFiles  *int              // -n: number of files to write / random reads to issue
	fileSize       *int              // -size: simulated file size in bytes
	idListFile     *string           // -list: file that stores the uploaded file ids
	write          *bool             // -write: run the write benchmark
	read           *bool             // -read: run the read benchmark
	sequentialRead *bool             // -readSequentially: replay the id list in file order
	collection     *string           // -collection: collection that receives the data
	cpuprofile     *string           // -cpuprofile: CPU profile output file, "" disables profiling
	vid2server     map[string]string // cache: volume id -> volume server address
}
var (
	// b is the package-wide benchmark configuration; its fields are
	// populated in init().
	b BenchmarkOptions
)
  35. func init() {
  36. cmdBenchmark.Run = runbenchmark // break init cycle
  37. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  38. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
  39. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  40. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes")
  41. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  42. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  43. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  44. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  45. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  46. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  47. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
  48. b.vid2server = make(map[string]string)
  49. }
// cmdBenchmark describes the "benchmark" sub-command: its usage line and the
// short and long help texts. Its Run and IsDebug fields and its flags are
// filled in by init().
var cmdBenchmark = &Command{
	UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
	Short:     "benchmark on writing millions of files and read out",
	Long: `benchmark on an empty weed file system.
Two tests during benchmark:
1) write lots of small files to the system
2) read the files out
The file content is mostly zero, but no compression is done.
You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test.
Write speed and read speed will be collected.
The numbers are used to get a sense of the system.
Usually your network or the hard drive is the real bottleneck.
Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of
benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5
After benchmarking, you can clean up the written data by deleting the benchmark collection
http://localhost:9333/col/delete?collection=benchmark
`,
}
var (
	// wait tracks the worker goroutines of the running benchmark phase
	// (and, during the write phase, the id-list writer).
	wait sync.WaitGroup
	// writeStats collects the results of the write benchmark.
	writeStats *stats
	// readStats collects the results of the read benchmark.
	readStats *stats
)
  79. func runbenchmark(cmd *Command, args []string) bool {
  80. fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
  81. if *b.cpuprofile != "" {
  82. f, err := os.Create(*b.cpuprofile)
  83. if err != nil {
  84. glog.Fatal(err)
  85. }
  86. pprof.StartCPUProfile(f)
  87. defer pprof.StopCPUProfile()
  88. }
  89. if *b.write {
  90. bench_write()
  91. }
  92. if *b.read {
  93. bench_read()
  94. }
  95. return true
  96. }
  97. func bench_write() {
  98. fileIdLineChan := make(chan string)
  99. finishChan := make(chan bool)
  100. writeStats = newStats()
  101. idChan := make(chan int)
  102. wait.Add(*b.concurrency)
  103. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  104. for i := 0; i < *b.concurrency; i++ {
  105. go writeFiles(idChan, fileIdLineChan, writeStats)
  106. }
  107. writeStats.start = time.Now()
  108. go writeStats.checkProgress("Writing Benchmark", finishChan)
  109. for i := 0; i < *b.numberOfFiles; i++ {
  110. idChan <- i
  111. }
  112. close(idChan)
  113. wait.Wait()
  114. writeStats.end = time.Now()
  115. wait.Add(1)
  116. finishChan <- true
  117. finishChan <- true
  118. close(finishChan)
  119. wait.Wait()
  120. writeStats.printStats()
  121. }
  122. func bench_read() {
  123. fileIdLineChan := make(chan string)
  124. finishChan := make(chan bool)
  125. readStats = newStats()
  126. wait.Add(*b.concurrency)
  127. go readFileIds(*b.idListFile, fileIdLineChan)
  128. readStats.start = time.Now()
  129. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  130. for i := 0; i < *b.concurrency; i++ {
  131. go readFiles(fileIdLineChan, readStats)
  132. }
  133. wait.Wait()
  134. finishChan <- true
  135. close(finishChan)
  136. readStats.end = time.Now()
  137. readStats.printStats()
  138. }
  139. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
  140. serverLimitChan := make(map[string]chan bool)
  141. for {
  142. if id, ok := <-idChan; ok {
  143. start := time.Now()
  144. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)}
  145. if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
  146. fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
  147. if _, ok := serverLimitChan[fp.Server]; !ok {
  148. serverLimitChan[fp.Server] = make(chan bool, 7)
  149. }
  150. serverLimitChan[fp.Server] <- true
  151. if _, err := fp.Upload(0, *b.server); err == nil {
  152. fileIdLineChan <- fp.Fid
  153. s.completed++
  154. s.transferred += int64(*b.fileSize)
  155. } else {
  156. s.failed++
  157. }
  158. writeStats.addSample(time.Now().Sub(start))
  159. <-serverLimitChan[fp.Server]
  160. if *cmdBenchmark.IsDebug {
  161. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  162. }
  163. } else {
  164. s.failed++
  165. println("writing file error:", err.Error())
  166. }
  167. } else {
  168. break
  169. }
  170. }
  171. wait.Done()
  172. }
  173. func readFiles(fileIdLineChan chan string, s *stats) {
  174. serverLimitChan := make(map[string]chan bool)
  175. masterLimitChan := make(chan bool, 1)
  176. for {
  177. if fid, ok := <-fileIdLineChan; ok {
  178. if len(fid) == 0 {
  179. continue
  180. }
  181. if fid[0] == '#' {
  182. continue
  183. }
  184. if *cmdBenchmark.IsDebug {
  185. fmt.Printf("reading file %s\n", fid)
  186. }
  187. parts := strings.SplitN(fid, ",", 2)
  188. vid := parts[0]
  189. start := time.Now()
  190. if server, ok := b.vid2server[vid]; !ok {
  191. masterLimitChan <- true
  192. if _, now_ok := b.vid2server[vid]; !now_ok {
  193. if ret, err := operation.Lookup(*b.server, vid); err == nil {
  194. if len(ret.Locations) > 0 {
  195. server = ret.Locations[0].PublicUrl
  196. b.vid2server[vid] = server
  197. }
  198. }
  199. }
  200. <-masterLimitChan
  201. }
  202. if server, ok := b.vid2server[vid]; ok {
  203. if _, ok := serverLimitChan[server]; !ok {
  204. serverLimitChan[server] = make(chan bool, 7)
  205. }
  206. serverLimitChan[server] <- true
  207. url := "http://" + server + "/" + fid
  208. if bytesRead, err := util.Get(url); err == nil {
  209. s.completed++
  210. s.transferred += int64(len(bytesRead))
  211. readStats.addSample(time.Now().Sub(start))
  212. } else {
  213. s.failed++
  214. println("!!!! Failed to read from ", url, " !!!!!")
  215. }
  216. <-serverLimitChan[server]
  217. } else {
  218. s.failed++
  219. println("!!!! volume id ", vid, " location not found!!!!!")
  220. }
  221. } else {
  222. break
  223. }
  224. }
  225. wait.Done()
  226. }
  227. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  228. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  229. if err != nil {
  230. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  231. }
  232. defer file.Close()
  233. for {
  234. select {
  235. case <-finishChan:
  236. wait.Done()
  237. return
  238. case line := <-fileIdLineChan:
  239. file.Write([]byte(line))
  240. file.Write([]byte("\n"))
  241. }
  242. }
  243. }
  244. func readFileIds(fileName string, fileIdLineChan chan string) {
  245. file, err := os.Open(fileName) // For read access.
  246. if err != nil {
  247. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  248. }
  249. defer file.Close()
  250. r := bufio.NewReader(file)
  251. if *b.sequentialRead {
  252. for {
  253. if line, err := Readln(r); err == nil {
  254. fileIdLineChan <- string(line)
  255. } else {
  256. break
  257. }
  258. }
  259. } else {
  260. lines := make([]string, 0, *b.numberOfFiles)
  261. for {
  262. if line, err := Readln(r); err == nil {
  263. lines = append(lines, string(line))
  264. } else {
  265. break
  266. }
  267. }
  268. for i := 0; i < *b.numberOfFiles; i++ {
  269. fileIdLineChan <- lines[rand.Intn(len(lines))]
  270. }
  271. }
  272. close(fileIdLineChan)
  273. }
const (
	// benchResolution is the number of latency histogram buckets
	// (the length of stats.data).
	benchResolution = 10000
	// benchBucket is the width of one bucket in nanoseconds:
	// 1e9 / 10000 = 100000 ns, i.e. 0.1 millisecond. All buckets together
	// therefore span one second.
	benchBucket = 1000000000 / benchResolution
)
// stats collects and renders the statistics of one benchmark phase.
//
// NOTE(review): the worker goroutines in writeFiles/readFiles update these
// fields without synchronization — confirm whether exact counts are required.
type stats struct {
	data        []int     // latency histogram: data[i] counts samples whose duration/benchBucket == i
	overflow    []int     // bucket indices of samples too slow to fit into data
	completed   int       // number of successful requests
	failed      int       // number of failed requests
	transferred int64     // total payload bytes written or read
	start       time.Time // when the phase started
	end         time.Time // when the phase ended
}
// percentages lists the percentile marks that printStats reports.
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  289. func newStats() *stats {
  290. return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
  291. }
  292. func (s *stats) addSample(d time.Duration) {
  293. index := int(d / benchBucket)
  294. if index < 0 {
  295. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  296. } else if index < len(s.data) {
  297. s.data[int(d/benchBucket)]++
  298. } else {
  299. s.overflow = append(s.overflow, index)
  300. }
  301. }
  302. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  303. fmt.Printf("\n------------ %s ----------\n", testName)
  304. ticker := time.Tick(time.Second)
  305. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  306. for {
  307. select {
  308. case <-finishChan:
  309. return
  310. case t := <-ticker:
  311. completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
  312. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  313. s.completed, *b.numberOfFiles, float64(s.completed)*100/float64(*b.numberOfFiles),
  314. float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
  315. float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  316. )
  317. lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
  318. }
  319. }
  320. }
  321. func (s *stats) printStats() {
  322. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  323. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  324. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  325. fmt.Printf("Complete requests: %d\n", s.completed)
  326. fmt.Printf("Failed requests: %d\n", s.failed)
  327. fmt.Printf("Total transferred: %d bytes\n", s.transferred)
  328. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
  329. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
  330. n, sum := 0, 0
  331. min, max := 10000000, 0
  332. for i := 0; i < len(s.data); i++ {
  333. n += s.data[i]
  334. sum += s.data[i] * i
  335. if s.data[i] > 0 {
  336. if min > i {
  337. min = i
  338. }
  339. if max < i {
  340. max = i
  341. }
  342. }
  343. }
  344. n += len(s.overflow)
  345. for i := 0; i < len(s.overflow); i++ {
  346. sum += s.overflow[i]
  347. if min > s.overflow[i] {
  348. min = s.overflow[i]
  349. }
  350. if max < s.overflow[i] {
  351. max = s.overflow[i]
  352. }
  353. }
  354. avg := float64(sum) / float64(n)
  355. varianceSum := 0.0
  356. for i := 0; i < len(s.data); i++ {
  357. if s.data[i] > 0 {
  358. d := float64(i) - avg
  359. varianceSum += d * d * float64(s.data[i])
  360. }
  361. }
  362. for i := 0; i < len(s.overflow); i++ {
  363. d := float64(s.overflow[i]) - avg
  364. varianceSum += d * d
  365. }
  366. std := math.Sqrt(varianceSum / float64(n))
  367. fmt.Printf("\nConnection Times (ms)\n")
  368. fmt.Printf(" min avg max std\n")
  369. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  370. //printing percentiles
  371. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  372. percentiles := make([]int, len(percentages))
  373. for i := 0; i < len(percentages); i++ {
  374. percentiles[i] = n * percentages[i] / 100
  375. }
  376. percentiles[len(percentiles)-1] = n
  377. percentileIndex := 0
  378. currentSum := 0
  379. for i := 0; i < len(s.data); i++ {
  380. currentSum += s.data[i]
  381. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  382. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  383. percentileIndex++
  384. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  385. percentileIndex++
  386. }
  387. }
  388. }
  389. sort.Ints(s.overflow)
  390. for i := 0; i < len(s.overflow); i++ {
  391. currentSum++
  392. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  393. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  394. percentileIndex++
  395. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  396. percentileIndex++
  397. }
  398. }
  399. }
  400. }
  401. // a fake reader to generate content to upload
  402. type FakeReader struct {
  403. id uint64 // an id number
  404. size int64 // max bytes
  405. }
  406. func (l *FakeReader) Read(p []byte) (n int, err error) {
  407. if l.size <= 0 {
  408. return 0, io.EOF
  409. }
  410. if int64(len(p)) > l.size {
  411. n = int(l.size)
  412. } else {
  413. n = len(p)
  414. }
  415. for i := 0; i < n-8; i += 8 {
  416. for s := uint(0); s < 8; s++ {
  417. p[i] = byte(l.id >> (s * 8))
  418. }
  419. }
  420. l.size -= int64(n)
  421. return
  422. }
  423. func Readln(r *bufio.Reader) ([]byte, error) {
  424. var (
  425. isPrefix bool = true
  426. err error = nil
  427. line, ln []byte
  428. )
  429. for isPrefix && err == nil {
  430. line, isPrefix, err = r.ReadLine()
  431. ln = append(ln, line...)
  432. }
  433. return ln, err
  434. }