You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

498 lines
14 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/operation"
  6. "code.google.com/p/weed-fs/go/util"
  7. "fmt"
  8. "io"
  9. "math"
  10. "math/rand"
  11. "os"
  12. "runtime"
  13. "runtime/pprof"
  14. "sort"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. type BenchmarkOptions struct {
  20. server *string
  21. concurrency *int
  22. numberOfFiles *int
  23. fileSize *int
  24. idListFile *string
  25. write *bool
  26. deletePercentage *int
  27. read *bool
  28. sequentialRead *bool
  29. collection *string
  30. cpuprofile *string
  31. vid2server map[string]string //cache for vid locations
  32. }
  33. var (
  34. b BenchmarkOptions
  35. )
  36. func init() {
  37. cmdBenchmark.Run = runbenchmark // break init cycle
  38. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  39. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
  40. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  41. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  42. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  43. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  44. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  45. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  46. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  47. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  48. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  49. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
  50. b.vid2server = make(map[string]string)
  51. }
  52. var cmdBenchmark = &Command{
  53. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  54. Short: "benchmark on writing millions of files and read out",
  55. Long: `benchmark on an empty weed file system.
  56. Two tests during benchmark:
  57. 1) write lots of small files to the system
  58. 2) read the files out
  59. The file content is mostly zero, but no compression is done.
  60. You can choose to only benchmark read or write.
  61. During write, the list of uploaded file ids is stored in "-list" specified file.
  62. You can also use your own list of file ids to run read test.
  63. Write speed and read speed will be collected.
  64. The numbers are used to get a sense of the system.
  65. Usually your network or the hard drive is the real bottleneck.
  66. Another thing to watch is whether the volumes are evenly distributed
  67. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  68. to servers with free slots, it's highly possible some servers have uneven amount of
  69. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  70. before starting the benchmark command:
  71. http://localhost:9333/vol/grow?collection=benchmark&count=5
  72. After benchmarking, you can clean up the written data by deleting the benchmark collection
  73. http://localhost:9333/col/delete?collection=benchmark
  74. `,
  75. }
  76. var (
  77. wait sync.WaitGroup
  78. writeStats *stats
  79. readStats *stats
  80. serverLimitChan map[string]chan bool
  81. )
  82. func init() {
  83. serverLimitChan = make(map[string]chan bool)
  84. }
  85. func runbenchmark(cmd *Command, args []string) bool {
  86. fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
  87. if *b.cpuprofile != "" {
  88. f, err := os.Create(*b.cpuprofile)
  89. if err != nil {
  90. glog.Fatal(err)
  91. }
  92. pprof.StartCPUProfile(f)
  93. defer pprof.StopCPUProfile()
  94. }
  95. if *b.write {
  96. bench_write()
  97. }
  98. if *b.read {
  99. bench_read()
  100. }
  101. return true
  102. }
  103. func bench_write() {
  104. fileIdLineChan := make(chan string)
  105. finishChan := make(chan bool)
  106. writeStats = newStats()
  107. idChan := make(chan int)
  108. wait.Add(*b.concurrency)
  109. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  110. for i := 0; i < *b.concurrency; i++ {
  111. go writeFiles(idChan, fileIdLineChan, writeStats)
  112. }
  113. writeStats.start = time.Now()
  114. writeStats.total = *b.numberOfFiles
  115. go writeStats.checkProgress("Writing Benchmark", finishChan)
  116. for i := 0; i < *b.numberOfFiles; i++ {
  117. idChan <- i
  118. }
  119. close(idChan)
  120. wait.Wait()
  121. writeStats.end = time.Now()
  122. wait.Add(1)
  123. finishChan <- true
  124. finishChan <- true
  125. close(finishChan)
  126. wait.Wait()
  127. writeStats.printStats()
  128. }
  129. func bench_read() {
  130. fileIdLineChan := make(chan string)
  131. finishChan := make(chan bool)
  132. readStats = newStats()
  133. wait.Add(*b.concurrency)
  134. go readFileIds(*b.idListFile, fileIdLineChan)
  135. readStats.start = time.Now()
  136. readStats.total = *b.numberOfFiles
  137. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  138. for i := 0; i < *b.concurrency; i++ {
  139. go readFiles(fileIdLineChan, readStats)
  140. }
  141. wait.Wait()
  142. finishChan <- true
  143. close(finishChan)
  144. readStats.end = time.Now()
  145. readStats.printStats()
  146. }
  147. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
  148. for {
  149. if id, ok := <-idChan; ok {
  150. start := time.Now()
  151. fileSize := int64(*b.fileSize + rand.Intn(64))
  152. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
  153. if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
  154. fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
  155. if _, ok := serverLimitChan[fp.Server]; !ok {
  156. serverLimitChan[fp.Server] = make(chan bool, 7)
  157. }
  158. serverLimitChan[fp.Server] <- true
  159. if _, err := fp.Upload(0, *b.server); err == nil {
  160. if rand.Intn(100) < *b.deletePercentage {
  161. s.total++
  162. go func() {
  163. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
  164. serverLimitChan[fp.Server] <- true
  165. if e := operation.DeleteFile(*b.server, fp.Fid); e == nil {
  166. s.completed++
  167. } else {
  168. s.failed++
  169. }
  170. <-serverLimitChan[fp.Server]
  171. }()
  172. } else {
  173. fileIdLineChan <- fp.Fid
  174. }
  175. s.completed++
  176. s.transferred += fileSize
  177. } else {
  178. s.failed++
  179. }
  180. writeStats.addSample(time.Now().Sub(start))
  181. <-serverLimitChan[fp.Server]
  182. if *cmdBenchmark.IsDebug {
  183. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  184. }
  185. } else {
  186. s.failed++
  187. println("writing file error:", err.Error())
  188. }
  189. } else {
  190. break
  191. }
  192. }
  193. //wait for the deleting goroutines
  194. time.Sleep(time.Duration(1500) * time.Millisecond)
  195. wait.Done()
  196. }
  197. func readFiles(fileIdLineChan chan string, s *stats) {
  198. serverLimitChan := make(map[string]chan bool)
  199. masterLimitChan := make(chan bool, 1)
  200. for {
  201. if fid, ok := <-fileIdLineChan; ok {
  202. if len(fid) == 0 {
  203. continue
  204. }
  205. if fid[0] == '#' {
  206. continue
  207. }
  208. if *cmdBenchmark.IsDebug {
  209. fmt.Printf("reading file %s\n", fid)
  210. }
  211. parts := strings.SplitN(fid, ",", 2)
  212. vid := parts[0]
  213. start := time.Now()
  214. if server, ok := b.vid2server[vid]; !ok {
  215. masterLimitChan <- true
  216. if _, now_ok := b.vid2server[vid]; !now_ok {
  217. if ret, err := operation.Lookup(*b.server, vid); err == nil {
  218. if len(ret.Locations) > 0 {
  219. server = ret.Locations[0].PublicUrl
  220. b.vid2server[vid] = server
  221. }
  222. }
  223. }
  224. <-masterLimitChan
  225. }
  226. if server, ok := b.vid2server[vid]; ok {
  227. if _, ok := serverLimitChan[server]; !ok {
  228. serverLimitChan[server] = make(chan bool, 7)
  229. }
  230. serverLimitChan[server] <- true
  231. url := "http://" + server + "/" + fid
  232. if bytesRead, err := util.Get(url); err == nil {
  233. s.completed++
  234. s.transferred += int64(len(bytesRead))
  235. readStats.addSample(time.Now().Sub(start))
  236. } else {
  237. s.failed++
  238. println("!!!! Failed to read from ", url, " !!!!!")
  239. }
  240. <-serverLimitChan[server]
  241. } else {
  242. s.failed++
  243. println("!!!! volume id ", vid, " location not found!!!!!")
  244. }
  245. } else {
  246. break
  247. }
  248. }
  249. wait.Done()
  250. }
  251. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  252. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  253. if err != nil {
  254. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  255. }
  256. defer file.Close()
  257. for {
  258. select {
  259. case <-finishChan:
  260. wait.Done()
  261. return
  262. case line := <-fileIdLineChan:
  263. file.Write([]byte(line))
  264. file.Write([]byte("\n"))
  265. }
  266. }
  267. }
  268. func readFileIds(fileName string, fileIdLineChan chan string) {
  269. file, err := os.Open(fileName) // For read access.
  270. if err != nil {
  271. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  272. }
  273. defer file.Close()
  274. r := bufio.NewReader(file)
  275. if *b.sequentialRead {
  276. for {
  277. if line, err := Readln(r); err == nil {
  278. fileIdLineChan <- string(line)
  279. } else {
  280. break
  281. }
  282. }
  283. } else {
  284. lines := make([]string, 0, *b.numberOfFiles)
  285. for {
  286. if line, err := Readln(r); err == nil {
  287. lines = append(lines, string(line))
  288. } else {
  289. break
  290. }
  291. }
  292. if len(lines) > 0 {
  293. for i := 0; i < *b.numberOfFiles; i++ {
  294. fileIdLineChan <- lines[rand.Intn(len(lines))]
  295. }
  296. }
  297. }
  298. close(fileIdLineChan)
  299. }
  300. const (
  301. benchResolution = 10000 //0.1 microsecond
  302. benchBucket = 1000000000 / benchResolution
  303. )
  304. // An efficient statics collecting and rendering
  305. type stats struct {
  306. data []int
  307. overflow []int
  308. completed int
  309. failed int
  310. total int
  311. transferred int64
  312. start time.Time
  313. end time.Time
  314. }
  315. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  316. func newStats() *stats {
  317. return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
  318. }
  319. func (s *stats) addSample(d time.Duration) {
  320. index := int(d / benchBucket)
  321. if index < 0 {
  322. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  323. } else if index < len(s.data) {
  324. s.data[int(d/benchBucket)]++
  325. } else {
  326. s.overflow = append(s.overflow, index)
  327. }
  328. }
  329. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  330. fmt.Printf("\n------------ %s ----------\n", testName)
  331. ticker := time.Tick(time.Second)
  332. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  333. for {
  334. select {
  335. case <-finishChan:
  336. return
  337. case t := <-ticker:
  338. completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
  339. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  340. s.completed, s.total, float64(s.completed)*100/float64(s.total),
  341. float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
  342. float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  343. )
  344. lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
  345. }
  346. }
  347. }
  348. func (s *stats) printStats() {
  349. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  350. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  351. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  352. fmt.Printf("Complete requests: %d\n", s.completed)
  353. fmt.Printf("Failed requests: %d\n", s.failed)
  354. fmt.Printf("Total transferred: %d bytes\n", s.transferred)
  355. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
  356. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
  357. n, sum := 0, 0
  358. min, max := 10000000, 0
  359. for i := 0; i < len(s.data); i++ {
  360. n += s.data[i]
  361. sum += s.data[i] * i
  362. if s.data[i] > 0 {
  363. if min > i {
  364. min = i
  365. }
  366. if max < i {
  367. max = i
  368. }
  369. }
  370. }
  371. n += len(s.overflow)
  372. for i := 0; i < len(s.overflow); i++ {
  373. sum += s.overflow[i]
  374. if min > s.overflow[i] {
  375. min = s.overflow[i]
  376. }
  377. if max < s.overflow[i] {
  378. max = s.overflow[i]
  379. }
  380. }
  381. avg := float64(sum) / float64(n)
  382. varianceSum := 0.0
  383. for i := 0; i < len(s.data); i++ {
  384. if s.data[i] > 0 {
  385. d := float64(i) - avg
  386. varianceSum += d * d * float64(s.data[i])
  387. }
  388. }
  389. for i := 0; i < len(s.overflow); i++ {
  390. d := float64(s.overflow[i]) - avg
  391. varianceSum += d * d
  392. }
  393. std := math.Sqrt(varianceSum / float64(n))
  394. fmt.Printf("\nConnection Times (ms)\n")
  395. fmt.Printf(" min avg max std\n")
  396. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  397. //printing percentiles
  398. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  399. percentiles := make([]int, len(percentages))
  400. for i := 0; i < len(percentages); i++ {
  401. percentiles[i] = n * percentages[i] / 100
  402. }
  403. percentiles[len(percentiles)-1] = n
  404. percentileIndex := 0
  405. currentSum := 0
  406. for i := 0; i < len(s.data); i++ {
  407. currentSum += s.data[i]
  408. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  409. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  410. percentileIndex++
  411. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  412. percentileIndex++
  413. }
  414. }
  415. }
  416. sort.Ints(s.overflow)
  417. for i := 0; i < len(s.overflow); i++ {
  418. currentSum++
  419. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  420. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  421. percentileIndex++
  422. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  423. percentileIndex++
  424. }
  425. }
  426. }
  427. }
  428. // a fake reader to generate content to upload
  429. type FakeReader struct {
  430. id uint64 // an id number
  431. size int64 // max bytes
  432. }
  433. func (l *FakeReader) Read(p []byte) (n int, err error) {
  434. if l.size <= 0 {
  435. return 0, io.EOF
  436. }
  437. if int64(len(p)) > l.size {
  438. n = int(l.size)
  439. } else {
  440. n = len(p)
  441. }
  442. for i := 0; i < n-8; i += 8 {
  443. for s := uint(0); s < 8; s++ {
  444. p[i] = byte(l.id >> (s * 8))
  445. }
  446. }
  447. l.size -= int64(n)
  448. return
  449. }
  450. func Readln(r *bufio.Reader) ([]byte, error) {
  451. var (
  452. isPrefix bool = true
  453. err error = nil
  454. line, ln []byte
  455. )
  456. for isPrefix && err == nil {
  457. line, isPrefix, err = r.ReadLine()
  458. ln = append(ln, line...)
  459. }
  460. return ln, err
  461. }