You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

542 lines
15 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "runtime"
  10. "runtime/pprof"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/operation"
  17. "github.com/chrislusf/seaweedfs/weed/security"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. type BenchmarkOptions struct {
  21. server *string
  22. concurrency *int
  23. numberOfFiles *int
  24. fileSize *int
  25. idListFile *string
  26. write *bool
  27. deletePercentage *int
  28. read *bool
  29. sequentialRead *bool
  30. collection *string
  31. cpuprofile *string
  32. maxCpu *int
  33. secretKey *string
  34. }
  35. var (
  36. b BenchmarkOptions
  37. sharedBytes []byte
  38. )
  39. func init() {
  40. cmdBenchmark.Run = runbenchmark // break init cycle
  41. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  42. b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "SeaweedFS master location")
  43. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  44. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  45. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  46. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  47. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  48. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  49. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  50. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  51. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  52. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  53. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  54. b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
  55. sharedBytes = make([]byte, 1024)
  56. }
  57. var cmdBenchmark = &Command{
  58. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  59. Short: "benchmark on writing millions of files and read out",
  60. Long: `benchmark on an empty SeaweedFS file system.
  61. Two tests during benchmark:
  62. 1) write lots of small files to the system
  63. 2) read the files out
  64. The file content is mostly zero, but no compression is done.
  65. You can choose to only benchmark read or write.
  66. During write, the list of uploaded file ids is stored in "-list" specified file.
  67. You can also use your own list of file ids to run read test.
  68. Write speed and read speed will be collected.
  69. The numbers are used to get a sense of the system.
  70. Usually your network or the hard drive is the real bottleneck.
  71. Another thing to watch is whether the volumes are evenly distributed
  72. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  73. to servers with free slots, it's highly possible some servers have uneven amount of
  74. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  75. before starting the benchmark command:
  76. http://localhost:9333/vol/grow?collection=benchmark&count=5
  77. After benchmarking, you can clean up the written data by deleting the benchmark collection
  78. http://localhost:9333/col/delete?collection=benchmark
  79. `,
  80. }
  81. var (
  82. wait sync.WaitGroup
  83. writeStats *stats
  84. readStats *stats
  85. )
  86. func runbenchmark(cmd *Command, args []string) bool {
  87. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  88. if *b.maxCpu < 1 {
  89. *b.maxCpu = runtime.NumCPU()
  90. }
  91. runtime.GOMAXPROCS(*b.maxCpu)
  92. if *b.cpuprofile != "" {
  93. f, err := os.Create(*b.cpuprofile)
  94. if err != nil {
  95. glog.Fatal(err)
  96. }
  97. pprof.StartCPUProfile(f)
  98. defer pprof.StopCPUProfile()
  99. }
  100. if *b.write {
  101. bench_write()
  102. }
  103. if *b.read {
  104. bench_read()
  105. }
  106. return true
  107. }
  108. func bench_write() {
  109. fileIdLineChan := make(chan string)
  110. finishChan := make(chan bool)
  111. writeStats = newStats(*b.concurrency)
  112. idChan := make(chan int)
  113. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  114. for i := 0; i < *b.concurrency; i++ {
  115. wait.Add(1)
  116. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  117. }
  118. writeStats.start = time.Now()
  119. writeStats.total = *b.numberOfFiles
  120. go writeStats.checkProgress("Writing Benchmark", finishChan)
  121. for i := 0; i < *b.numberOfFiles; i++ {
  122. idChan <- i
  123. }
  124. close(idChan)
  125. wait.Wait()
  126. writeStats.end = time.Now()
  127. wait.Add(2)
  128. finishChan <- true
  129. finishChan <- true
  130. wait.Wait()
  131. close(finishChan)
  132. writeStats.printStats()
  133. }
  134. func bench_read() {
  135. fileIdLineChan := make(chan string)
  136. finishChan := make(chan bool)
  137. readStats = newStats(*b.concurrency)
  138. go readFileIds(*b.idListFile, fileIdLineChan)
  139. readStats.start = time.Now()
  140. readStats.total = *b.numberOfFiles
  141. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  142. for i := 0; i < *b.concurrency; i++ {
  143. wait.Add(1)
  144. go readFiles(fileIdLineChan, &readStats.localStats[i])
  145. }
  146. wait.Wait()
  147. wait.Add(1)
  148. finishChan <- true
  149. wait.Wait()
  150. close(finishChan)
  151. readStats.end = time.Now()
  152. readStats.printStats()
  153. }
  154. type delayedFile struct {
  155. enterTime time.Time
  156. fp *operation.FilePart
  157. }
  158. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  159. defer wait.Done()
  160. delayedDeleteChan := make(chan *delayedFile, 100)
  161. var waitForDeletions sync.WaitGroup
  162. secret := security.Secret(*b.secretKey)
  163. for i := 0; i < 7; i++ {
  164. waitForDeletions.Add(1)
  165. go func() {
  166. defer waitForDeletions.Done()
  167. for df := range delayedDeleteChan {
  168. if df.enterTime.After(time.Now()) {
  169. time.Sleep(df.enterTime.Sub(time.Now()))
  170. }
  171. if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
  172. security.GenJwt(secret, df.fp.Fid)); e == nil {
  173. s.completed++
  174. } else {
  175. s.failed++
  176. }
  177. }
  178. }()
  179. }
  180. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  181. for id := range idChan {
  182. start := time.Now()
  183. fileSize := int64(*b.fileSize + random.Intn(64))
  184. fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
  185. ar := &operation.VolumeAssignRequest{
  186. Count: 1,
  187. Collection: *b.collection,
  188. }
  189. if assignResult, err := operation.Assign(*b.server, ar); err == nil {
  190. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  191. if _, err := fp.Upload(0, *b.server, secret); err == nil {
  192. if random.Intn(100) < *b.deletePercentage {
  193. s.total++
  194. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  195. } else {
  196. fileIdLineChan <- fp.Fid
  197. }
  198. s.completed++
  199. s.transferred += fileSize
  200. } else {
  201. s.failed++
  202. fmt.Printf("Failed to write with error:%v\n", err)
  203. }
  204. writeStats.addSample(time.Now().Sub(start))
  205. if *cmdBenchmark.IsDebug {
  206. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  207. }
  208. } else {
  209. s.failed++
  210. println("writing file error:", err.Error())
  211. }
  212. }
  213. close(delayedDeleteChan)
  214. waitForDeletions.Wait()
  215. }
  216. func readFiles(fileIdLineChan chan string, s *stat) {
  217. defer wait.Done()
  218. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  219. for fid := range fileIdLineChan {
  220. if len(fid) == 0 {
  221. continue
  222. }
  223. if fid[0] == '#' {
  224. continue
  225. }
  226. if *cmdBenchmark.IsDebug {
  227. fmt.Printf("reading file %s\n", fid)
  228. }
  229. parts := strings.SplitN(fid, ",", 2)
  230. vid := parts[0]
  231. start := time.Now()
  232. ret, err := operation.Lookup(*b.server, vid)
  233. if err != nil || len(ret.Locations) == 0 {
  234. s.failed++
  235. println("!!!! volume id ", vid, " location not found!!!!!")
  236. continue
  237. }
  238. server := ret.Locations[random.Intn(len(ret.Locations))].Url
  239. url := "http://" + server + "/" + fid
  240. if bytesRead, err := util.Get(url); err == nil {
  241. s.completed++
  242. s.transferred += int64(len(bytesRead))
  243. readStats.addSample(time.Now().Sub(start))
  244. } else {
  245. s.failed++
  246. fmt.Printf("Failed to read %s error:%v\n", url, err)
  247. }
  248. }
  249. }
  250. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  251. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  252. if err != nil {
  253. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  254. }
  255. defer file.Close()
  256. for {
  257. select {
  258. case <-finishChan:
  259. wait.Done()
  260. return
  261. case line := <-fileIdLineChan:
  262. file.Write([]byte(line))
  263. file.Write([]byte("\n"))
  264. }
  265. }
  266. }
  267. func readFileIds(fileName string, fileIdLineChan chan string) {
  268. file, err := os.Open(fileName) // For read access.
  269. if err != nil {
  270. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  271. }
  272. defer file.Close()
  273. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  274. r := bufio.NewReader(file)
  275. if *b.sequentialRead {
  276. for {
  277. if line, err := Readln(r); err == nil {
  278. fileIdLineChan <- string(line)
  279. } else {
  280. break
  281. }
  282. }
  283. } else {
  284. lines := make([]string, 0, readStats.total)
  285. for {
  286. if line, err := Readln(r); err == nil {
  287. lines = append(lines, string(line))
  288. } else {
  289. break
  290. }
  291. }
  292. if len(lines) > 0 {
  293. for i := 0; i < readStats.total; i++ {
  294. fileIdLineChan <- lines[random.Intn(len(lines))]
  295. }
  296. }
  297. }
  298. close(fileIdLineChan)
  299. }
  300. const (
  301. benchResolution = 10000 //0.1 microsecond
  302. benchBucket = 1000000000 / benchResolution
  303. )
  304. // An efficient statics collecting and rendering
  305. type stats struct {
  306. data []int
  307. overflow []int
  308. localStats []stat
  309. start time.Time
  310. end time.Time
  311. total int
  312. }
  313. type stat struct {
  314. completed int
  315. failed int
  316. total int
  317. transferred int64
  318. }
  319. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  320. func newStats(n int) *stats {
  321. return &stats{
  322. data: make([]int, benchResolution),
  323. overflow: make([]int, 0),
  324. localStats: make([]stat, n),
  325. }
  326. }
  327. func (s *stats) addSample(d time.Duration) {
  328. index := int(d / benchBucket)
  329. if index < 0 {
  330. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  331. } else if index < len(s.data) {
  332. s.data[int(d/benchBucket)]++
  333. } else {
  334. s.overflow = append(s.overflow, index)
  335. }
  336. }
  337. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  338. fmt.Printf("\n------------ %s ----------\n", testName)
  339. ticker := time.Tick(time.Second)
  340. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  341. for {
  342. select {
  343. case <-finishChan:
  344. wait.Done()
  345. return
  346. case t := <-ticker:
  347. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  348. for _, localStat := range s.localStats {
  349. completed += localStat.completed
  350. transferred += localStat.transferred
  351. total += localStat.total
  352. }
  353. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  354. completed, total, float64(completed)*100/float64(total),
  355. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  356. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  357. )
  358. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  359. }
  360. }
  361. }
  362. func (s *stats) printStats() {
  363. completed, failed, transferred, total := 0, 0, int64(0), s.total
  364. for _, localStat := range s.localStats {
  365. completed += localStat.completed
  366. failed += localStat.failed
  367. transferred += localStat.transferred
  368. total += localStat.total
  369. }
  370. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  371. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  372. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  373. fmt.Printf("Complete requests: %d\n", completed)
  374. fmt.Printf("Failed requests: %d\n", failed)
  375. fmt.Printf("Total transferred: %d bytes\n", transferred)
  376. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  377. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  378. n, sum := 0, 0
  379. min, max := 10000000, 0
  380. for i := 0; i < len(s.data); i++ {
  381. n += s.data[i]
  382. sum += s.data[i] * i
  383. if s.data[i] > 0 {
  384. if min > i {
  385. min = i
  386. }
  387. if max < i {
  388. max = i
  389. }
  390. }
  391. }
  392. n += len(s.overflow)
  393. for i := 0; i < len(s.overflow); i++ {
  394. sum += s.overflow[i]
  395. if min > s.overflow[i] {
  396. min = s.overflow[i]
  397. }
  398. if max < s.overflow[i] {
  399. max = s.overflow[i]
  400. }
  401. }
  402. avg := float64(sum) / float64(n)
  403. varianceSum := 0.0
  404. for i := 0; i < len(s.data); i++ {
  405. if s.data[i] > 0 {
  406. d := float64(i) - avg
  407. varianceSum += d * d * float64(s.data[i])
  408. }
  409. }
  410. for i := 0; i < len(s.overflow); i++ {
  411. d := float64(s.overflow[i]) - avg
  412. varianceSum += d * d
  413. }
  414. std := math.Sqrt(varianceSum / float64(n))
  415. fmt.Printf("\nConnection Times (ms)\n")
  416. fmt.Printf(" min avg max std\n")
  417. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  418. //printing percentiles
  419. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  420. percentiles := make([]int, len(percentages))
  421. for i := 0; i < len(percentages); i++ {
  422. percentiles[i] = n * percentages[i] / 100
  423. }
  424. percentiles[len(percentiles)-1] = n
  425. percentileIndex := 0
  426. currentSum := 0
  427. for i := 0; i < len(s.data); i++ {
  428. currentSum += s.data[i]
  429. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  430. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  431. percentileIndex++
  432. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  433. percentileIndex++
  434. }
  435. }
  436. }
  437. sort.Ints(s.overflow)
  438. for i := 0; i < len(s.overflow); i++ {
  439. currentSum++
  440. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  441. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  442. percentileIndex++
  443. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  444. percentileIndex++
  445. }
  446. }
  447. }
  448. }
  449. // a fake reader to generate content to upload
  450. type FakeReader struct {
  451. id uint64 // an id number
  452. size int64 // max bytes
  453. }
  454. func (l *FakeReader) Read(p []byte) (n int, err error) {
  455. if l.size <= 0 {
  456. return 0, io.EOF
  457. }
  458. if int64(len(p)) > l.size {
  459. n = int(l.size)
  460. } else {
  461. n = len(p)
  462. }
  463. if n >= 8 {
  464. for i := 0; i < 8; i++ {
  465. p[i] = byte(l.id >> uint(i*8))
  466. }
  467. }
  468. l.size -= int64(n)
  469. return
  470. }
  471. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  472. size := int(l.size)
  473. bufferSize := len(sharedBytes)
  474. for size > 0 {
  475. tempBuffer := sharedBytes
  476. if size < bufferSize {
  477. tempBuffer = sharedBytes[0:size]
  478. }
  479. count, e := w.Write(tempBuffer)
  480. if e != nil {
  481. return int64(size), e
  482. }
  483. size -= count
  484. }
  485. return l.size, nil
  486. }
  487. func Readln(r *bufio.Reader) ([]byte, error) {
  488. var (
  489. isPrefix = true
  490. err error
  491. line, ln []byte
  492. )
  493. for isPrefix && err == nil {
  494. line, isPrefix, err = r.ReadLine()
  495. ln = append(ln, line...)
  496. }
  497. return ln, err
  498. }