You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

546 lines
15 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "runtime"
  10. "runtime/pprof"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/chrislusf/weed-fs/go/glog"
  16. "github.com/chrislusf/weed-fs/go/operation"
  17. "github.com/chrislusf/weed-fs/go/security"
  18. "github.com/chrislusf/weed-fs/go/util"
  19. )
  20. type BenchmarkOptions struct {
  21. server *string
  22. concurrency *int
  23. numberOfFiles *int
  24. fileSize *int
  25. idListFile *string
  26. write *bool
  27. deletePercentage *int
  28. read *bool
  29. sequentialRead *bool
  30. collection *string
  31. cpuprofile *string
  32. maxCpu *int
  33. secretKey *string
  34. vid2server map[string]string //cache for vid locations
  35. }
  36. var (
  37. b BenchmarkOptions
  38. sharedBytes []byte
  39. )
  40. func init() {
  41. cmdBenchmark.Run = runbenchmark // break init cycle
  42. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  43. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "weedfs master location")
  44. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  45. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  46. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  47. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  48. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  49. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  50. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  51. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  52. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  53. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  54. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  55. b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
  56. b.vid2server = make(map[string]string)
  57. sharedBytes = make([]byte, 1024)
  58. }
  59. var cmdBenchmark = &Command{
  60. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  61. Short: "benchmark on writing millions of files and read out",
  62. Long: `benchmark on an empty weed file system.
  63. Two tests during benchmark:
  64. 1) write lots of small files to the system
  65. 2) read the files out
  66. The file content is mostly zero, but no compression is done.
  67. You can choose to only benchmark read or write.
  68. During write, the list of uploaded file ids is stored in "-list" specified file.
  69. You can also use your own list of file ids to run read test.
  70. Write speed and read speed will be collected.
  71. The numbers are used to get a sense of the system.
  72. Usually your network or the hard drive is the real bottleneck.
  73. Another thing to watch is whether the volumes are evenly distributed
  74. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  75. to servers with free slots, it's highly possible some servers have uneven amount of
  76. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  77. before starting the benchmark command:
  78. http://localhost:9333/vol/grow?collection=benchmark&count=5
  79. After benchmarking, you can clean up the written data by deleting the benchmark collection
  80. http://localhost:9333/col/delete?collection=benchmark
  81. `,
  82. }
  83. var (
  84. wait sync.WaitGroup
  85. writeStats *stats
  86. readStats *stats
  87. )
  88. func runbenchmark(cmd *Command, args []string) bool {
  89. fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  90. if *b.maxCpu < 1 {
  91. *b.maxCpu = runtime.NumCPU()
  92. }
  93. runtime.GOMAXPROCS(*b.maxCpu)
  94. if *b.cpuprofile != "" {
  95. f, err := os.Create(*b.cpuprofile)
  96. if err != nil {
  97. glog.Fatal(err)
  98. }
  99. pprof.StartCPUProfile(f)
  100. defer pprof.StopCPUProfile()
  101. }
  102. if *b.write {
  103. bench_write()
  104. }
  105. if *b.read {
  106. bench_read()
  107. }
  108. return true
  109. }
  110. func bench_write() {
  111. fileIdLineChan := make(chan string)
  112. finishChan := make(chan bool)
  113. writeStats = newStats(*b.concurrency)
  114. idChan := make(chan int)
  115. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  116. for i := 0; i < *b.concurrency; i++ {
  117. wait.Add(1)
  118. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  119. }
  120. writeStats.start = time.Now()
  121. writeStats.total = *b.numberOfFiles
  122. go writeStats.checkProgress("Writing Benchmark", finishChan)
  123. for i := 0; i < *b.numberOfFiles; i++ {
  124. idChan <- i
  125. }
  126. close(idChan)
  127. wait.Wait()
  128. writeStats.end = time.Now()
  129. wait.Add(2)
  130. finishChan <- true
  131. finishChan <- true
  132. wait.Wait()
  133. close(finishChan)
  134. writeStats.printStats()
  135. }
  136. func bench_read() {
  137. fileIdLineChan := make(chan string)
  138. finishChan := make(chan bool)
  139. readStats = newStats(*b.concurrency)
  140. go readFileIds(*b.idListFile, fileIdLineChan)
  141. readStats.start = time.Now()
  142. readStats.total = *b.numberOfFiles
  143. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  144. for i := 0; i < *b.concurrency; i++ {
  145. wait.Add(1)
  146. go readFiles(fileIdLineChan, &readStats.localStats[i])
  147. }
  148. wait.Wait()
  149. wait.Add(1)
  150. finishChan <- true
  151. wait.Wait()
  152. close(finishChan)
  153. readStats.end = time.Now()
  154. readStats.printStats()
  155. }
  156. type delayedFile struct {
  157. enterTime time.Time
  158. fp *operation.FilePart
  159. }
  160. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  161. defer wait.Done()
  162. delayedDeleteChan := make(chan *delayedFile, 100)
  163. var waitForDeletions sync.WaitGroup
  164. secret := security.Secret(*b.secretKey)
  165. for i := 0; i < 7; i++ {
  166. waitForDeletions.Add(1)
  167. go func() {
  168. defer waitForDeletions.Done()
  169. for df := range delayedDeleteChan {
  170. if df.enterTime.After(time.Now()) {
  171. time.Sleep(df.enterTime.Sub(time.Now()))
  172. }
  173. if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
  174. security.GenJwt(secret, df.fp.Fid)); e == nil {
  175. s.completed++
  176. } else {
  177. s.failed++
  178. }
  179. }
  180. }()
  181. }
  182. for id := range idChan {
  183. start := time.Now()
  184. fileSize := int64(*b.fileSize + rand.Intn(64))
  185. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
  186. if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
  187. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  188. if _, err := fp.Upload(0, *b.server, secret); err == nil {
  189. if rand.Intn(100) < *b.deletePercentage {
  190. s.total++
  191. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  192. } else {
  193. fileIdLineChan <- fp.Fid
  194. }
  195. s.completed++
  196. s.transferred += fileSize
  197. } else {
  198. s.failed++
  199. fmt.Printf("Failed to write with error:%v\n", err)
  200. }
  201. writeStats.addSample(time.Now().Sub(start))
  202. if *cmdBenchmark.IsDebug {
  203. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  204. }
  205. } else {
  206. s.failed++
  207. println("writing file error:", err.Error())
  208. }
  209. }
  210. close(delayedDeleteChan)
  211. waitForDeletions.Wait()
  212. }
  213. func readFiles(fileIdLineChan chan string, s *stat) {
  214. defer wait.Done()
  215. masterLimitChan := make(chan bool, 1)
  216. for fid := range fileIdLineChan {
  217. if len(fid) == 0 {
  218. continue
  219. }
  220. if fid[0] == '#' {
  221. continue
  222. }
  223. if *cmdBenchmark.IsDebug {
  224. fmt.Printf("reading file %s\n", fid)
  225. }
  226. parts := strings.SplitN(fid, ",", 2)
  227. vid := parts[0]
  228. start := time.Now()
  229. if server, ok := b.vid2server[vid]; !ok {
  230. masterLimitChan <- true
  231. if _, now_ok := b.vid2server[vid]; !now_ok {
  232. if ret, err := operation.Lookup(*b.server, vid); err == nil {
  233. if len(ret.Locations) > 0 {
  234. server = ret.Locations[0].Url
  235. b.vid2server[vid] = server
  236. }
  237. }
  238. }
  239. <-masterLimitChan
  240. }
  241. if server, ok := b.vid2server[vid]; ok {
  242. url := "http://" + server + "/" + fid
  243. if bytesRead, err := util.Get(url); err == nil {
  244. s.completed++
  245. s.transferred += int64(len(bytesRead))
  246. readStats.addSample(time.Now().Sub(start))
  247. } else {
  248. s.failed++
  249. fmt.Printf("Failed to read %s error:%v\n", url, err)
  250. }
  251. } else {
  252. s.failed++
  253. println("!!!! volume id ", vid, " location not found!!!!!")
  254. }
  255. }
  256. }
  257. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  258. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  259. if err != nil {
  260. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  261. }
  262. defer file.Close()
  263. for {
  264. select {
  265. case <-finishChan:
  266. wait.Done()
  267. return
  268. case line := <-fileIdLineChan:
  269. file.Write([]byte(line))
  270. file.Write([]byte("\n"))
  271. }
  272. }
  273. }
  274. func readFileIds(fileName string, fileIdLineChan chan string) {
  275. file, err := os.Open(fileName) // For read access.
  276. if err != nil {
  277. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  278. }
  279. defer file.Close()
  280. r := bufio.NewReader(file)
  281. if *b.sequentialRead {
  282. for {
  283. if line, err := Readln(r); err == nil {
  284. fileIdLineChan <- string(line)
  285. } else {
  286. break
  287. }
  288. }
  289. } else {
  290. lines := make([]string, 0, readStats.total)
  291. for {
  292. if line, err := Readln(r); err == nil {
  293. lines = append(lines, string(line))
  294. } else {
  295. break
  296. }
  297. }
  298. if len(lines) > 0 {
  299. for i := 0; i < readStats.total; i++ {
  300. fileIdLineChan <- lines[rand.Intn(len(lines))]
  301. }
  302. }
  303. }
  304. close(fileIdLineChan)
  305. }
  306. const (
  307. benchResolution = 10000 //0.1 microsecond
  308. benchBucket = 1000000000 / benchResolution
  309. )
  310. // An efficient statics collecting and rendering
  311. type stats struct {
  312. data []int
  313. overflow []int
  314. localStats []stat
  315. start time.Time
  316. end time.Time
  317. total int
  318. }
  319. type stat struct {
  320. completed int
  321. failed int
  322. total int
  323. transferred int64
  324. }
  325. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  326. func newStats(n int) *stats {
  327. return &stats{
  328. data: make([]int, benchResolution),
  329. overflow: make([]int, 0),
  330. localStats: make([]stat, n),
  331. }
  332. }
  333. func (s *stats) addSample(d time.Duration) {
  334. index := int(d / benchBucket)
  335. if index < 0 {
  336. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  337. } else if index < len(s.data) {
  338. s.data[int(d/benchBucket)]++
  339. } else {
  340. s.overflow = append(s.overflow, index)
  341. }
  342. }
  343. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  344. fmt.Printf("\n------------ %s ----------\n", testName)
  345. ticker := time.Tick(time.Second)
  346. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  347. for {
  348. select {
  349. case <-finishChan:
  350. wait.Done()
  351. return
  352. case t := <-ticker:
  353. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  354. for _, localStat := range s.localStats {
  355. completed += localStat.completed
  356. transferred += localStat.transferred
  357. total += localStat.total
  358. }
  359. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  360. completed, total, float64(completed)*100/float64(total),
  361. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  362. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  363. )
  364. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  365. }
  366. }
  367. }
  368. func (s *stats) printStats() {
  369. completed, failed, transferred, total := 0, 0, int64(0), s.total
  370. for _, localStat := range s.localStats {
  371. completed += localStat.completed
  372. failed += localStat.failed
  373. transferred += localStat.transferred
  374. total += localStat.total
  375. }
  376. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  377. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  378. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  379. fmt.Printf("Complete requests: %d\n", completed)
  380. fmt.Printf("Failed requests: %d\n", failed)
  381. fmt.Printf("Total transferred: %d bytes\n", transferred)
  382. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  383. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  384. n, sum := 0, 0
  385. min, max := 10000000, 0
  386. for i := 0; i < len(s.data); i++ {
  387. n += s.data[i]
  388. sum += s.data[i] * i
  389. if s.data[i] > 0 {
  390. if min > i {
  391. min = i
  392. }
  393. if max < i {
  394. max = i
  395. }
  396. }
  397. }
  398. n += len(s.overflow)
  399. for i := 0; i < len(s.overflow); i++ {
  400. sum += s.overflow[i]
  401. if min > s.overflow[i] {
  402. min = s.overflow[i]
  403. }
  404. if max < s.overflow[i] {
  405. max = s.overflow[i]
  406. }
  407. }
  408. avg := float64(sum) / float64(n)
  409. varianceSum := 0.0
  410. for i := 0; i < len(s.data); i++ {
  411. if s.data[i] > 0 {
  412. d := float64(i) - avg
  413. varianceSum += d * d * float64(s.data[i])
  414. }
  415. }
  416. for i := 0; i < len(s.overflow); i++ {
  417. d := float64(s.overflow[i]) - avg
  418. varianceSum += d * d
  419. }
  420. std := math.Sqrt(varianceSum / float64(n))
  421. fmt.Printf("\nConnection Times (ms)\n")
  422. fmt.Printf(" min avg max std\n")
  423. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  424. //printing percentiles
  425. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  426. percentiles := make([]int, len(percentages))
  427. for i := 0; i < len(percentages); i++ {
  428. percentiles[i] = n * percentages[i] / 100
  429. }
  430. percentiles[len(percentiles)-1] = n
  431. percentileIndex := 0
  432. currentSum := 0
  433. for i := 0; i < len(s.data); i++ {
  434. currentSum += s.data[i]
  435. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  436. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  437. percentileIndex++
  438. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  439. percentileIndex++
  440. }
  441. }
  442. }
  443. sort.Ints(s.overflow)
  444. for i := 0; i < len(s.overflow); i++ {
  445. currentSum++
  446. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  447. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  448. percentileIndex++
  449. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  450. percentileIndex++
  451. }
  452. }
  453. }
  454. }
  455. // a fake reader to generate content to upload
  456. type FakeReader struct {
  457. id uint64 // an id number
  458. size int64 // max bytes
  459. }
  460. func (l *FakeReader) Read(p []byte) (n int, err error) {
  461. if l.size <= 0 {
  462. return 0, io.EOF
  463. }
  464. if int64(len(p)) > l.size {
  465. n = int(l.size)
  466. } else {
  467. n = len(p)
  468. }
  469. if n >= 8 {
  470. for i := 0; i < 8; i++ {
  471. p[i] = byte(l.id >> uint(i*8))
  472. }
  473. }
  474. l.size -= int64(n)
  475. return
  476. }
  477. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  478. size := int(l.size)
  479. bufferSize := len(sharedBytes)
  480. for size > 0 {
  481. tempBuffer := sharedBytes
  482. if size < bufferSize {
  483. tempBuffer = sharedBytes[0:size]
  484. }
  485. count, e := w.Write(tempBuffer)
  486. if e != nil {
  487. return int64(size), e
  488. }
  489. size -= count
  490. }
  491. return l.size, nil
  492. }
  493. func Readln(r *bufio.Reader) ([]byte, error) {
  494. var (
  495. isPrefix bool = true
  496. err error = nil
  497. line, ln []byte
  498. )
  499. for isPrefix && err == nil {
  500. line, isPrefix, err = r.ReadLine()
  501. ln = append(ln, line...)
  502. }
  503. return ln, err
  504. }