You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

552 lines
15 KiB

10 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "os"
  10. "runtime"
  11. "runtime/pprof"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/operation"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/chrislusf/seaweedfs/weed/wdclient"
  21. )
  22. type BenchmarkOptions struct {
  23. masters *string
  24. concurrency *int
  25. numberOfFiles *int
  26. fileSize *int
  27. idListFile *string
  28. write *bool
  29. deletePercentage *int
  30. read *bool
  31. sequentialRead *bool
  32. collection *string
  33. cpuprofile *string
  34. maxCpu *int
  35. }
  36. var (
  37. b BenchmarkOptions
  38. sharedBytes []byte
  39. masterClient *wdclient.MasterClient
  40. isSecure bool
  41. )
  42. func init() {
  43. cmdBenchmark.Run = runBenchmark // break init cycle
  44. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  45. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  46. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  47. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  48. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  49. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  50. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  51. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  52. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  53. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  54. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  55. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  56. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  57. sharedBytes = make([]byte, 1024)
  58. }
  59. var cmdBenchmark = &Command{
  60. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  61. Short: "benchmark on writing millions of files and read out",
  62. Long: `benchmark on an empty SeaweedFS file system.
  63. Two tests during benchmark:
  64. 1) write lots of small files to the system
  65. 2) read the files out
  66. The file content is mostly zero, but no compression is done.
  67. You can choose to only benchmark read or write.
  68. During write, the list of uploaded file ids is stored in "-list" specified file.
  69. You can also use your own list of file ids to run read test.
  70. Write speed and read speed will be collected.
  71. The numbers are used to get a sense of the system.
  72. Usually your network or the hard drive is the real bottleneck.
  73. Another thing to watch is whether the volumes are evenly distributed
  74. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  75. to servers with free slots, it's highly possible some servers have uneven amount of
  76. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  77. before starting the benchmark command:
  78. http://localhost:9333/vol/grow?collection=benchmark&count=5
  79. After benchmarking, you can clean up the written data by deleting the benchmark collection
  80. http://localhost:9333/col/delete?collection=benchmark
  81. `,
  82. }
  83. var (
  84. wait sync.WaitGroup
  85. writeStats *stats
  86. readStats *stats
  87. )
  88. func runBenchmark(cmd *Command, args []string) bool {
  89. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  90. if *b.maxCpu < 1 {
  91. *b.maxCpu = runtime.NumCPU()
  92. }
  93. runtime.GOMAXPROCS(*b.maxCpu)
  94. if *b.cpuprofile != "" {
  95. f, err := os.Create(*b.cpuprofile)
  96. if err != nil {
  97. glog.Fatal(err)
  98. }
  99. pprof.StartCPUProfile(f)
  100. defer pprof.StopCPUProfile()
  101. }
  102. masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ","))
  103. go masterClient.KeepConnectedToMaster()
  104. masterClient.WaitUntilConnected()
  105. if *b.write {
  106. benchWrite()
  107. }
  108. if *b.read {
  109. benchRead()
  110. }
  111. return true
  112. }
  113. func benchWrite() {
  114. fileIdLineChan := make(chan string)
  115. finishChan := make(chan bool)
  116. writeStats = newStats(*b.concurrency)
  117. idChan := make(chan int)
  118. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  119. for i := 0; i < *b.concurrency; i++ {
  120. wait.Add(1)
  121. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  122. }
  123. writeStats.start = time.Now()
  124. writeStats.total = *b.numberOfFiles
  125. go writeStats.checkProgress("Writing Benchmark", finishChan)
  126. for i := 0; i < *b.numberOfFiles; i++ {
  127. idChan <- i
  128. }
  129. close(idChan)
  130. wait.Wait()
  131. writeStats.end = time.Now()
  132. wait.Add(2)
  133. finishChan <- true
  134. finishChan <- true
  135. wait.Wait()
  136. close(finishChan)
  137. writeStats.printStats()
  138. }
  139. func benchRead() {
  140. fileIdLineChan := make(chan string)
  141. finishChan := make(chan bool)
  142. readStats = newStats(*b.concurrency)
  143. go readFileIds(*b.idListFile, fileIdLineChan)
  144. readStats.start = time.Now()
  145. readStats.total = *b.numberOfFiles
  146. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  147. for i := 0; i < *b.concurrency; i++ {
  148. wait.Add(1)
  149. go readFiles(fileIdLineChan, &readStats.localStats[i])
  150. }
  151. wait.Wait()
  152. wait.Add(1)
  153. finishChan <- true
  154. wait.Wait()
  155. close(finishChan)
  156. readStats.end = time.Now()
  157. readStats.printStats()
  158. }
  159. type delayedFile struct {
  160. enterTime time.Time
  161. fp *operation.FilePart
  162. }
  163. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  164. defer wait.Done()
  165. delayedDeleteChan := make(chan *delayedFile, 100)
  166. var waitForDeletions sync.WaitGroup
  167. for i := 0; i < 7; i++ {
  168. waitForDeletions.Add(1)
  169. go func() {
  170. defer waitForDeletions.Done()
  171. for df := range delayedDeleteChan {
  172. if df.enterTime.After(time.Now()) {
  173. time.Sleep(df.enterTime.Sub(time.Now()))
  174. }
  175. var jwtAuthorization security.EncodedJwt
  176. if isSecure {
  177. jwtAuthorization = operation.LookupJwt(masterClient.GetMaster(), df.fp.Fid)
  178. }
  179. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), jwtAuthorization); e == nil {
  180. s.completed++
  181. } else {
  182. s.failed++
  183. }
  184. }
  185. }()
  186. }
  187. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  188. for id := range idChan {
  189. start := time.Now()
  190. fileSize := int64(*b.fileSize + random.Intn(64))
  191. fp := &operation.FilePart{
  192. Reader: &FakeReader{id: uint64(id), size: fileSize},
  193. FileSize: fileSize,
  194. MimeType: "image/bench", // prevent gzip benchmark content
  195. }
  196. ar := &operation.VolumeAssignRequest{
  197. Count: 1,
  198. Collection: *b.collection,
  199. }
  200. if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil {
  201. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  202. if !isSecure && assignResult.Auth != "" {
  203. isSecure = true
  204. }
  205. if _, err := fp.Upload(0, masterClient.GetMaster(), assignResult.Auth); err == nil {
  206. if random.Intn(100) < *b.deletePercentage {
  207. s.total++
  208. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  209. } else {
  210. fileIdLineChan <- fp.Fid
  211. }
  212. s.completed++
  213. s.transferred += fileSize
  214. } else {
  215. s.failed++
  216. fmt.Printf("Failed to write with error:%v\n", err)
  217. }
  218. writeStats.addSample(time.Now().Sub(start))
  219. if *cmdBenchmark.IsDebug {
  220. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  221. }
  222. } else {
  223. s.failed++
  224. println("writing file error:", err.Error())
  225. }
  226. }
  227. close(delayedDeleteChan)
  228. waitForDeletions.Wait()
  229. }
  230. func readFiles(fileIdLineChan chan string, s *stat) {
  231. defer wait.Done()
  232. for fid := range fileIdLineChan {
  233. if len(fid) == 0 {
  234. continue
  235. }
  236. if fid[0] == '#' {
  237. continue
  238. }
  239. if *cmdBenchmark.IsDebug {
  240. fmt.Printf("reading file %s\n", fid)
  241. }
  242. start := time.Now()
  243. url, err := masterClient.LookupFileId(fid)
  244. if err != nil {
  245. s.failed++
  246. println("!!!! ", fid, " location not found!!!!!")
  247. continue
  248. }
  249. if bytesRead, err := util.Get(url); err == nil {
  250. s.completed++
  251. s.transferred += int64(len(bytesRead))
  252. readStats.addSample(time.Now().Sub(start))
  253. } else {
  254. s.failed++
  255. fmt.Printf("Failed to read %s error:%v\n", url, err)
  256. }
  257. }
  258. }
  259. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  260. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  261. if err != nil {
  262. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  263. }
  264. defer file.Close()
  265. for {
  266. select {
  267. case <-finishChan:
  268. wait.Done()
  269. return
  270. case line := <-fileIdLineChan:
  271. file.Write([]byte(line))
  272. file.Write([]byte("\n"))
  273. }
  274. }
  275. }
  276. func readFileIds(fileName string, fileIdLineChan chan string) {
  277. file, err := os.Open(fileName) // For read access.
  278. if err != nil {
  279. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  280. }
  281. defer file.Close()
  282. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  283. r := bufio.NewReader(file)
  284. if *b.sequentialRead {
  285. for {
  286. if line, err := Readln(r); err == nil {
  287. fileIdLineChan <- string(line)
  288. } else {
  289. break
  290. }
  291. }
  292. } else {
  293. lines := make([]string, 0, readStats.total)
  294. for {
  295. if line, err := Readln(r); err == nil {
  296. lines = append(lines, string(line))
  297. } else {
  298. break
  299. }
  300. }
  301. if len(lines) > 0 {
  302. for i := 0; i < readStats.total; i++ {
  303. fileIdLineChan <- lines[random.Intn(len(lines))]
  304. }
  305. }
  306. }
  307. close(fileIdLineChan)
  308. }
  309. const (
  310. benchResolution = 10000 //0.1 microsecond
  311. benchBucket = 1000000000 / benchResolution
  312. )
  313. // An efficient statics collecting and rendering
  314. type stats struct {
  315. data []int
  316. overflow []int
  317. localStats []stat
  318. start time.Time
  319. end time.Time
  320. total int
  321. }
  322. type stat struct {
  323. completed int
  324. failed int
  325. total int
  326. transferred int64
  327. }
  328. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  329. func newStats(n int) *stats {
  330. return &stats{
  331. data: make([]int, benchResolution),
  332. overflow: make([]int, 0),
  333. localStats: make([]stat, n),
  334. }
  335. }
  336. func (s *stats) addSample(d time.Duration) {
  337. index := int(d / benchBucket)
  338. if index < 0 {
  339. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  340. } else if index < len(s.data) {
  341. s.data[int(d/benchBucket)]++
  342. } else {
  343. s.overflow = append(s.overflow, index)
  344. }
  345. }
  346. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  347. fmt.Printf("\n------------ %s ----------\n", testName)
  348. ticker := time.Tick(time.Second)
  349. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  350. for {
  351. select {
  352. case <-finishChan:
  353. wait.Done()
  354. return
  355. case t := <-ticker:
  356. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  357. for _, localStat := range s.localStats {
  358. completed += localStat.completed
  359. transferred += localStat.transferred
  360. total += localStat.total
  361. }
  362. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  363. completed, total, float64(completed)*100/float64(total),
  364. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  365. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  366. )
  367. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  368. }
  369. }
  370. }
  371. func (s *stats) printStats() {
  372. completed, failed, transferred, total := 0, 0, int64(0), s.total
  373. for _, localStat := range s.localStats {
  374. completed += localStat.completed
  375. failed += localStat.failed
  376. transferred += localStat.transferred
  377. total += localStat.total
  378. }
  379. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  380. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  381. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  382. fmt.Printf("Complete requests: %d\n", completed)
  383. fmt.Printf("Failed requests: %d\n", failed)
  384. fmt.Printf("Total transferred: %d bytes\n", transferred)
  385. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  386. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  387. n, sum := 0, 0
  388. min, max := 10000000, 0
  389. for i := 0; i < len(s.data); i++ {
  390. n += s.data[i]
  391. sum += s.data[i] * i
  392. if s.data[i] > 0 {
  393. if min > i {
  394. min = i
  395. }
  396. if max < i {
  397. max = i
  398. }
  399. }
  400. }
  401. n += len(s.overflow)
  402. for i := 0; i < len(s.overflow); i++ {
  403. sum += s.overflow[i]
  404. if min > s.overflow[i] {
  405. min = s.overflow[i]
  406. }
  407. if max < s.overflow[i] {
  408. max = s.overflow[i]
  409. }
  410. }
  411. avg := float64(sum) / float64(n)
  412. varianceSum := 0.0
  413. for i := 0; i < len(s.data); i++ {
  414. if s.data[i] > 0 {
  415. d := float64(i) - avg
  416. varianceSum += d * d * float64(s.data[i])
  417. }
  418. }
  419. for i := 0; i < len(s.overflow); i++ {
  420. d := float64(s.overflow[i]) - avg
  421. varianceSum += d * d
  422. }
  423. std := math.Sqrt(varianceSum / float64(n))
  424. fmt.Printf("\nConnection Times (ms)\n")
  425. fmt.Printf(" min avg max std\n")
  426. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  427. //printing percentiles
  428. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  429. percentiles := make([]int, len(percentages))
  430. for i := 0; i < len(percentages); i++ {
  431. percentiles[i] = n * percentages[i] / 100
  432. }
  433. percentiles[len(percentiles)-1] = n
  434. percentileIndex := 0
  435. currentSum := 0
  436. for i := 0; i < len(s.data); i++ {
  437. currentSum += s.data[i]
  438. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  439. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  440. percentileIndex++
  441. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  442. percentileIndex++
  443. }
  444. }
  445. }
  446. sort.Ints(s.overflow)
  447. for i := 0; i < len(s.overflow); i++ {
  448. currentSum++
  449. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  450. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  451. percentileIndex++
  452. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  453. percentileIndex++
  454. }
  455. }
  456. }
  457. }
  458. // a fake reader to generate content to upload
  459. type FakeReader struct {
  460. id uint64 // an id number
  461. size int64 // max bytes
  462. }
  463. func (l *FakeReader) Read(p []byte) (n int, err error) {
  464. if l.size <= 0 {
  465. return 0, io.EOF
  466. }
  467. if int64(len(p)) > l.size {
  468. n = int(l.size)
  469. } else {
  470. n = len(p)
  471. }
  472. if n >= 8 {
  473. for i := 0; i < 8; i++ {
  474. p[i] = byte(l.id >> uint(i*8))
  475. }
  476. }
  477. l.size -= int64(n)
  478. return
  479. }
  480. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  481. size := int(l.size)
  482. bufferSize := len(sharedBytes)
  483. for size > 0 {
  484. tempBuffer := sharedBytes
  485. if size < bufferSize {
  486. tempBuffer = sharedBytes[0:size]
  487. }
  488. count, e := w.Write(tempBuffer)
  489. if e != nil {
  490. return int64(size), e
  491. }
  492. size -= count
  493. }
  494. return l.size, nil
  495. }
  496. func Readln(r *bufio.Reader) ([]byte, error) {
  497. var (
  498. isPrefix = true
  499. err error
  500. line, ln []byte
  501. )
  502. for isPrefix && err == nil {
  503. line, isPrefix, err = r.ReadLine()
  504. ln = append(ln, line...)
  505. }
  506. return ln, err
  507. }