You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

566 lines
15 KiB

  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "runtime"
  10. "runtime/pprof"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/chrislusf/weed-fs/go/glog"
  16. "github.com/chrislusf/weed-fs/go/operation"
  17. "github.com/chrislusf/weed-fs/go/util"
  18. )
  19. type BenchmarkOptions struct {
  20. server *string
  21. concurrency *int
  22. numberOfFiles *int
  23. fileSize *int
  24. idListFile *string
  25. write *bool
  26. deletePercentage *int
  27. read *bool
  28. sequentialRead *bool
  29. collection *string
  30. cpuprofile *string
  31. maxCpu *int
  32. vid2server map[string]string //cache for vid locations
  33. }
  34. var (
  35. b BenchmarkOptions
  36. sharedBytes []byte
  37. )
  38. func init() {
  39. cmdBenchmark.Run = runbenchmark // break init cycle
  40. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  41. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
  42. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  43. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  44. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  45. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  46. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  47. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  48. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  49. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  50. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  51. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  52. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  53. b.vid2server = make(map[string]string)
  54. sharedBytes = make([]byte, 1024)
  55. }
  56. var cmdBenchmark = &Command{
  57. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  58. Short: "benchmark on writing millions of files and read out",
  59. Long: `benchmark on an empty weed file system.
  60. Two tests during benchmark:
  61. 1) write lots of small files to the system
  62. 2) read the files out
  63. The file content is mostly zero, but no compression is done.
  64. You can choose to only benchmark read or write.
  65. During write, the list of uploaded file ids is stored in "-list" specified file.
  66. You can also use your own list of file ids to run read test.
  67. Write speed and read speed will be collected.
  68. The numbers are used to get a sense of the system.
  69. Usually your network or the hard drive is the real bottleneck.
  70. Another thing to watch is whether the volumes are evenly distributed
  71. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  72. to servers with free slots, it's highly possible some servers have uneven amount of
  73. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  74. before starting the benchmark command:
  75. http://localhost:9333/vol/grow?collection=benchmark&count=5
  76. After benchmarking, you can clean up the written data by deleting the benchmark collection
  77. http://localhost:9333/col/delete?collection=benchmark
  78. `,
  79. }
  80. var (
  81. wait sync.WaitGroup
  82. writeStats *stats
  83. readStats *stats
  84. serverLimitChan map[string]chan bool
  85. )
  86. func init() {
  87. serverLimitChan = make(map[string]chan bool)
  88. }
  89. func runbenchmark(cmd *Command, args []string) bool {
  90. fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  91. if *b.maxCpu < 1 {
  92. *b.maxCpu = runtime.NumCPU()
  93. }
  94. runtime.GOMAXPROCS(*b.maxCpu)
  95. if *b.cpuprofile != "" {
  96. f, err := os.Create(*b.cpuprofile)
  97. if err != nil {
  98. glog.Fatal(err)
  99. }
  100. pprof.StartCPUProfile(f)
  101. defer pprof.StopCPUProfile()
  102. }
  103. if *b.write {
  104. bench_write()
  105. }
  106. if *b.read {
  107. bench_read()
  108. }
  109. return true
  110. }
  111. func bench_write() {
  112. fileIdLineChan := make(chan string)
  113. finishChan := make(chan bool)
  114. writeStats = newStats(*b.concurrency)
  115. idChan := make(chan int)
  116. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  117. for i := 0; i < *b.concurrency; i++ {
  118. wait.Add(1)
  119. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  120. }
  121. writeStats.start = time.Now()
  122. writeStats.total = *b.numberOfFiles
  123. go writeStats.checkProgress("Writing Benchmark", finishChan)
  124. for i := 0; i < *b.numberOfFiles; i++ {
  125. idChan <- i
  126. }
  127. close(idChan)
  128. wait.Wait()
  129. writeStats.end = time.Now()
  130. wait.Add(1)
  131. finishChan <- true
  132. close(finishChan)
  133. wait.Wait()
  134. writeStats.printStats()
  135. }
  136. func bench_read() {
  137. fileIdLineChan := make(chan string)
  138. finishChan := make(chan bool)
  139. readStats = newStats(*b.concurrency)
  140. go readFileIds(*b.idListFile, fileIdLineChan)
  141. readStats.start = time.Now()
  142. readStats.total = *b.numberOfFiles
  143. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  144. for i := 0; i < *b.concurrency; i++ {
  145. wait.Add(1)
  146. go readFiles(fileIdLineChan, &readStats.localStats[i])
  147. }
  148. wait.Wait()
  149. finishChan <- true
  150. close(finishChan)
  151. readStats.end = time.Now()
  152. readStats.printStats()
  153. }
  154. type delayedFile struct {
  155. enterTime time.Time
  156. fp *operation.FilePart
  157. }
  158. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  159. defer wait.Done()
  160. delayedDeleteChan := make(chan *delayedFile, 100)
  161. var waitForDeletions sync.WaitGroup
  162. for i := 0; i < 7; i++ {
  163. waitForDeletions.Add(1)
  164. go func() {
  165. for df := range delayedDeleteChan {
  166. if df == nil {
  167. break
  168. }
  169. if df.enterTime.After(time.Now()) {
  170. time.Sleep(df.enterTime.Sub(time.Now()))
  171. }
  172. fp := df.fp
  173. serverLimitChan[fp.Server] <- true
  174. if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
  175. s.completed++
  176. } else {
  177. s.failed++
  178. }
  179. <-serverLimitChan[fp.Server]
  180. }
  181. waitForDeletions.Done()
  182. }()
  183. }
  184. for {
  185. if id, ok := <-idChan; ok {
  186. start := time.Now()
  187. fileSize := int64(*b.fileSize + rand.Intn(64))
  188. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
  189. if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
  190. fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
  191. if _, ok := serverLimitChan[fp.Server]; !ok {
  192. serverLimitChan[fp.Server] = make(chan bool, 7)
  193. }
  194. serverLimitChan[fp.Server] <- true
  195. if _, err := fp.Upload(0, *b.server); err == nil {
  196. if rand.Intn(100) < *b.deletePercentage {
  197. s.total++
  198. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  199. } else {
  200. fileIdLineChan <- fp.Fid
  201. }
  202. s.completed++
  203. s.transferred += fileSize
  204. } else {
  205. s.failed++
  206. fmt.Printf("Failed to write with error:%v\n", err)
  207. }
  208. writeStats.addSample(time.Now().Sub(start))
  209. <-serverLimitChan[fp.Server]
  210. if *cmdBenchmark.IsDebug {
  211. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  212. }
  213. } else {
  214. s.failed++
  215. println("writing file error:", err.Error())
  216. }
  217. } else {
  218. break
  219. }
  220. }
  221. close(delayedDeleteChan)
  222. waitForDeletions.Wait()
  223. }
  224. func readFiles(fileIdLineChan chan string, s *stat) {
  225. defer wait.Done()
  226. serverLimitChan := make(map[string]chan bool)
  227. masterLimitChan := make(chan bool, 1)
  228. for {
  229. if fid, ok := <-fileIdLineChan; ok {
  230. if len(fid) == 0 {
  231. continue
  232. }
  233. if fid[0] == '#' {
  234. continue
  235. }
  236. if *cmdBenchmark.IsDebug {
  237. fmt.Printf("reading file %s\n", fid)
  238. }
  239. parts := strings.SplitN(fid, ",", 2)
  240. vid := parts[0]
  241. start := time.Now()
  242. if server, ok := b.vid2server[vid]; !ok {
  243. masterLimitChan <- true
  244. if _, now_ok := b.vid2server[vid]; !now_ok {
  245. if ret, err := operation.Lookup(*b.server, vid); err == nil {
  246. if len(ret.Locations) > 0 {
  247. server = ret.Locations[0].PublicUrl
  248. b.vid2server[vid] = server
  249. }
  250. }
  251. }
  252. <-masterLimitChan
  253. }
  254. if server, ok := b.vid2server[vid]; ok {
  255. if _, ok := serverLimitChan[server]; !ok {
  256. serverLimitChan[server] = make(chan bool, 7)
  257. }
  258. serverLimitChan[server] <- true
  259. url := "http://" + server + "/" + fid
  260. if bytesRead, err := util.Get(url); err == nil {
  261. s.completed++
  262. s.transferred += int64(len(bytesRead))
  263. readStats.addSample(time.Now().Sub(start))
  264. } else {
  265. s.failed++
  266. fmt.Printf("Failed to read %s error:%v\n", url, err)
  267. }
  268. <-serverLimitChan[server]
  269. } else {
  270. s.failed++
  271. println("!!!! volume id ", vid, " location not found!!!!!")
  272. }
  273. } else {
  274. break
  275. }
  276. }
  277. }
  278. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  279. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  280. if err != nil {
  281. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  282. }
  283. defer file.Close()
  284. for {
  285. select {
  286. case <-finishChan:
  287. wait.Done()
  288. return
  289. case line := <-fileIdLineChan:
  290. file.Write([]byte(line))
  291. file.Write([]byte("\n"))
  292. }
  293. }
  294. }
  295. func readFileIds(fileName string, fileIdLineChan chan string) {
  296. file, err := os.Open(fileName) // For read access.
  297. if err != nil {
  298. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  299. }
  300. defer file.Close()
  301. r := bufio.NewReader(file)
  302. if *b.sequentialRead {
  303. for {
  304. if line, err := Readln(r); err == nil {
  305. fileIdLineChan <- string(line)
  306. } else {
  307. break
  308. }
  309. }
  310. } else {
  311. lines := make([]string, 0, readStats.total)
  312. for {
  313. if line, err := Readln(r); err == nil {
  314. lines = append(lines, string(line))
  315. } else {
  316. break
  317. }
  318. }
  319. if len(lines) > 0 {
  320. for i := 0; i < readStats.total; i++ {
  321. fileIdLineChan <- lines[rand.Intn(len(lines))]
  322. }
  323. }
  324. }
  325. close(fileIdLineChan)
  326. }
  327. const (
  328. benchResolution = 10000 //0.1 microsecond
  329. benchBucket = 1000000000 / benchResolution
  330. )
  331. // An efficient statics collecting and rendering
  332. type stats struct {
  333. data []int
  334. overflow []int
  335. localStats []stat
  336. start time.Time
  337. end time.Time
  338. total int
  339. }
  340. type stat struct {
  341. completed int
  342. failed int
  343. total int
  344. transferred int64
  345. }
  346. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  347. func newStats(n int) *stats {
  348. return &stats{
  349. data: make([]int, benchResolution),
  350. overflow: make([]int, 0),
  351. localStats: make([]stat, n),
  352. }
  353. }
  354. func (s *stats) addSample(d time.Duration) {
  355. index := int(d / benchBucket)
  356. if index < 0 {
  357. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  358. } else if index < len(s.data) {
  359. s.data[int(d/benchBucket)]++
  360. } else {
  361. s.overflow = append(s.overflow, index)
  362. }
  363. }
  364. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  365. fmt.Printf("\n------------ %s ----------\n", testName)
  366. ticker := time.Tick(time.Second)
  367. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  368. for {
  369. select {
  370. case <-finishChan:
  371. return
  372. case t := <-ticker:
  373. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  374. for _, localStat := range s.localStats {
  375. completed += localStat.completed
  376. transferred += localStat.transferred
  377. total += localStat.total
  378. }
  379. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  380. completed, total, float64(completed)*100/float64(total),
  381. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  382. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  383. )
  384. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  385. }
  386. }
  387. }
  388. func (s *stats) printStats() {
  389. completed, failed, transferred, total := 0, 0, int64(0), s.total
  390. for _, localStat := range s.localStats {
  391. completed += localStat.completed
  392. failed += localStat.failed
  393. transferred += localStat.transferred
  394. total += localStat.total
  395. }
  396. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  397. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  398. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  399. fmt.Printf("Complete requests: %d\n", completed)
  400. fmt.Printf("Failed requests: %d\n", failed)
  401. fmt.Printf("Total transferred: %d bytes\n", transferred)
  402. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  403. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  404. n, sum := 0, 0
  405. min, max := 10000000, 0
  406. for i := 0; i < len(s.data); i++ {
  407. n += s.data[i]
  408. sum += s.data[i] * i
  409. if s.data[i] > 0 {
  410. if min > i {
  411. min = i
  412. }
  413. if max < i {
  414. max = i
  415. }
  416. }
  417. }
  418. n += len(s.overflow)
  419. for i := 0; i < len(s.overflow); i++ {
  420. sum += s.overflow[i]
  421. if min > s.overflow[i] {
  422. min = s.overflow[i]
  423. }
  424. if max < s.overflow[i] {
  425. max = s.overflow[i]
  426. }
  427. }
  428. avg := float64(sum) / float64(n)
  429. varianceSum := 0.0
  430. for i := 0; i < len(s.data); i++ {
  431. if s.data[i] > 0 {
  432. d := float64(i) - avg
  433. varianceSum += d * d * float64(s.data[i])
  434. }
  435. }
  436. for i := 0; i < len(s.overflow); i++ {
  437. d := float64(s.overflow[i]) - avg
  438. varianceSum += d * d
  439. }
  440. std := math.Sqrt(varianceSum / float64(n))
  441. fmt.Printf("\nConnection Times (ms)\n")
  442. fmt.Printf(" min avg max std\n")
  443. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  444. //printing percentiles
  445. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  446. percentiles := make([]int, len(percentages))
  447. for i := 0; i < len(percentages); i++ {
  448. percentiles[i] = n * percentages[i] / 100
  449. }
  450. percentiles[len(percentiles)-1] = n
  451. percentileIndex := 0
  452. currentSum := 0
  453. for i := 0; i < len(s.data); i++ {
  454. currentSum += s.data[i]
  455. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  456. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  457. percentileIndex++
  458. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  459. percentileIndex++
  460. }
  461. }
  462. }
  463. sort.Ints(s.overflow)
  464. for i := 0; i < len(s.overflow); i++ {
  465. currentSum++
  466. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  467. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  468. percentileIndex++
  469. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  470. percentileIndex++
  471. }
  472. }
  473. }
  474. }
  475. // a fake reader to generate content to upload
  476. type FakeReader struct {
  477. id uint64 // an id number
  478. size int64 // max bytes
  479. }
  480. func (l *FakeReader) Read(p []byte) (n int, err error) {
  481. if l.size <= 0 {
  482. return 0, io.EOF
  483. }
  484. if int64(len(p)) > l.size {
  485. n = int(l.size)
  486. } else {
  487. n = len(p)
  488. }
  489. if n >= 8 {
  490. for i := 0; i < 8; i++ {
  491. p[i] = byte(l.id >> uint(i*8))
  492. }
  493. }
  494. l.size -= int64(n)
  495. return
  496. }
  497. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  498. size := int(l.size)
  499. bufferSize := len(sharedBytes)
  500. for size > 0 {
  501. tempBuffer := sharedBytes
  502. if size < bufferSize {
  503. tempBuffer = sharedBytes[0:size]
  504. }
  505. count, e := w.Write(tempBuffer)
  506. if e != nil {
  507. return int64(size), e
  508. }
  509. size -= count
  510. }
  511. return l.size, nil
  512. }
  513. func Readln(r *bufio.Reader) ([]byte, error) {
  514. var (
  515. isPrefix bool = true
  516. err error = nil
  517. line, ln []byte
  518. )
  519. for isPrefix && err == nil {
  520. line, isPrefix, err = r.ReadLine()
  521. ln = append(ln, line...)
  522. }
  523. return ln, err
  524. }