You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

582 lines
16 KiB

4 years ago
6 years ago
6 years ago
10 years ago
5 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "io"
  8. "math"
  9. "math/rand"
  10. "os"
  11. "runtime"
  12. "runtime/pprof"
  13. "sort"
  14. "sync"
  15. "time"
  16. "google.golang.org/grpc"
  17. "github.com/seaweedfs/seaweedfs/weed/glog"
  18. "github.com/seaweedfs/seaweedfs/weed/operation"
  19. "github.com/seaweedfs/seaweedfs/weed/security"
  20. "github.com/seaweedfs/seaweedfs/weed/util"
  21. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  22. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  23. )
  24. type BenchmarkOptions struct {
  25. masters *string
  26. concurrency *int
  27. numberOfFiles *int
  28. fileSize *int
  29. idListFile *string
  30. write *bool
  31. deletePercentage *int
  32. read *bool
  33. sequentialRead *bool
  34. collection *string
  35. replication *string
  36. diskType *string
  37. cpuprofile *string
  38. maxCpu *int
  39. grpcDialOption grpc.DialOption
  40. masterClient *wdclient.MasterClient
  41. fsync *bool
  42. }
  43. var (
  44. b BenchmarkOptions
  45. sharedBytes []byte
  46. isSecure bool
  47. )
  48. func init() {
  49. cmdBenchmark.Run = runBenchmark // break init cycle
  50. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  51. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  52. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  53. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  54. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  55. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  56. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  57. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  58. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  59. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  60. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  61. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  62. b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  63. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  64. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  65. b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
  66. sharedBytes = make([]byte, 1024)
  67. }
  68. var cmdBenchmark = &Command{
  69. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  70. Short: "benchmark by writing millions of files and reading them out",
  71. Long: `benchmark on an empty SeaweedFS file system.
  72. Two tests during benchmark:
  73. 1) write lots of small files to the system
  74. 2) read the files out
  75. The file content is mostly zeros, but no compression is done.
  76. You can choose to only benchmark read or write.
  77. During write, the list of uploaded file ids is stored in "-list" specified file.
  78. You can also use your own list of file ids to run read test.
  79. Write speed and read speed will be collected.
  80. The numbers are used to get a sense of the system.
  81. Usually your network or the hard drive is the real bottleneck.
  82. Another thing to watch is whether the volumes are evenly distributed
  83. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  84. to servers with free slots, it's highly possible some servers have uneven amount of
  85. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  86. before starting the benchmark command:
  87. http://localhost:9333/vol/grow?collection=benchmark&count=5
  88. After benchmarking, you can clean up the written data by deleting the benchmark collection
  89. http://localhost:9333/col/delete?collection=benchmark
  90. `,
  91. }
  92. var (
  93. wait sync.WaitGroup
  94. writeStats *stats
  95. readStats *stats
  96. )
  97. func runBenchmark(cmd *Command, args []string) bool {
  98. util.LoadSecurityConfiguration()
  99. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  100. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
  101. if *b.maxCpu < 1 {
  102. *b.maxCpu = runtime.NumCPU()
  103. }
  104. runtime.GOMAXPROCS(*b.maxCpu)
  105. if *b.cpuprofile != "" {
  106. f, err := os.Create(*b.cpuprofile)
  107. if err != nil {
  108. glog.Fatal(err)
  109. }
  110. pprof.StartCPUProfile(f)
  111. defer pprof.StopCPUProfile()
  112. }
  113. b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
  114. ctx := context.Background()
  115. go b.masterClient.KeepConnectedToMaster(ctx)
  116. b.masterClient.WaitUntilConnected(ctx)
  117. if *b.write {
  118. benchWrite()
  119. }
  120. if *b.read {
  121. benchRead()
  122. }
  123. return true
  124. }
  125. func benchWrite() {
  126. fileIdLineChan := make(chan string)
  127. finishChan := make(chan bool)
  128. writeStats = newStats(*b.concurrency)
  129. idChan := make(chan int)
  130. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  131. for i := 0; i < *b.concurrency; i++ {
  132. wait.Add(1)
  133. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  134. }
  135. writeStats.start = time.Now()
  136. writeStats.total = *b.numberOfFiles
  137. go writeStats.checkProgress("Writing Benchmark", finishChan)
  138. for i := 0; i < *b.numberOfFiles; i++ {
  139. idChan <- i
  140. }
  141. close(idChan)
  142. wait.Wait()
  143. writeStats.end = time.Now()
  144. wait.Add(2)
  145. finishChan <- true
  146. finishChan <- true
  147. wait.Wait()
  148. close(finishChan)
  149. writeStats.printStats()
  150. }
  151. func benchRead() {
  152. fileIdLineChan := make(chan string)
  153. finishChan := make(chan bool)
  154. readStats = newStats(*b.concurrency)
  155. go readFileIds(*b.idListFile, fileIdLineChan)
  156. readStats.start = time.Now()
  157. readStats.total = *b.numberOfFiles
  158. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  159. for i := 0; i < *b.concurrency; i++ {
  160. wait.Add(1)
  161. go readFiles(fileIdLineChan, &readStats.localStats[i])
  162. }
  163. wait.Wait()
  164. wait.Add(1)
  165. finishChan <- true
  166. wait.Wait()
  167. close(finishChan)
  168. readStats.end = time.Now()
  169. readStats.printStats()
  170. }
  171. type delayedFile struct {
  172. enterTime time.Time
  173. fp *operation.FilePart
  174. }
  175. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  176. defer wait.Done()
  177. delayedDeleteChan := make(chan *delayedFile, 100)
  178. var waitForDeletions sync.WaitGroup
  179. for i := 0; i < 7; i++ {
  180. waitForDeletions.Add(1)
  181. go func() {
  182. defer waitForDeletions.Done()
  183. for df := range delayedDeleteChan {
  184. if df.enterTime.After(time.Now()) {
  185. time.Sleep(df.enterTime.Sub(time.Now()))
  186. }
  187. var jwtAuthorization security.EncodedJwt
  188. if isSecure {
  189. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
  190. }
  191. if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  192. s.completed++
  193. } else {
  194. s.failed++
  195. }
  196. }
  197. }()
  198. }
  199. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  200. for id := range idChan {
  201. start := time.Now()
  202. fileSize := int64(*b.fileSize + random.Intn(64))
  203. fp := &operation.FilePart{
  204. Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
  205. FileSize: fileSize,
  206. MimeType: "image/bench", // prevent gzip benchmark content
  207. Fsync: *b.fsync,
  208. }
  209. ar := &operation.VolumeAssignRequest{
  210. Count: 1,
  211. Collection: *b.collection,
  212. Replication: *b.replication,
  213. DiskType: *b.diskType,
  214. }
  215. if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
  216. fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
  217. if !isSecure && assignResult.Auth != "" {
  218. isSecure = true
  219. }
  220. if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
  221. if random.Intn(100) < *b.deletePercentage {
  222. s.total++
  223. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  224. } else {
  225. fileIdLineChan <- fp.Fid
  226. }
  227. s.completed++
  228. s.transferred += fileSize
  229. } else {
  230. s.failed++
  231. fmt.Printf("Failed to write with error:%v\n", err)
  232. }
  233. writeStats.addSample(time.Now().Sub(start))
  234. if *cmdBenchmark.IsDebug {
  235. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  236. }
  237. } else {
  238. s.failed++
  239. println("writing file error:", err.Error())
  240. }
  241. }
  242. close(delayedDeleteChan)
  243. waitForDeletions.Wait()
  244. }
  245. func readFiles(fileIdLineChan chan string, s *stat) {
  246. defer wait.Done()
  247. for fid := range fileIdLineChan {
  248. if len(fid) == 0 {
  249. continue
  250. }
  251. if fid[0] == '#' {
  252. continue
  253. }
  254. if *cmdBenchmark.IsDebug {
  255. fmt.Printf("reading file %s\n", fid)
  256. }
  257. start := time.Now()
  258. var bytesRead int
  259. var err error
  260. urls, err := b.masterClient.LookupFileId(fid)
  261. if err != nil {
  262. s.failed++
  263. println("!!!! ", fid, " location not found!!!!!")
  264. continue
  265. }
  266. var bytes []byte
  267. for _, url := range urls {
  268. bytes, _, err = util_http.Get(url)
  269. if err == nil {
  270. break
  271. }
  272. }
  273. bytesRead = len(bytes)
  274. if err == nil {
  275. s.completed++
  276. s.transferred += int64(bytesRead)
  277. readStats.addSample(time.Now().Sub(start))
  278. } else {
  279. s.failed++
  280. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  281. }
  282. }
  283. }
  284. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  285. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  286. if err != nil {
  287. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  288. }
  289. defer file.Close()
  290. for {
  291. select {
  292. case <-finishChan:
  293. wait.Done()
  294. return
  295. case line := <-fileIdLineChan:
  296. file.Write([]byte(line))
  297. file.Write([]byte("\n"))
  298. }
  299. }
  300. }
  301. func readFileIds(fileName string, fileIdLineChan chan string) {
  302. file, err := os.Open(fileName) // For read access.
  303. if err != nil {
  304. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  305. }
  306. defer file.Close()
  307. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  308. r := bufio.NewReader(file)
  309. if *b.sequentialRead {
  310. for {
  311. if line, err := Readln(r); err == nil {
  312. fileIdLineChan <- string(line)
  313. } else {
  314. break
  315. }
  316. }
  317. } else {
  318. lines := make([]string, 0, readStats.total)
  319. for {
  320. if line, err := Readln(r); err == nil {
  321. lines = append(lines, string(line))
  322. } else {
  323. break
  324. }
  325. }
  326. if len(lines) > 0 {
  327. for i := 0; i < readStats.total; i++ {
  328. fileIdLineChan <- lines[random.Intn(len(lines))]
  329. }
  330. }
  331. }
  332. close(fileIdLineChan)
  333. }
  334. const (
  335. benchResolution = 10000 // 0.1 microsecond
  336. benchBucket = 1000000000 / benchResolution
  337. )
  338. // An efficient statics collecting and rendering
  339. type stats struct {
  340. data []int
  341. overflow []int
  342. localStats []stat
  343. start time.Time
  344. end time.Time
  345. total int
  346. }
  347. type stat struct {
  348. completed int
  349. failed int
  350. total int
  351. transferred int64
  352. }
  353. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  354. func newStats(n int) *stats {
  355. return &stats{
  356. data: make([]int, benchResolution),
  357. overflow: make([]int, 0),
  358. localStats: make([]stat, n),
  359. }
  360. }
  361. func (s *stats) addSample(d time.Duration) {
  362. index := int(d / benchBucket)
  363. if index < 0 {
  364. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  365. } else if index < len(s.data) {
  366. s.data[int(d/benchBucket)]++
  367. } else {
  368. s.overflow = append(s.overflow, index)
  369. }
  370. }
  371. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  372. fmt.Printf("\n------------ %s ----------\n", testName)
  373. ticker := time.Tick(time.Second)
  374. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  375. for {
  376. select {
  377. case <-finishChan:
  378. wait.Done()
  379. return
  380. case t := <-ticker:
  381. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  382. for _, localStat := range s.localStats {
  383. completed += localStat.completed
  384. transferred += localStat.transferred
  385. total += localStat.total
  386. }
  387. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  388. completed, total, float64(completed)*100/float64(total),
  389. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  390. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  391. )
  392. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  393. }
  394. }
  395. }
  396. func (s *stats) printStats() {
  397. completed, failed, transferred, total := 0, 0, int64(0), s.total
  398. for _, localStat := range s.localStats {
  399. completed += localStat.completed
  400. failed += localStat.failed
  401. transferred += localStat.transferred
  402. total += localStat.total
  403. }
  404. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  405. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  406. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  407. fmt.Printf("Completed requests: %d\n", completed)
  408. fmt.Printf("Failed requests: %d\n", failed)
  409. fmt.Printf("Total transferred: %d bytes\n", transferred)
  410. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  411. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  412. n, sum := 0, 0
  413. min, max := 10000000, 0
  414. for i := 0; i < len(s.data); i++ {
  415. n += s.data[i]
  416. sum += s.data[i] * i
  417. if s.data[i] > 0 {
  418. if min > i {
  419. min = i
  420. }
  421. if max < i {
  422. max = i
  423. }
  424. }
  425. }
  426. n += len(s.overflow)
  427. for i := 0; i < len(s.overflow); i++ {
  428. sum += s.overflow[i]
  429. if min > s.overflow[i] {
  430. min = s.overflow[i]
  431. }
  432. if max < s.overflow[i] {
  433. max = s.overflow[i]
  434. }
  435. }
  436. avg := float64(sum) / float64(n)
  437. varianceSum := 0.0
  438. for i := 0; i < len(s.data); i++ {
  439. if s.data[i] > 0 {
  440. d := float64(i) - avg
  441. varianceSum += d * d * float64(s.data[i])
  442. }
  443. }
  444. for i := 0; i < len(s.overflow); i++ {
  445. d := float64(s.overflow[i]) - avg
  446. varianceSum += d * d
  447. }
  448. std := math.Sqrt(varianceSum / float64(n))
  449. fmt.Printf("\nConnection Times (ms)\n")
  450. fmt.Printf(" min avg max std\n")
  451. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  452. // printing percentiles
  453. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  454. percentiles := make([]int, len(percentages))
  455. for i := 0; i < len(percentages); i++ {
  456. percentiles[i] = n * percentages[i] / 100
  457. }
  458. percentiles[len(percentiles)-1] = n
  459. percentileIndex := 0
  460. currentSum := 0
  461. for i := 0; i < len(s.data); i++ {
  462. currentSum += s.data[i]
  463. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  464. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  465. percentileIndex++
  466. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  467. percentileIndex++
  468. }
  469. }
  470. }
  471. sort.Ints(s.overflow)
  472. for i := 0; i < len(s.overflow); i++ {
  473. currentSum++
  474. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  475. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  476. percentileIndex++
  477. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  478. percentileIndex++
  479. }
  480. }
  481. }
  482. }
  483. // a fake reader to generate content to upload
  484. type FakeReader struct {
  485. id uint64 // an id number
  486. size int64 // max bytes
  487. random *rand.Rand
  488. }
  489. func (l *FakeReader) Read(p []byte) (n int, err error) {
  490. if l.size <= 0 {
  491. return 0, io.EOF
  492. }
  493. if int64(len(p)) > l.size {
  494. n = int(l.size)
  495. } else {
  496. n = len(p)
  497. }
  498. if n >= 8 {
  499. for i := 0; i < 8; i++ {
  500. p[i] = byte(l.id >> uint(i*8))
  501. }
  502. l.random.Read(p[8:])
  503. }
  504. l.size -= int64(n)
  505. return
  506. }
  507. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  508. size := int(l.size)
  509. bufferSize := len(sharedBytes)
  510. for size > 0 {
  511. tempBuffer := sharedBytes
  512. if size < bufferSize {
  513. tempBuffer = sharedBytes[0:size]
  514. }
  515. count, e := w.Write(tempBuffer)
  516. if e != nil {
  517. return int64(size), e
  518. }
  519. size -= count
  520. }
  521. return l.size, nil
  522. }
  523. func Readln(r *bufio.Reader) ([]byte, error) {
  524. var (
  525. isPrefix = true
  526. err error
  527. line, ln []byte
  528. )
  529. for isPrefix && err == nil {
  530. line, isPrefix, err = r.ReadLine()
  531. ln = append(ln, line...)
  532. }
  533. return ln, err
  534. }