You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

624 lines
17 KiB

4 years ago
6 years ago
4 years ago
6 years ago
4 years ago
6 years ago
10 years ago
6 years ago
4 years ago
5 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "runtime"
  10. "runtime/pprof"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "google.golang.org/grpc"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/operation"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/chrislusf/seaweedfs/weed/wdclient"
  21. )
  22. type BenchmarkOptions struct {
  23. masters *string
  24. concurrency *int
  25. numberOfFiles *int
  26. fileSize *int
  27. idListFile *string
  28. write *bool
  29. deletePercentage *int
  30. read *bool
  31. sequentialRead *bool
  32. collection *string
  33. replication *string
  34. diskType *string
  35. cpuprofile *string
  36. maxCpu *int
  37. grpcDialOption grpc.DialOption
  38. masterClient *wdclient.MasterClient
  39. fsync *bool
  40. useTcp *bool
  41. useUdp *bool
  42. }
  43. var (
  44. b BenchmarkOptions
  45. sharedBytes []byte
  46. isSecure bool
  47. )
  48. func init() {
  49. cmdBenchmark.Run = runBenchmark // break init cycle
  50. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  51. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  52. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  53. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  54. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  55. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  56. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  57. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  58. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  59. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  60. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  61. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  62. b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  63. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  64. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  65. b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
  66. b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "write data via tcp")
  67. b.useUdp = cmdBenchmark.Flag.Bool("useUdp", false, "write data via udp")
  68. sharedBytes = make([]byte, 1024)
  69. }
  70. var cmdBenchmark = &Command{
  71. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  72. Short: "benchmark on writing millions of files and read out",
  73. Long: `benchmark on an empty SeaweedFS file system.
  74. Two tests during benchmark:
  75. 1) write lots of small files to the system
  76. 2) read the files out
  77. The file content is mostly zero, but no compression is done.
  78. You can choose to only benchmark read or write.
  79. During write, the list of uploaded file ids is stored in "-list" specified file.
  80. You can also use your own list of file ids to run read test.
  81. Write speed and read speed will be collected.
  82. The numbers are used to get a sense of the system.
  83. Usually your network or the hard drive is the real bottleneck.
  84. Another thing to watch is whether the volumes are evenly distributed
  85. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  86. to servers with free slots, it's highly possible some servers have uneven amount of
  87. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  88. before starting the benchmark command:
  89. http://localhost:9333/vol/grow?collection=benchmark&count=5
  90. After benchmarking, you can clean up the written data by deleting the benchmark collection
  91. http://localhost:9333/col/delete?collection=benchmark
  92. `,
  93. }
  94. var (
  95. wait sync.WaitGroup
  96. writeStats *stats
  97. readStats *stats
  98. )
  99. func runBenchmark(cmd *Command, args []string) bool {
  100. util.LoadConfiguration("security", false)
  101. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  102. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
  103. if *b.maxCpu < 1 {
  104. *b.maxCpu = runtime.NumCPU()
  105. }
  106. runtime.GOMAXPROCS(*b.maxCpu)
  107. if *b.cpuprofile != "" {
  108. f, err := os.Create(*b.cpuprofile)
  109. if err != nil {
  110. glog.Fatal(err)
  111. }
  112. pprof.StartCPUProfile(f)
  113. defer pprof.StopCPUProfile()
  114. }
  115. b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
  116. go b.masterClient.KeepConnectedToMaster()
  117. b.masterClient.WaitUntilConnected()
  118. if *b.write {
  119. benchWrite()
  120. }
  121. if *b.read {
  122. benchRead()
  123. }
  124. return true
  125. }
  126. func benchWrite() {
  127. fileIdLineChan := make(chan string)
  128. finishChan := make(chan bool)
  129. writeStats = newStats(*b.concurrency)
  130. idChan := make(chan int)
  131. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  132. for i := 0; i < *b.concurrency; i++ {
  133. wait.Add(1)
  134. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  135. }
  136. writeStats.start = time.Now()
  137. writeStats.total = *b.numberOfFiles
  138. go writeStats.checkProgress("Writing Benchmark", finishChan)
  139. for i := 0; i < *b.numberOfFiles; i++ {
  140. idChan <- i
  141. }
  142. close(idChan)
  143. wait.Wait()
  144. writeStats.end = time.Now()
  145. wait.Add(2)
  146. finishChan <- true
  147. finishChan <- true
  148. wait.Wait()
  149. close(finishChan)
  150. writeStats.printStats()
  151. }
  152. func benchRead() {
  153. fileIdLineChan := make(chan string)
  154. finishChan := make(chan bool)
  155. readStats = newStats(*b.concurrency)
  156. go readFileIds(*b.idListFile, fileIdLineChan)
  157. readStats.start = time.Now()
  158. readStats.total = *b.numberOfFiles
  159. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  160. for i := 0; i < *b.concurrency; i++ {
  161. wait.Add(1)
  162. go readFiles(fileIdLineChan, &readStats.localStats[i])
  163. }
  164. wait.Wait()
  165. wait.Add(1)
  166. finishChan <- true
  167. wait.Wait()
  168. close(finishChan)
  169. readStats.end = time.Now()
  170. readStats.printStats()
  171. }
  172. type delayedFile struct {
  173. enterTime time.Time
  174. fp *operation.FilePart
  175. }
  176. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  177. defer wait.Done()
  178. delayedDeleteChan := make(chan *delayedFile, 100)
  179. var waitForDeletions sync.WaitGroup
  180. for i := 0; i < 7; i++ {
  181. waitForDeletions.Add(1)
  182. go func() {
  183. defer waitForDeletions.Done()
  184. for df := range delayedDeleteChan {
  185. if df.enterTime.After(time.Now()) {
  186. time.Sleep(df.enterTime.Sub(time.Now()))
  187. }
  188. var jwtAuthorization security.EncodedJwt
  189. if isSecure {
  190. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
  191. }
  192. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  193. s.completed++
  194. } else {
  195. s.failed++
  196. }
  197. }
  198. }()
  199. }
  200. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  201. volumeTcpClient := wdclient.NewVolumeTcpClient()
  202. volumeUdpClient := wdclient.NewVolumeUdpClient()
  203. for id := range idChan {
  204. start := time.Now()
  205. fileSize := int64(*b.fileSize + random.Intn(64))
  206. fp := &operation.FilePart{
  207. Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
  208. FileSize: fileSize,
  209. MimeType: "image/bench", // prevent gzip benchmark content
  210. Fsync: *b.fsync,
  211. }
  212. ar := &operation.VolumeAssignRequest{
  213. Count: 1,
  214. Collection: *b.collection,
  215. Replication: *b.replication,
  216. DiskType: *b.diskType,
  217. }
  218. if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
  219. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  220. if !isSecure && assignResult.Auth != "" {
  221. isSecure = true
  222. }
  223. if *b.useTcp {
  224. if uploadByTcp(volumeTcpClient, fp) {
  225. fileIdLineChan <- fp.Fid
  226. s.completed++
  227. s.transferred += fileSize
  228. } else {
  229. s.failed++
  230. }
  231. } else if *b.useUdp {
  232. if uploadByUdp(volumeUdpClient, fp) {
  233. fileIdLineChan <- fp.Fid
  234. s.completed++
  235. s.transferred += fileSize
  236. } else {
  237. s.failed++
  238. }
  239. } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
  240. if random.Intn(100) < *b.deletePercentage {
  241. s.total++
  242. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  243. } else {
  244. fileIdLineChan <- fp.Fid
  245. }
  246. s.completed++
  247. s.transferred += fileSize
  248. } else {
  249. s.failed++
  250. fmt.Printf("Failed to write with error:%v\n", err)
  251. }
  252. writeStats.addSample(time.Now().Sub(start))
  253. if *cmdBenchmark.IsDebug {
  254. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  255. }
  256. } else {
  257. s.failed++
  258. println("writing file error:", err.Error())
  259. }
  260. }
  261. close(delayedDeleteChan)
  262. waitForDeletions.Wait()
  263. }
  264. func readFiles(fileIdLineChan chan string, s *stat) {
  265. defer wait.Done()
  266. for fid := range fileIdLineChan {
  267. if len(fid) == 0 {
  268. continue
  269. }
  270. if fid[0] == '#' {
  271. continue
  272. }
  273. if *cmdBenchmark.IsDebug {
  274. fmt.Printf("reading file %s\n", fid)
  275. }
  276. start := time.Now()
  277. var bytesRead int
  278. var err error
  279. urls, err := b.masterClient.LookupFileId(fid)
  280. if err != nil {
  281. s.failed++
  282. println("!!!! ", fid, " location not found!!!!!")
  283. continue
  284. }
  285. var bytes []byte
  286. for _, url := range urls {
  287. bytes, _, err = util.FastGet(url)
  288. if err == nil {
  289. break
  290. }
  291. }
  292. bytesRead = len(bytes)
  293. if err == nil {
  294. s.completed++
  295. s.transferred += int64(bytesRead)
  296. readStats.addSample(time.Now().Sub(start))
  297. } else {
  298. s.failed++
  299. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  300. }
  301. }
  302. }
  303. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  304. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  305. if err != nil {
  306. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  307. }
  308. defer file.Close()
  309. for {
  310. select {
  311. case <-finishChan:
  312. wait.Done()
  313. return
  314. case line := <-fileIdLineChan:
  315. file.Write([]byte(line))
  316. file.Write([]byte("\n"))
  317. }
  318. }
  319. }
  320. func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
  321. err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
  322. if err != nil {
  323. glog.Errorf("upload chunk err: %v", err)
  324. return false
  325. }
  326. return true
  327. }
  328. func uploadByUdp(volumeUdpClient *wdclient.VolumeUdpClient, fp *operation.FilePart) bool {
  329. err := volumeUdpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
  330. if err != nil {
  331. glog.Errorf("upload chunk err: %v", err)
  332. return false
  333. }
  334. return true
  335. }
  336. func readFileIds(fileName string, fileIdLineChan chan string) {
  337. file, err := os.Open(fileName) // For read access.
  338. if err != nil {
  339. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  340. }
  341. defer file.Close()
  342. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  343. r := bufio.NewReader(file)
  344. if *b.sequentialRead {
  345. for {
  346. if line, err := Readln(r); err == nil {
  347. fileIdLineChan <- string(line)
  348. } else {
  349. break
  350. }
  351. }
  352. } else {
  353. lines := make([]string, 0, readStats.total)
  354. for {
  355. if line, err := Readln(r); err == nil {
  356. lines = append(lines, string(line))
  357. } else {
  358. break
  359. }
  360. }
  361. if len(lines) > 0 {
  362. for i := 0; i < readStats.total; i++ {
  363. fileIdLineChan <- lines[random.Intn(len(lines))]
  364. }
  365. }
  366. }
  367. close(fileIdLineChan)
  368. }
  const (
  	// benchResolution is the number of histogram buckets kept in stats.data.
  	benchResolution = 10000
  	// benchBucket is the width of one bucket in nanoseconds:
  	// 1e9 / 10000 = 100,000 ns, i.e. 0.1 millisecond. The histogram therefore
  	// covers latencies below one second; anything slower goes to overflow.
  	benchBucket = 1000000000 / benchResolution
  )

  // stats collects and renders the results of one benchmark phase.
  type stats struct {
  	// sampleMu guards data and overflow: addSample is called concurrently by
  	// every worker goroutine of a phase.
  	sampleMu sync.Mutex
  	// data[i] counts the samples whose latency falls into bucket i.
  	data []int
  	// overflow lists the bucket index of every sample beyond len(data).
  	overflow []int
  	// localStats holds one counter set per worker.
  	localStats []stat
  	start      time.Time
  	end        time.Time
  	// total is the number of requests the phase plans to issue.
  	total int
  }

  // stat is the counter set of a single worker.
  type stat struct {
  	completed   int
  	failed      int
  	total       int
  	transferred int64
  }

  // percentages lists the percentiles reported by printStats.
  var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}

  // newStats returns a stats value with an empty histogram and n zeroed
  // per-worker counter sets.
  func newStats(n int) *stats {
  	return &stats{
  		data:       make([]int, benchResolution),
  		overflow:   make([]int, 0),
  		localStats: make([]stat, n),
  	}
  }

  // addSample records one request latency. Durations are converted to bucket
  // indexes of benchBucket nanoseconds; indexes inside the histogram increment
  // their bucket, larger ones are appended to overflow, and negative durations
  // are reported and skipped. It is safe to call from several goroutines.
  func (s *stats) addSample(d time.Duration) {
  	index := int(d / benchBucket)
  	if index < 0 {
  		fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  		return
  	}

  	s.sampleMu.Lock()
  	defer s.sampleMu.Unlock()
  	if index < len(s.data) {
  		s.data[index]++
  	} else {
  		s.overflow = append(s.overflow, index)
  	}
  }
  406. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  407. fmt.Printf("\n------------ %s ----------\n", testName)
  408. ticker := time.Tick(time.Second)
  409. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  410. for {
  411. select {
  412. case <-finishChan:
  413. wait.Done()
  414. return
  415. case t := <-ticker:
  416. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  417. for _, localStat := range s.localStats {
  418. completed += localStat.completed
  419. transferred += localStat.transferred
  420. total += localStat.total
  421. }
  422. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  423. completed, total, float64(completed)*100/float64(total),
  424. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  425. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  426. )
  427. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  428. }
  429. }
  430. }
  431. func (s *stats) printStats() {
  432. completed, failed, transferred, total := 0, 0, int64(0), s.total
  433. for _, localStat := range s.localStats {
  434. completed += localStat.completed
  435. failed += localStat.failed
  436. transferred += localStat.transferred
  437. total += localStat.total
  438. }
  439. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  440. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  441. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  442. fmt.Printf("Complete requests: %d\n", completed)
  443. fmt.Printf("Failed requests: %d\n", failed)
  444. fmt.Printf("Total transferred: %d bytes\n", transferred)
  445. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  446. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  447. n, sum := 0, 0
  448. min, max := 10000000, 0
  449. for i := 0; i < len(s.data); i++ {
  450. n += s.data[i]
  451. sum += s.data[i] * i
  452. if s.data[i] > 0 {
  453. if min > i {
  454. min = i
  455. }
  456. if max < i {
  457. max = i
  458. }
  459. }
  460. }
  461. n += len(s.overflow)
  462. for i := 0; i < len(s.overflow); i++ {
  463. sum += s.overflow[i]
  464. if min > s.overflow[i] {
  465. min = s.overflow[i]
  466. }
  467. if max < s.overflow[i] {
  468. max = s.overflow[i]
  469. }
  470. }
  471. avg := float64(sum) / float64(n)
  472. varianceSum := 0.0
  473. for i := 0; i < len(s.data); i++ {
  474. if s.data[i] > 0 {
  475. d := float64(i) - avg
  476. varianceSum += d * d * float64(s.data[i])
  477. }
  478. }
  479. for i := 0; i < len(s.overflow); i++ {
  480. d := float64(s.overflow[i]) - avg
  481. varianceSum += d * d
  482. }
  483. std := math.Sqrt(varianceSum / float64(n))
  484. fmt.Printf("\nConnection Times (ms)\n")
  485. fmt.Printf(" min avg max std\n")
  486. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  487. // printing percentiles
  488. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  489. percentiles := make([]int, len(percentages))
  490. for i := 0; i < len(percentages); i++ {
  491. percentiles[i] = n * percentages[i] / 100
  492. }
  493. percentiles[len(percentiles)-1] = n
  494. percentileIndex := 0
  495. currentSum := 0
  496. for i := 0; i < len(s.data); i++ {
  497. currentSum += s.data[i]
  498. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  499. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  500. percentileIndex++
  501. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  502. percentileIndex++
  503. }
  504. }
  505. }
  506. sort.Ints(s.overflow)
  507. for i := 0; i < len(s.overflow); i++ {
  508. currentSum++
  509. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  510. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  511. percentileIndex++
  512. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  513. percentileIndex++
  514. }
  515. }
  516. }
  517. }
  518. // a fake reader to generate content to upload
  519. type FakeReader struct {
  520. id uint64 // an id number
  521. size int64 // max bytes
  522. random *rand.Rand
  523. }
  524. func (l *FakeReader) Read(p []byte) (n int, err error) {
  525. if l.size <= 0 {
  526. return 0, io.EOF
  527. }
  528. if int64(len(p)) > l.size {
  529. n = int(l.size)
  530. } else {
  531. n = len(p)
  532. }
  533. if n >= 8 {
  534. for i := 0; i < 8; i++ {
  535. p[i] = byte(l.id >> uint(i*8))
  536. }
  537. l.random.Read(p[8:])
  538. }
  539. l.size -= int64(n)
  540. return
  541. }
  542. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  543. size := int(l.size)
  544. bufferSize := len(sharedBytes)
  545. for size > 0 {
  546. tempBuffer := sharedBytes
  547. if size < bufferSize {
  548. tempBuffer = sharedBytes[0:size]
  549. }
  550. count, e := w.Write(tempBuffer)
  551. if e != nil {
  552. return int64(size), e
  553. }
  554. size -= count
  555. }
  556. return l.size, nil
  557. }
  // Readln returns the next line from r without its line terminator. Lines
  // longer than the reader's buffer arrive from ReadLine in fragments; these
  // are joined into one slice. The error is whatever ReadLine reported last
  // (io.EOF at the end of the input), returned together with the bytes
  // gathered up to that point.
  func Readln(r *bufio.Reader) ([]byte, error) {
  	var joined []byte
  	for {
  		fragment, more, err := r.ReadLine()
  		joined = append(joined, fragment...)
  		if err != nil || !more {
  			return joined, err
  		}
  	}
  }