You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

581 lines
16 KiB

4 years ago
6 years ago
6 years ago
10 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "io"
  8. "math"
  9. "math/rand"
  10. "os"
  11. "runtime"
  12. "runtime/pprof"
  13. "sort"
  14. "sync"
  15. "time"
  16. "google.golang.org/grpc"
  17. "github.com/seaweedfs/seaweedfs/weed/glog"
  18. "github.com/seaweedfs/seaweedfs/weed/operation"
  19. "github.com/seaweedfs/seaweedfs/weed/security"
  20. "github.com/seaweedfs/seaweedfs/weed/util"
  21. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  22. )
  23. type BenchmarkOptions struct {
  24. masters *string
  25. concurrency *int
  26. numberOfFiles *int
  27. fileSize *int
  28. idListFile *string
  29. write *bool
  30. deletePercentage *int
  31. read *bool
  32. sequentialRead *bool
  33. collection *string
  34. replication *string
  35. diskType *string
  36. cpuprofile *string
  37. maxCpu *int
  38. grpcDialOption grpc.DialOption
  39. masterClient *wdclient.MasterClient
  40. fsync *bool
  41. }
  42. var (
  43. b BenchmarkOptions
  44. sharedBytes []byte
  45. isSecure bool
  46. )
  47. func init() {
  48. cmdBenchmark.Run = runBenchmark // break init cycle
  49. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  50. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  51. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  52. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  53. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  54. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  55. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  56. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  57. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  58. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  59. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  60. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  61. b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  62. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  63. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  64. b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
  65. sharedBytes = make([]byte, 1024)
  66. }
  // cmdBenchmark describes the "benchmark" sub-command: its usage line, a
  // one-line summary and the long help text. Its Run and IsDebug fields and
  // its flags are set up in init().
  67. var cmdBenchmark = &Command{
  68. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  69. Short: "benchmark by writing millions of files and reading them out",
  70. Long: `benchmark on an empty SeaweedFS file system.
  71. Two tests during benchmark:
  72. 1) write lots of small files to the system
  73. 2) read the files out
  74. The file content is mostly zeros, but no compression is done.
  75. You can choose to only benchmark read or write.
  76. During write, the list of uploaded file ids is stored in "-list" specified file.
  77. You can also use your own list of file ids to run read test.
  78. Write speed and read speed will be collected.
  79. The numbers are used to get a sense of the system.
  80. Usually your network or the hard drive is the real bottleneck.
  81. Another thing to watch is whether the volumes are evenly distributed
  82. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  83. to servers with free slots, it's highly possible some servers have uneven amount of
  84. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  85. before starting the benchmark command:
  86. http://localhost:9333/vol/grow?collection=benchmark&count=5
  87. After benchmarking, you can clean up the written data by deleting the benchmark collection
  88. http://localhost:9333/col/delete?collection=benchmark
  89. `,
  90. }
  91. var (
  92. wait sync.WaitGroup
  93. writeStats *stats
  94. readStats *stats
  95. )
  96. func runBenchmark(cmd *Command, args []string) bool {
  97. util.LoadConfiguration("security", false)
  98. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  99. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
  100. if *b.maxCpu < 1 {
  101. *b.maxCpu = runtime.NumCPU()
  102. }
  103. runtime.GOMAXPROCS(*b.maxCpu)
  104. if *b.cpuprofile != "" {
  105. f, err := os.Create(*b.cpuprofile)
  106. if err != nil {
  107. glog.Fatal(err)
  108. }
  109. pprof.StartCPUProfile(f)
  110. defer pprof.StopCPUProfile()
  111. }
  112. b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
  113. ctx := context.Background()
  114. go b.masterClient.KeepConnectedToMaster(ctx)
  115. b.masterClient.WaitUntilConnected(ctx)
  116. if *b.write {
  117. benchWrite()
  118. }
  119. if *b.read {
  120. benchRead()
  121. }
  122. return true
  123. }
  124. func benchWrite() {
  125. fileIdLineChan := make(chan string)
  126. finishChan := make(chan bool)
  127. writeStats = newStats(*b.concurrency)
  128. idChan := make(chan int)
  129. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  130. for i := 0; i < *b.concurrency; i++ {
  131. wait.Add(1)
  132. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  133. }
  134. writeStats.start = time.Now()
  135. writeStats.total = *b.numberOfFiles
  136. go writeStats.checkProgress("Writing Benchmark", finishChan)
  137. for i := 0; i < *b.numberOfFiles; i++ {
  138. idChan <- i
  139. }
  140. close(idChan)
  141. wait.Wait()
  142. writeStats.end = time.Now()
  143. wait.Add(2)
  144. finishChan <- true
  145. finishChan <- true
  146. wait.Wait()
  147. close(finishChan)
  148. writeStats.printStats()
  149. }
  150. func benchRead() {
  151. fileIdLineChan := make(chan string)
  152. finishChan := make(chan bool)
  153. readStats = newStats(*b.concurrency)
  154. go readFileIds(*b.idListFile, fileIdLineChan)
  155. readStats.start = time.Now()
  156. readStats.total = *b.numberOfFiles
  157. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  158. for i := 0; i < *b.concurrency; i++ {
  159. wait.Add(1)
  160. go readFiles(fileIdLineChan, &readStats.localStats[i])
  161. }
  162. wait.Wait()
  163. wait.Add(1)
  164. finishChan <- true
  165. wait.Wait()
  166. close(finishChan)
  167. readStats.end = time.Now()
  168. readStats.printStats()
  169. }
  170. type delayedFile struct {
  171. enterTime time.Time
  172. fp *operation.FilePart
  173. }
  174. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  175. defer wait.Done()
  176. delayedDeleteChan := make(chan *delayedFile, 100)
  177. var waitForDeletions sync.WaitGroup
  178. for i := 0; i < 7; i++ {
  179. waitForDeletions.Add(1)
  180. go func() {
  181. defer waitForDeletions.Done()
  182. for df := range delayedDeleteChan {
  183. if df.enterTime.After(time.Now()) {
  184. time.Sleep(df.enterTime.Sub(time.Now()))
  185. }
  186. var jwtAuthorization security.EncodedJwt
  187. if isSecure {
  188. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
  189. }
  190. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  191. s.completed++
  192. } else {
  193. s.failed++
  194. }
  195. }
  196. }()
  197. }
  198. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  199. for id := range idChan {
  200. start := time.Now()
  201. fileSize := int64(*b.fileSize + random.Intn(64))
  202. fp := &operation.FilePart{
  203. Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
  204. FileSize: fileSize,
  205. MimeType: "image/bench", // prevent gzip benchmark content
  206. Fsync: *b.fsync,
  207. }
  208. ar := &operation.VolumeAssignRequest{
  209. Count: 1,
  210. Collection: *b.collection,
  211. Replication: *b.replication,
  212. DiskType: *b.diskType,
  213. }
  214. if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
  215. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  216. if !isSecure && assignResult.Auth != "" {
  217. isSecure = true
  218. }
  219. if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
  220. if random.Intn(100) < *b.deletePercentage {
  221. s.total++
  222. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  223. } else {
  224. fileIdLineChan <- fp.Fid
  225. }
  226. s.completed++
  227. s.transferred += fileSize
  228. } else {
  229. s.failed++
  230. fmt.Printf("Failed to write with error:%v\n", err)
  231. }
  232. writeStats.addSample(time.Now().Sub(start))
  233. if *cmdBenchmark.IsDebug {
  234. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  235. }
  236. } else {
  237. s.failed++
  238. println("writing file error:", err.Error())
  239. }
  240. }
  241. close(delayedDeleteChan)
  242. waitForDeletions.Wait()
  243. }
  244. func readFiles(fileIdLineChan chan string, s *stat) {
  245. defer wait.Done()
  246. for fid := range fileIdLineChan {
  247. if len(fid) == 0 {
  248. continue
  249. }
  250. if fid[0] == '#' {
  251. continue
  252. }
  253. if *cmdBenchmark.IsDebug {
  254. fmt.Printf("reading file %s\n", fid)
  255. }
  256. start := time.Now()
  257. var bytesRead int
  258. var err error
  259. urls, err := b.masterClient.LookupFileId(fid)
  260. if err != nil {
  261. s.failed++
  262. println("!!!! ", fid, " location not found!!!!!")
  263. continue
  264. }
  265. var bytes []byte
  266. for _, url := range urls {
  267. bytes, _, err = util.Get(url)
  268. if err == nil {
  269. break
  270. }
  271. }
  272. bytesRead = len(bytes)
  273. if err == nil {
  274. s.completed++
  275. s.transferred += int64(bytesRead)
  276. readStats.addSample(time.Now().Sub(start))
  277. } else {
  278. s.failed++
  279. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  280. }
  281. }
  282. }
  283. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  284. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  285. if err != nil {
  286. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  287. }
  288. defer file.Close()
  289. for {
  290. select {
  291. case <-finishChan:
  292. wait.Done()
  293. return
  294. case line := <-fileIdLineChan:
  295. file.Write([]byte(line))
  296. file.Write([]byte("\n"))
  297. }
  298. }
  299. }
  300. func readFileIds(fileName string, fileIdLineChan chan string) {
  301. file, err := os.Open(fileName) // For read access.
  302. if err != nil {
  303. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  304. }
  305. defer file.Close()
  306. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  307. r := bufio.NewReader(file)
  308. if *b.sequentialRead {
  309. for {
  310. if line, err := Readln(r); err == nil {
  311. fileIdLineChan <- string(line)
  312. } else {
  313. break
  314. }
  315. }
  316. } else {
  317. lines := make([]string, 0, readStats.total)
  318. for {
  319. if line, err := Readln(r); err == nil {
  320. lines = append(lines, string(line))
  321. } else {
  322. break
  323. }
  324. }
  325. if len(lines) > 0 {
  326. for i := 0; i < readStats.total; i++ {
  327. fileIdLineChan <- lines[random.Intn(len(lines))]
  328. }
  329. }
  330. }
  331. close(fileIdLineChan)
  332. }
  const (
  	// benchResolution is the number of histogram buckets kept in stats.data.
  	benchResolution = 10000
  	// benchBucket is the width of one bucket in nanoseconds: 100,000 ns,
  	// i.e. 0.1 ms, so the histogram spans one second in total.
  	benchBucket = 1000000000 / benchResolution
  )

  // stats collects request latencies into a fixed-size histogram and carries
  // the per-worker counters of one benchmark phase.
  type stats struct {
  	// mu guards data and overflow, which addSample mutates from several
  	// worker goroutines at once.
  	mu sync.Mutex
  	// data[i] counts the samples that fell into bucket i.
  	data []int
  	// overflow holds the bucket index of every sample beyond the histogram.
  	overflow []int
  	// localStats holds one counter set per worker.
  	localStats []stat
  	start      time.Time
  	end        time.Time
  	total      int
  }

  // stat holds the request counters of a single worker.
  type stat struct {
  	completed   int
  	failed      int
  	total       int
  	transferred int64
  }

  // percentages lists the percentiles reported by printStats.
  var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}

  // newStats returns an empty stats value with n per-worker counter sets.
  func newStats(n int) *stats {
  	return &stats{
  		data:       make([]int, benchResolution),
  		overflow:   make([]int, 0),
  		localStats: make([]stat, n),
  	}
  }

  // addSample records one request latency. Durations are bucketed in
  // benchBucket steps; samples beyond the histogram are kept individually in
  // overflow, and negative durations are reported and dropped. It is safe for
  // concurrent use.
  func (s *stats) addSample(d time.Duration) {
  	index := int(d / benchBucket)
  	if index < 0 {
  		fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  		return
  	}

  	s.mu.Lock()
  	defer s.mu.Unlock()
  	if index < len(s.data) {
  		s.data[index]++
  	} else {
  		s.overflow = append(s.overflow, index)
  	}
  }
  370. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  371. fmt.Printf("\n------------ %s ----------\n", testName)
  372. ticker := time.Tick(time.Second)
  373. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  374. for {
  375. select {
  376. case <-finishChan:
  377. wait.Done()
  378. return
  379. case t := <-ticker:
  380. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  381. for _, localStat := range s.localStats {
  382. completed += localStat.completed
  383. transferred += localStat.transferred
  384. total += localStat.total
  385. }
  386. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  387. completed, total, float64(completed)*100/float64(total),
  388. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  389. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  390. )
  391. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  392. }
  393. }
  394. }
  395. func (s *stats) printStats() {
  396. completed, failed, transferred, total := 0, 0, int64(0), s.total
  397. for _, localStat := range s.localStats {
  398. completed += localStat.completed
  399. failed += localStat.failed
  400. transferred += localStat.transferred
  401. total += localStat.total
  402. }
  403. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  404. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  405. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  406. fmt.Printf("Completed requests: %d\n", completed)
  407. fmt.Printf("Failed requests: %d\n", failed)
  408. fmt.Printf("Total transferred: %d bytes\n", transferred)
  409. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  410. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  411. n, sum := 0, 0
  412. min, max := 10000000, 0
  413. for i := 0; i < len(s.data); i++ {
  414. n += s.data[i]
  415. sum += s.data[i] * i
  416. if s.data[i] > 0 {
  417. if min > i {
  418. min = i
  419. }
  420. if max < i {
  421. max = i
  422. }
  423. }
  424. }
  425. n += len(s.overflow)
  426. for i := 0; i < len(s.overflow); i++ {
  427. sum += s.overflow[i]
  428. if min > s.overflow[i] {
  429. min = s.overflow[i]
  430. }
  431. if max < s.overflow[i] {
  432. max = s.overflow[i]
  433. }
  434. }
  435. avg := float64(sum) / float64(n)
  436. varianceSum := 0.0
  437. for i := 0; i < len(s.data); i++ {
  438. if s.data[i] > 0 {
  439. d := float64(i) - avg
  440. varianceSum += d * d * float64(s.data[i])
  441. }
  442. }
  443. for i := 0; i < len(s.overflow); i++ {
  444. d := float64(s.overflow[i]) - avg
  445. varianceSum += d * d
  446. }
  447. std := math.Sqrt(varianceSum / float64(n))
  448. fmt.Printf("\nConnection Times (ms)\n")
  449. fmt.Printf(" min avg max std\n")
  450. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  451. // printing percentiles
  452. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  453. percentiles := make([]int, len(percentages))
  454. for i := 0; i < len(percentages); i++ {
  455. percentiles[i] = n * percentages[i] / 100
  456. }
  457. percentiles[len(percentiles)-1] = n
  458. percentileIndex := 0
  459. currentSum := 0
  460. for i := 0; i < len(s.data); i++ {
  461. currentSum += s.data[i]
  462. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  463. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  464. percentileIndex++
  465. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  466. percentileIndex++
  467. }
  468. }
  469. }
  470. sort.Ints(s.overflow)
  471. for i := 0; i < len(s.overflow); i++ {
  472. currentSum++
  473. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  474. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  475. percentileIndex++
  476. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  477. percentileIndex++
  478. }
  479. }
  480. }
  481. }
  482. // a fake reader to generate content to upload
  483. type FakeReader struct {
  484. id uint64 // an id number
  485. size int64 // max bytes
  486. random *rand.Rand
  487. }
  488. func (l *FakeReader) Read(p []byte) (n int, err error) {
  489. if l.size <= 0 {
  490. return 0, io.EOF
  491. }
  492. if int64(len(p)) > l.size {
  493. n = int(l.size)
  494. } else {
  495. n = len(p)
  496. }
  497. if n >= 8 {
  498. for i := 0; i < 8; i++ {
  499. p[i] = byte(l.id >> uint(i*8))
  500. }
  501. l.random.Read(p[8:])
  502. }
  503. l.size -= int64(n)
  504. return
  505. }
  506. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  507. size := int(l.size)
  508. bufferSize := len(sharedBytes)
  509. for size > 0 {
  510. tempBuffer := sharedBytes
  511. if size < bufferSize {
  512. tempBuffer = sharedBytes[0:size]
  513. }
  514. count, e := w.Write(tempBuffer)
  515. if e != nil {
  516. return int64(size), e
  517. }
  518. size -= count
  519. }
  520. return l.size, nil
  521. }
  // Readln reads one complete line from r, without the line terminator,
  // joining the pieces bufio.Reader.ReadLine returns when a line is longer
  // than the reader's buffer. It returns the bytes gathered so far together
  // with the first error encountered.
  func Readln(r *bufio.Reader) ([]byte, error) {
  	var joined []byte
  	for {
  		chunk, more, err := r.ReadLine()
  		joined = append(joined, chunk...)
  		if err != nil || !more {
  			return joined, err
  		}
  	}
  }