You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

576 lines
16 KiB

6 years ago
6 years ago
6 years ago
10 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "runtime"
  10. "runtime/pprof"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "google.golang.org/grpc"
  16. "github.com/chrislusf/seaweedfs/weed/util/log"
  17. "github.com/chrislusf/seaweedfs/weed/operation"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/chrislusf/seaweedfs/weed/wdclient"
  21. )
  22. type BenchmarkOptions struct {
  23. masters *string
  24. concurrency *int
  25. numberOfFiles *int
  26. fileSize *int
  27. idListFile *string
  28. write *bool
  29. deletePercentage *int
  30. read *bool
  31. sequentialRead *bool
  32. collection *string
  33. replication *string
  34. cpuprofile *string
  35. maxCpu *int
  36. grpcDialOption grpc.DialOption
  37. masterClient *wdclient.MasterClient
  38. fsync *bool
  39. }
  40. var (
  41. b BenchmarkOptions
  42. sharedBytes []byte
  43. isSecure bool
  44. )
  45. func init() {
  46. cmdBenchmark.Run = runBenchmark // break init cycle
  47. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  48. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  49. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  50. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  51. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  52. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  53. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  54. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  55. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  56. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  57. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  58. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  59. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  60. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  61. b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
  62. sharedBytes = make([]byte, 1024)
  63. }
  64. var cmdBenchmark = &Command{
  65. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  66. Short: "benchmark on writing millions of files and read out",
  67. Long: `benchmark on an empty SeaweedFS file system.
  68. Two tests during benchmark:
  69. 1) write lots of small files to the system
  70. 2) read the files out
  71. The file content is mostly zero, but no compression is done.
  72. You can choose to only benchmark read or write.
  73. During write, the list of uploaded file ids is stored in "-list" specified file.
  74. You can also use your own list of file ids to run read test.
  75. Write speed and read speed will be collected.
  76. The numbers are used to get a sense of the system.
  77. Usually your network or the hard drive is the real bottleneck.
  78. Another thing to watch is whether the volumes are evenly distributed
  79. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  80. to servers with free slots, it's highly possible some servers have uneven amount of
  81. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  82. before starting the benchmark command:
  83. http://localhost:9333/vol/grow?collection=benchmark&count=5
  84. After benchmarking, you can clean up the written data by deleting the benchmark collection
  85. http://localhost:9333/col/delete?collection=benchmark
  86. `,
  87. }
  88. var (
  89. wait sync.WaitGroup
  90. writeStats *stats
  91. readStats *stats
  92. )
  93. func runBenchmark(cmd *Command, args []string) bool {
  94. util.LoadConfiguration("security", false)
  95. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  96. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
  97. if *b.maxCpu < 1 {
  98. *b.maxCpu = runtime.NumCPU()
  99. }
  100. runtime.GOMAXPROCS(*b.maxCpu)
  101. if *b.cpuprofile != "" {
  102. f, err := os.Create(*b.cpuprofile)
  103. if err != nil {
  104. log.Fatal(err)
  105. }
  106. pprof.StartCPUProfile(f)
  107. defer pprof.StopCPUProfile()
  108. }
  109. b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
  110. go b.masterClient.KeepConnectedToMaster()
  111. b.masterClient.WaitUntilConnected()
  112. if *b.write {
  113. benchWrite()
  114. }
  115. if *b.read {
  116. benchRead()
  117. }
  118. return true
  119. }
  120. func benchWrite() {
  121. fileIdLineChan := make(chan string)
  122. finishChan := make(chan bool)
  123. writeStats = newStats(*b.concurrency)
  124. idChan := make(chan int)
  125. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  126. for i := 0; i < *b.concurrency; i++ {
  127. wait.Add(1)
  128. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  129. }
  130. writeStats.start = time.Now()
  131. writeStats.total = *b.numberOfFiles
  132. go writeStats.checkProgress("Writing Benchmark", finishChan)
  133. for i := 0; i < *b.numberOfFiles; i++ {
  134. idChan <- i
  135. }
  136. close(idChan)
  137. wait.Wait()
  138. writeStats.end = time.Now()
  139. wait.Add(2)
  140. finishChan <- true
  141. finishChan <- true
  142. wait.Wait()
  143. close(finishChan)
  144. writeStats.printStats()
  145. }
  146. func benchRead() {
  147. fileIdLineChan := make(chan string)
  148. finishChan := make(chan bool)
  149. readStats = newStats(*b.concurrency)
  150. go readFileIds(*b.idListFile, fileIdLineChan)
  151. readStats.start = time.Now()
  152. readStats.total = *b.numberOfFiles
  153. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  154. for i := 0; i < *b.concurrency; i++ {
  155. wait.Add(1)
  156. go readFiles(fileIdLineChan, &readStats.localStats[i])
  157. }
  158. wait.Wait()
  159. wait.Add(1)
  160. finishChan <- true
  161. wait.Wait()
  162. close(finishChan)
  163. readStats.end = time.Now()
  164. readStats.printStats()
  165. }
  166. type delayedFile struct {
  167. enterTime time.Time
  168. fp *operation.FilePart
  169. }
  170. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  171. defer wait.Done()
  172. delayedDeleteChan := make(chan *delayedFile, 100)
  173. var waitForDeletions sync.WaitGroup
  174. for i := 0; i < 7; i++ {
  175. waitForDeletions.Add(1)
  176. go func() {
  177. defer waitForDeletions.Done()
  178. for df := range delayedDeleteChan {
  179. if df.enterTime.After(time.Now()) {
  180. time.Sleep(df.enterTime.Sub(time.Now()))
  181. }
  182. var jwtAuthorization security.EncodedJwt
  183. if isSecure {
  184. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
  185. }
  186. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  187. s.completed++
  188. } else {
  189. s.failed++
  190. }
  191. }
  192. }()
  193. }
  194. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  195. for id := range idChan {
  196. start := time.Now()
  197. fileSize := int64(*b.fileSize + random.Intn(64))
  198. fp := &operation.FilePart{
  199. Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
  200. FileSize: fileSize,
  201. MimeType: "image/bench", // prevent gzip benchmark content
  202. Fsync: *b.fsync,
  203. }
  204. ar := &operation.VolumeAssignRequest{
  205. Count: 1,
  206. Collection: *b.collection,
  207. Replication: *b.replication,
  208. }
  209. if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
  210. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  211. if !isSecure && assignResult.Auth != "" {
  212. isSecure = true
  213. }
  214. if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
  215. if random.Intn(100) < *b.deletePercentage {
  216. s.total++
  217. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  218. } else {
  219. fileIdLineChan <- fp.Fid
  220. }
  221. s.completed++
  222. s.transferred += fileSize
  223. } else {
  224. s.failed++
  225. fmt.Printf("Failed to write with error:%v\n", err)
  226. }
  227. writeStats.addSample(time.Now().Sub(start))
  228. if *cmdBenchmark.IsDebug {
  229. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  230. }
  231. } else {
  232. s.failed++
  233. println("writing file error:", err.Error())
  234. }
  235. }
  236. close(delayedDeleteChan)
  237. waitForDeletions.Wait()
  238. }
  239. func readFiles(fileIdLineChan chan string, s *stat) {
  240. defer wait.Done()
  241. for fid := range fileIdLineChan {
  242. if len(fid) == 0 {
  243. continue
  244. }
  245. if fid[0] == '#' {
  246. continue
  247. }
  248. if *cmdBenchmark.IsDebug {
  249. fmt.Printf("reading file %s\n", fid)
  250. }
  251. start := time.Now()
  252. var bytesRead int
  253. var err error
  254. urls, err := b.masterClient.LookupFileId(fid)
  255. if err != nil {
  256. s.failed++
  257. println("!!!! ", fid, " location not found!!!!!")
  258. continue
  259. }
  260. var bytes []byte
  261. for _, url := range urls {
  262. bytes, _, err = util.Get(url)
  263. if err == nil {
  264. break
  265. }
  266. }
  267. bytesRead = len(bytes)
  268. if err == nil {
  269. s.completed++
  270. s.transferred += int64(bytesRead)
  271. readStats.addSample(time.Now().Sub(start))
  272. } else {
  273. s.failed++
  274. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  275. }
  276. }
  277. }
  278. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  279. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  280. if err != nil {
  281. log.Fatalf("File to create file %s: %s\n", fileName, err)
  282. }
  283. defer file.Close()
  284. for {
  285. select {
  286. case <-finishChan:
  287. wait.Done()
  288. return
  289. case line := <-fileIdLineChan:
  290. file.Write([]byte(line))
  291. file.Write([]byte("\n"))
  292. }
  293. }
  294. }
  295. func readFileIds(fileName string, fileIdLineChan chan string) {
  296. file, err := os.Open(fileName) // For read access.
  297. if err != nil {
  298. log.Fatalf("File to read file %s: %s\n", fileName, err)
  299. }
  300. defer file.Close()
  301. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  302. r := bufio.NewReader(file)
  303. if *b.sequentialRead {
  304. for {
  305. if line, err := Readln(r); err == nil {
  306. fileIdLineChan <- string(line)
  307. } else {
  308. break
  309. }
  310. }
  311. } else {
  312. lines := make([]string, 0, readStats.total)
  313. for {
  314. if line, err := Readln(r); err == nil {
  315. lines = append(lines, string(line))
  316. } else {
  317. break
  318. }
  319. }
  320. if len(lines) > 0 {
  321. for i := 0; i < readStats.total; i++ {
  322. fileIdLineChan <- lines[random.Intn(len(lines))]
  323. }
  324. }
  325. }
  326. close(fileIdLineChan)
  327. }
  328. const (
  329. benchResolution = 10000 // 0.1 microsecond
  330. benchBucket = 1000000000 / benchResolution
  331. )
  332. // An efficient statics collecting and rendering
  333. type stats struct {
  334. data []int
  335. overflow []int
  336. localStats []stat
  337. start time.Time
  338. end time.Time
  339. total int
  340. }
  341. type stat struct {
  342. completed int
  343. failed int
  344. total int
  345. transferred int64
  346. }
  347. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  348. func newStats(n int) *stats {
  349. return &stats{
  350. data: make([]int, benchResolution),
  351. overflow: make([]int, 0),
  352. localStats: make([]stat, n),
  353. }
  354. }
  355. func (s *stats) addSample(d time.Duration) {
  356. index := int(d / benchBucket)
  357. if index < 0 {
  358. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  359. } else if index < len(s.data) {
  360. s.data[int(d/benchBucket)]++
  361. } else {
  362. s.overflow = append(s.overflow, index)
  363. }
  364. }
  365. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  366. fmt.Printf("\n------------ %s ----------\n", testName)
  367. ticker := time.Tick(time.Second)
  368. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  369. for {
  370. select {
  371. case <-finishChan:
  372. wait.Done()
  373. return
  374. case t := <-ticker:
  375. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  376. for _, localStat := range s.localStats {
  377. completed += localStat.completed
  378. transferred += localStat.transferred
  379. total += localStat.total
  380. }
  381. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  382. completed, total, float64(completed)*100/float64(total),
  383. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  384. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  385. )
  386. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  387. }
  388. }
  389. }
  390. func (s *stats) printStats() {
  391. completed, failed, transferred, total := 0, 0, int64(0), s.total
  392. for _, localStat := range s.localStats {
  393. completed += localStat.completed
  394. failed += localStat.failed
  395. transferred += localStat.transferred
  396. total += localStat.total
  397. }
  398. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  399. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  400. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  401. fmt.Printf("Complete requests: %d\n", completed)
  402. fmt.Printf("Failed requests: %d\n", failed)
  403. fmt.Printf("Total transferred: %d bytes\n", transferred)
  404. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  405. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  406. n, sum := 0, 0
  407. min, max := 10000000, 0
  408. for i := 0; i < len(s.data); i++ {
  409. n += s.data[i]
  410. sum += s.data[i] * i
  411. if s.data[i] > 0 {
  412. if min > i {
  413. min = i
  414. }
  415. if max < i {
  416. max = i
  417. }
  418. }
  419. }
  420. n += len(s.overflow)
  421. for i := 0; i < len(s.overflow); i++ {
  422. sum += s.overflow[i]
  423. if min > s.overflow[i] {
  424. min = s.overflow[i]
  425. }
  426. if max < s.overflow[i] {
  427. max = s.overflow[i]
  428. }
  429. }
  430. avg := float64(sum) / float64(n)
  431. varianceSum := 0.0
  432. for i := 0; i < len(s.data); i++ {
  433. if s.data[i] > 0 {
  434. d := float64(i) - avg
  435. varianceSum += d * d * float64(s.data[i])
  436. }
  437. }
  438. for i := 0; i < len(s.overflow); i++ {
  439. d := float64(s.overflow[i]) - avg
  440. varianceSum += d * d
  441. }
  442. std := math.Sqrt(varianceSum / float64(n))
  443. fmt.Printf("\nConnection Times (ms)\n")
  444. fmt.Printf(" min avg max std\n")
  445. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  446. // printing percentiles
  447. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  448. percentiles := make([]int, len(percentages))
  449. for i := 0; i < len(percentages); i++ {
  450. percentiles[i] = n * percentages[i] / 100
  451. }
  452. percentiles[len(percentiles)-1] = n
  453. percentileIndex := 0
  454. currentSum := 0
  455. for i := 0; i < len(s.data); i++ {
  456. currentSum += s.data[i]
  457. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  458. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  459. percentileIndex++
  460. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  461. percentileIndex++
  462. }
  463. }
  464. }
  465. sort.Ints(s.overflow)
  466. for i := 0; i < len(s.overflow); i++ {
  467. currentSum++
  468. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  469. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  470. percentileIndex++
  471. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  472. percentileIndex++
  473. }
  474. }
  475. }
  476. }
  477. // a fake reader to generate content to upload
  478. type FakeReader struct {
  479. id uint64 // an id number
  480. size int64 // max bytes
  481. random *rand.Rand
  482. }
  483. func (l *FakeReader) Read(p []byte) (n int, err error) {
  484. if l.size <= 0 {
  485. return 0, io.EOF
  486. }
  487. if int64(len(p)) > l.size {
  488. n = int(l.size)
  489. } else {
  490. n = len(p)
  491. }
  492. if n >= 8 {
  493. for i := 0; i < 8; i++ {
  494. p[i] = byte(l.id >> uint(i*8))
  495. }
  496. l.random.Read(p[8:])
  497. }
  498. l.size -= int64(n)
  499. return
  500. }
  501. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  502. size := int(l.size)
  503. bufferSize := len(sharedBytes)
  504. for size > 0 {
  505. tempBuffer := sharedBytes
  506. if size < bufferSize {
  507. tempBuffer = sharedBytes[0:size]
  508. }
  509. count, e := w.Write(tempBuffer)
  510. if e != nil {
  511. return int64(size), e
  512. }
  513. size -= count
  514. }
  515. return l.size, nil
  516. }
  517. func Readln(r *bufio.Reader) ([]byte, error) {
  518. var (
  519. isPrefix = true
  520. err error
  521. line, ln []byte
  522. )
  523. for isPrefix && err == nil {
  524. line, isPrefix, err = r.ReadLine()
  525. ln = append(ln, line...)
  526. }
  527. return ln, err
  528. }