You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

603 lines
16 KiB

6 years ago
6 years ago
6 years ago
10 years ago
6 years ago
6 years ago
6 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "os"
  10. "runtime"
  11. "runtime/pprof"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. "google.golang.org/grpc"
  17. "github.com/chrislusf/seaweedfs/weed/glog"
  18. "github.com/chrislusf/seaweedfs/weed/operation"
  19. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  20. "github.com/chrislusf/seaweedfs/weed/security"
  21. "github.com/chrislusf/seaweedfs/weed/util"
  22. "github.com/chrislusf/seaweedfs/weed/wdclient"
  23. )
  24. type BenchmarkOptions struct {
  25. masters *string
  26. concurrency *int
  27. numberOfFiles *int
  28. fileSize *int
  29. idListFile *string
  30. write *bool
  31. deletePercentage *int
  32. read *bool
  33. sequentialRead *bool
  34. collection *string
  35. replication *string
  36. cpuprofile *string
  37. maxCpu *int
  38. grpcDialOption grpc.DialOption
  39. masterClient *wdclient.MasterClient
  40. grpcRead *bool
  41. }
  42. var (
  43. b BenchmarkOptions
  44. sharedBytes []byte
  45. isSecure bool
  46. )
  47. func init() {
  48. cmdBenchmark.Run = runBenchmark // break init cycle
  49. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  50. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  51. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  52. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  53. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  54. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  55. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  56. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  57. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  58. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  59. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  60. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  61. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  62. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  63. b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
  64. sharedBytes = make([]byte, 1024)
  65. }
  66. var cmdBenchmark = &Command{
  67. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  68. Short: "benchmark on writing millions of files and read out",
  69. Long: `benchmark on an empty SeaweedFS file system.
  70. Two tests during benchmark:
  71. 1) write lots of small files to the system
  72. 2) read the files out
  73. The file content is mostly zero, but no compression is done.
  74. You can choose to only benchmark read or write.
  75. During write, the list of uploaded file ids is stored in "-list" specified file.
  76. You can also use your own list of file ids to run read test.
  77. Write speed and read speed will be collected.
  78. The numbers are used to get a sense of the system.
  79. Usually your network or the hard drive is the real bottleneck.
  80. Another thing to watch is whether the volumes are evenly distributed
  81. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  82. to servers with free slots, it's highly possible some servers have uneven amount of
  83. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  84. before starting the benchmark command:
  85. http://localhost:9333/vol/grow?collection=benchmark&count=5
  86. After benchmarking, you can clean up the written data by deleting the benchmark collection
  87. http://localhost:9333/col/delete?collection=benchmark
  88. `,
  89. }
  90. var (
  91. wait sync.WaitGroup
  92. writeStats *stats
  93. readStats *stats
  94. )
  95. func runBenchmark(cmd *Command, args []string) bool {
  96. util.LoadConfiguration("security", false)
  97. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  98. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  99. if *b.maxCpu < 1 {
  100. *b.maxCpu = runtime.NumCPU()
  101. }
  102. runtime.GOMAXPROCS(*b.maxCpu)
  103. if *b.cpuprofile != "" {
  104. f, err := os.Create(*b.cpuprofile)
  105. if err != nil {
  106. glog.Fatal(err)
  107. }
  108. pprof.StartCPUProfile(f)
  109. defer pprof.StopCPUProfile()
  110. }
  111. b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
  112. go b.masterClient.KeepConnectedToMaster()
  113. b.masterClient.WaitUntilConnected()
  114. if *b.write {
  115. benchWrite()
  116. }
  117. if *b.read {
  118. benchRead()
  119. }
  120. return true
  121. }
  122. func benchWrite() {
  123. fileIdLineChan := make(chan string)
  124. finishChan := make(chan bool)
  125. writeStats = newStats(*b.concurrency)
  126. idChan := make(chan int)
  127. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  128. for i := 0; i < *b.concurrency; i++ {
  129. wait.Add(1)
  130. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  131. }
  132. writeStats.start = time.Now()
  133. writeStats.total = *b.numberOfFiles
  134. go writeStats.checkProgress("Writing Benchmark", finishChan)
  135. for i := 0; i < *b.numberOfFiles; i++ {
  136. idChan <- i
  137. }
  138. close(idChan)
  139. wait.Wait()
  140. writeStats.end = time.Now()
  141. wait.Add(2)
  142. finishChan <- true
  143. finishChan <- true
  144. wait.Wait()
  145. close(finishChan)
  146. writeStats.printStats()
  147. }
  148. func benchRead() {
  149. fileIdLineChan := make(chan string)
  150. finishChan := make(chan bool)
  151. readStats = newStats(*b.concurrency)
  152. go readFileIds(*b.idListFile, fileIdLineChan)
  153. readStats.start = time.Now()
  154. readStats.total = *b.numberOfFiles
  155. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  156. for i := 0; i < *b.concurrency; i++ {
  157. wait.Add(1)
  158. go readFiles(fileIdLineChan, &readStats.localStats[i])
  159. }
  160. wait.Wait()
  161. wait.Add(1)
  162. finishChan <- true
  163. wait.Wait()
  164. close(finishChan)
  165. readStats.end = time.Now()
  166. readStats.printStats()
  167. }
  168. type delayedFile struct {
  169. enterTime time.Time
  170. fp *operation.FilePart
  171. }
  172. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  173. defer wait.Done()
  174. delayedDeleteChan := make(chan *delayedFile, 100)
  175. var waitForDeletions sync.WaitGroup
  176. for i := 0; i < 7; i++ {
  177. waitForDeletions.Add(1)
  178. go func() {
  179. defer waitForDeletions.Done()
  180. for df := range delayedDeleteChan {
  181. if df.enterTime.After(time.Now()) {
  182. time.Sleep(df.enterTime.Sub(time.Now()))
  183. }
  184. var jwtAuthorization security.EncodedJwt
  185. if isSecure {
  186. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
  187. }
  188. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  189. s.completed++
  190. } else {
  191. s.failed++
  192. }
  193. }
  194. }()
  195. }
  196. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  197. for id := range idChan {
  198. start := time.Now()
  199. fileSize := int64(*b.fileSize + random.Intn(64))
  200. fp := &operation.FilePart{
  201. Reader: &FakeReader{id: uint64(id), size: fileSize},
  202. FileSize: fileSize,
  203. MimeType: "image/bench", // prevent gzip benchmark content
  204. }
  205. ar := &operation.VolumeAssignRequest{
  206. Count: 1,
  207. Collection: *b.collection,
  208. Replication: *b.replication,
  209. }
  210. if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
  211. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  212. if !isSecure && assignResult.Auth != "" {
  213. isSecure = true
  214. }
  215. if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
  216. if random.Intn(100) < *b.deletePercentage {
  217. s.total++
  218. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  219. } else {
  220. fileIdLineChan <- fp.Fid
  221. }
  222. s.completed++
  223. s.transferred += fileSize
  224. } else {
  225. s.failed++
  226. fmt.Printf("Failed to write with error:%v\n", err)
  227. }
  228. writeStats.addSample(time.Now().Sub(start))
  229. if *cmdBenchmark.IsDebug {
  230. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  231. }
  232. } else {
  233. s.failed++
  234. println("writing file error:", err.Error())
  235. }
  236. }
  237. close(delayedDeleteChan)
  238. waitForDeletions.Wait()
  239. }
  240. func readFiles(fileIdLineChan chan string, s *stat) {
  241. defer wait.Done()
  242. for fid := range fileIdLineChan {
  243. if len(fid) == 0 {
  244. continue
  245. }
  246. if fid[0] == '#' {
  247. continue
  248. }
  249. if *cmdBenchmark.IsDebug {
  250. fmt.Printf("reading file %s\n", fid)
  251. }
  252. start := time.Now()
  253. var bytesRead int
  254. var err error
  255. if *b.grpcRead {
  256. volumeServer, err := b.masterClient.LookupVolumeServer(fid)
  257. if err != nil {
  258. s.failed++
  259. println("!!!! ", fid, " location not found!!!!!")
  260. continue
  261. }
  262. bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
  263. } else {
  264. url, err := b.masterClient.LookupFileId(fid)
  265. if err != nil {
  266. s.failed++
  267. println("!!!! ", fid, " location not found!!!!!")
  268. continue
  269. }
  270. var bytes []byte
  271. bytes, err = util.Get(url)
  272. bytesRead = len(bytes)
  273. }
  274. if err == nil {
  275. s.completed++
  276. s.transferred += int64(bytesRead)
  277. readStats.addSample(time.Now().Sub(start))
  278. } else {
  279. s.failed++
  280. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  281. }
  282. }
  283. }
  284. func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
  285. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
  286. fileGetClient, err := client.FileGet(ctx, &volume_server_pb.FileGetRequest{FileId: fid})
  287. if err != nil {
  288. return err
  289. }
  290. for {
  291. resp, respErr := fileGetClient.Recv()
  292. if resp != nil {
  293. bytesRead += len(resp.Data)
  294. }
  295. if respErr != nil {
  296. if respErr == io.EOF {
  297. return nil
  298. }
  299. return respErr
  300. }
  301. }
  302. })
  303. return
  304. }
  305. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  306. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  307. if err != nil {
  308. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  309. }
  310. defer file.Close()
  311. for {
  312. select {
  313. case <-finishChan:
  314. wait.Done()
  315. return
  316. case line := <-fileIdLineChan:
  317. file.Write([]byte(line))
  318. file.Write([]byte("\n"))
  319. }
  320. }
  321. }
  322. func readFileIds(fileName string, fileIdLineChan chan string) {
  323. file, err := os.Open(fileName) // For read access.
  324. if err != nil {
  325. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  326. }
  327. defer file.Close()
  328. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  329. r := bufio.NewReader(file)
  330. if *b.sequentialRead {
  331. for {
  332. if line, err := Readln(r); err == nil {
  333. fileIdLineChan <- string(line)
  334. } else {
  335. break
  336. }
  337. }
  338. } else {
  339. lines := make([]string, 0, readStats.total)
  340. for {
  341. if line, err := Readln(r); err == nil {
  342. lines = append(lines, string(line))
  343. } else {
  344. break
  345. }
  346. }
  347. if len(lines) > 0 {
  348. for i := 0; i < readStats.total; i++ {
  349. fileIdLineChan <- lines[random.Intn(len(lines))]
  350. }
  351. }
  352. }
  353. close(fileIdLineChan)
  354. }
  355. const (
  356. benchResolution = 10000 //0.1 microsecond
  357. benchBucket = 1000000000 / benchResolution
  358. )
  359. // An efficient statics collecting and rendering
  360. type stats struct {
  361. data []int
  362. overflow []int
  363. localStats []stat
  364. start time.Time
  365. end time.Time
  366. total int
  367. }
  368. type stat struct {
  369. completed int
  370. failed int
  371. total int
  372. transferred int64
  373. }
  374. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  375. func newStats(n int) *stats {
  376. return &stats{
  377. data: make([]int, benchResolution),
  378. overflow: make([]int, 0),
  379. localStats: make([]stat, n),
  380. }
  381. }
  382. func (s *stats) addSample(d time.Duration) {
  383. index := int(d / benchBucket)
  384. if index < 0 {
  385. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  386. } else if index < len(s.data) {
  387. s.data[int(d/benchBucket)]++
  388. } else {
  389. s.overflow = append(s.overflow, index)
  390. }
  391. }
  392. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  393. fmt.Printf("\n------------ %s ----------\n", testName)
  394. ticker := time.Tick(time.Second)
  395. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  396. for {
  397. select {
  398. case <-finishChan:
  399. wait.Done()
  400. return
  401. case t := <-ticker:
  402. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  403. for _, localStat := range s.localStats {
  404. completed += localStat.completed
  405. transferred += localStat.transferred
  406. total += localStat.total
  407. }
  408. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  409. completed, total, float64(completed)*100/float64(total),
  410. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  411. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  412. )
  413. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  414. }
  415. }
  416. }
  417. func (s *stats) printStats() {
  418. completed, failed, transferred, total := 0, 0, int64(0), s.total
  419. for _, localStat := range s.localStats {
  420. completed += localStat.completed
  421. failed += localStat.failed
  422. transferred += localStat.transferred
  423. total += localStat.total
  424. }
  425. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  426. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  427. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  428. fmt.Printf("Complete requests: %d\n", completed)
  429. fmt.Printf("Failed requests: %d\n", failed)
  430. fmt.Printf("Total transferred: %d bytes\n", transferred)
  431. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  432. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  433. n, sum := 0, 0
  434. min, max := 10000000, 0
  435. for i := 0; i < len(s.data); i++ {
  436. n += s.data[i]
  437. sum += s.data[i] * i
  438. if s.data[i] > 0 {
  439. if min > i {
  440. min = i
  441. }
  442. if max < i {
  443. max = i
  444. }
  445. }
  446. }
  447. n += len(s.overflow)
  448. for i := 0; i < len(s.overflow); i++ {
  449. sum += s.overflow[i]
  450. if min > s.overflow[i] {
  451. min = s.overflow[i]
  452. }
  453. if max < s.overflow[i] {
  454. max = s.overflow[i]
  455. }
  456. }
  457. avg := float64(sum) / float64(n)
  458. varianceSum := 0.0
  459. for i := 0; i < len(s.data); i++ {
  460. if s.data[i] > 0 {
  461. d := float64(i) - avg
  462. varianceSum += d * d * float64(s.data[i])
  463. }
  464. }
  465. for i := 0; i < len(s.overflow); i++ {
  466. d := float64(s.overflow[i]) - avg
  467. varianceSum += d * d
  468. }
  469. std := math.Sqrt(varianceSum / float64(n))
  470. fmt.Printf("\nConnection Times (ms)\n")
  471. fmt.Printf(" min avg max std\n")
  472. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  473. //printing percentiles
  474. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  475. percentiles := make([]int, len(percentages))
  476. for i := 0; i < len(percentages); i++ {
  477. percentiles[i] = n * percentages[i] / 100
  478. }
  479. percentiles[len(percentiles)-1] = n
  480. percentileIndex := 0
  481. currentSum := 0
  482. for i := 0; i < len(s.data); i++ {
  483. currentSum += s.data[i]
  484. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  485. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  486. percentileIndex++
  487. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  488. percentileIndex++
  489. }
  490. }
  491. }
  492. sort.Ints(s.overflow)
  493. for i := 0; i < len(s.overflow); i++ {
  494. currentSum++
  495. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  496. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  497. percentileIndex++
  498. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  499. percentileIndex++
  500. }
  501. }
  502. }
  503. }
  504. // a fake reader to generate content to upload
  505. type FakeReader struct {
  506. id uint64 // an id number
  507. size int64 // max bytes
  508. }
  509. func (l *FakeReader) Read(p []byte) (n int, err error) {
  510. if l.size <= 0 {
  511. return 0, io.EOF
  512. }
  513. if int64(len(p)) > l.size {
  514. n = int(l.size)
  515. } else {
  516. n = len(p)
  517. }
  518. if n >= 8 {
  519. for i := 0; i < 8; i++ {
  520. p[i] = byte(l.id >> uint(i*8))
  521. }
  522. }
  523. l.size -= int64(n)
  524. return
  525. }
  526. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  527. size := int(l.size)
  528. bufferSize := len(sharedBytes)
  529. for size > 0 {
  530. tempBuffer := sharedBytes
  531. if size < bufferSize {
  532. tempBuffer = sharedBytes[0:size]
  533. }
  534. count, e := w.Write(tempBuffer)
  535. if e != nil {
  536. return int64(size), e
  537. }
  538. size -= count
  539. }
  540. return l.size, nil
  541. }
  542. func Readln(r *bufio.Reader) ([]byte, error) {
  543. var (
  544. isPrefix = true
  545. err error
  546. line, ln []byte
  547. )
  548. for isPrefix && err == nil {
  549. line, isPrefix, err = r.ReadLine()
  550. ln = append(ln, line...)
  551. }
  552. return ln, err
  553. }