You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

562 lines
15 KiB

6 years ago
6 years ago
6 years ago
10 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "os"
  10. "runtime"
  11. "runtime/pprof"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. "google.golang.org/grpc"
  17. "github.com/chrislusf/seaweedfs/weed/glog"
  18. "github.com/chrislusf/seaweedfs/weed/operation"
  19. "github.com/chrislusf/seaweedfs/weed/security"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. "github.com/chrislusf/seaweedfs/weed/wdclient"
  22. )
  23. type BenchmarkOptions struct {
  24. masters *string
  25. concurrency *int
  26. numberOfFiles *int
  27. fileSize *int
  28. idListFile *string
  29. write *bool
  30. deletePercentage *int
  31. read *bool
  32. sequentialRead *bool
  33. collection *string
  34. replication *string
  35. cpuprofile *string
  36. maxCpu *int
  37. grpcDialOption grpc.DialOption
  38. masterClient *wdclient.MasterClient
  39. }
  40. var (
  41. b BenchmarkOptions
  42. sharedBytes []byte
  43. isSecure bool
  44. )
  45. func init() {
  46. cmdBenchmark.Run = runBenchmark // break init cycle
  47. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  48. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  49. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  50. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  51. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  52. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  53. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  54. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  55. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  56. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  57. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  58. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  59. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  60. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  61. sharedBytes = make([]byte, 1024)
  62. }
  63. var cmdBenchmark = &Command{
  64. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  65. Short: "benchmark on writing millions of files and read out",
  66. Long: `benchmark on an empty SeaweedFS file system.
  67. Two tests during benchmark:
  68. 1) write lots of small files to the system
  69. 2) read the files out
  70. The file content is mostly zero, but no compression is done.
  71. You can choose to only benchmark read or write.
  72. During write, the list of uploaded file ids is stored in "-list" specified file.
  73. You can also use your own list of file ids to run read test.
  74. Write speed and read speed will be collected.
  75. The numbers are used to get a sense of the system.
  76. Usually your network or the hard drive is the real bottleneck.
  77. Another thing to watch is whether the volumes are evenly distributed
  78. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  79. to servers with free slots, it's highly possible some servers have uneven amount of
  80. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  81. before starting the benchmark command:
  82. http://localhost:9333/vol/grow?collection=benchmark&count=5
  83. After benchmarking, you can clean up the written data by deleting the benchmark collection
  84. http://localhost:9333/col/delete?collection=benchmark
  85. `,
  86. }
  87. var (
  88. wait sync.WaitGroup
  89. writeStats *stats
  90. readStats *stats
  91. )
  92. func runBenchmark(cmd *Command, args []string) bool {
  93. util.LoadConfiguration("security", false)
  94. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  95. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  96. if *b.maxCpu < 1 {
  97. *b.maxCpu = runtime.NumCPU()
  98. }
  99. runtime.GOMAXPROCS(*b.maxCpu)
  100. if *b.cpuprofile != "" {
  101. f, err := os.Create(*b.cpuprofile)
  102. if err != nil {
  103. glog.Fatal(err)
  104. }
  105. pprof.StartCPUProfile(f)
  106. defer pprof.StopCPUProfile()
  107. }
  108. b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
  109. go b.masterClient.KeepConnectedToMaster()
  110. b.masterClient.WaitUntilConnected()
  111. if *b.write {
  112. benchWrite()
  113. }
  114. if *b.read {
  115. benchRead()
  116. }
  117. return true
  118. }
  119. func benchWrite() {
  120. fileIdLineChan := make(chan string)
  121. finishChan := make(chan bool)
  122. writeStats = newStats(*b.concurrency)
  123. idChan := make(chan int)
  124. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  125. for i := 0; i < *b.concurrency; i++ {
  126. wait.Add(1)
  127. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  128. }
  129. writeStats.start = time.Now()
  130. writeStats.total = *b.numberOfFiles
  131. go writeStats.checkProgress("Writing Benchmark", finishChan)
  132. for i := 0; i < *b.numberOfFiles; i++ {
  133. idChan <- i
  134. }
  135. close(idChan)
  136. wait.Wait()
  137. writeStats.end = time.Now()
  138. wait.Add(2)
  139. finishChan <- true
  140. finishChan <- true
  141. wait.Wait()
  142. close(finishChan)
  143. writeStats.printStats()
  144. }
  145. func benchRead() {
  146. fileIdLineChan := make(chan string)
  147. finishChan := make(chan bool)
  148. readStats = newStats(*b.concurrency)
  149. go readFileIds(*b.idListFile, fileIdLineChan)
  150. readStats.start = time.Now()
  151. readStats.total = *b.numberOfFiles
  152. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  153. for i := 0; i < *b.concurrency; i++ {
  154. wait.Add(1)
  155. go readFiles(fileIdLineChan, &readStats.localStats[i])
  156. }
  157. wait.Wait()
  158. wait.Add(1)
  159. finishChan <- true
  160. wait.Wait()
  161. close(finishChan)
  162. readStats.end = time.Now()
  163. readStats.printStats()
  164. }
  165. type delayedFile struct {
  166. enterTime time.Time
  167. fp *operation.FilePart
  168. }
  169. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  170. defer wait.Done()
  171. delayedDeleteChan := make(chan *delayedFile, 100)
  172. var waitForDeletions sync.WaitGroup
  173. for i := 0; i < 7; i++ {
  174. waitForDeletions.Add(1)
  175. go func() {
  176. defer waitForDeletions.Done()
  177. for df := range delayedDeleteChan {
  178. if df.enterTime.After(time.Now()) {
  179. time.Sleep(df.enterTime.Sub(time.Now()))
  180. }
  181. var jwtAuthorization security.EncodedJwt
  182. if isSecure {
  183. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
  184. }
  185. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  186. s.completed++
  187. } else {
  188. s.failed++
  189. }
  190. }
  191. }()
  192. }
  193. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  194. for id := range idChan {
  195. start := time.Now()
  196. fileSize := int64(*b.fileSize + random.Intn(64))
  197. fp := &operation.FilePart{
  198. Reader: &FakeReader{id: uint64(id), size: fileSize},
  199. FileSize: fileSize,
  200. MimeType: "image/bench", // prevent gzip benchmark content
  201. }
  202. ar := &operation.VolumeAssignRequest{
  203. Count: 1,
  204. Collection: *b.collection,
  205. Replication: *b.replication,
  206. }
  207. if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
  208. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  209. if !isSecure && assignResult.Auth != "" {
  210. isSecure = true
  211. }
  212. if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
  213. if random.Intn(100) < *b.deletePercentage {
  214. s.total++
  215. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  216. } else {
  217. fileIdLineChan <- fp.Fid
  218. }
  219. s.completed++
  220. s.transferred += fileSize
  221. } else {
  222. s.failed++
  223. fmt.Printf("Failed to write with error:%v\n", err)
  224. }
  225. writeStats.addSample(time.Now().Sub(start))
  226. if *cmdBenchmark.IsDebug {
  227. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  228. }
  229. } else {
  230. s.failed++
  231. println("writing file error:", err.Error())
  232. }
  233. }
  234. close(delayedDeleteChan)
  235. waitForDeletions.Wait()
  236. }
  237. func readFiles(fileIdLineChan chan string, s *stat) {
  238. defer wait.Done()
  239. for fid := range fileIdLineChan {
  240. if len(fid) == 0 {
  241. continue
  242. }
  243. if fid[0] == '#' {
  244. continue
  245. }
  246. if *cmdBenchmark.IsDebug {
  247. fmt.Printf("reading file %s\n", fid)
  248. }
  249. start := time.Now()
  250. url, err := b.masterClient.LookupFileId(fid)
  251. if err != nil {
  252. s.failed++
  253. println("!!!! ", fid, " location not found!!!!!")
  254. continue
  255. }
  256. if bytesRead, err := util.Get(url); err == nil {
  257. s.completed++
  258. s.transferred += int64(len(bytesRead))
  259. readStats.addSample(time.Now().Sub(start))
  260. } else {
  261. s.failed++
  262. fmt.Printf("Failed to read %s error:%v\n", url, err)
  263. }
  264. }
  265. }
  266. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  267. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  268. if err != nil {
  269. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  270. }
  271. defer file.Close()
  272. for {
  273. select {
  274. case <-finishChan:
  275. wait.Done()
  276. return
  277. case line := <-fileIdLineChan:
  278. file.Write([]byte(line))
  279. file.Write([]byte("\n"))
  280. }
  281. }
  282. }
  283. func readFileIds(fileName string, fileIdLineChan chan string) {
  284. file, err := os.Open(fileName) // For read access.
  285. if err != nil {
  286. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  287. }
  288. defer file.Close()
  289. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  290. r := bufio.NewReader(file)
  291. if *b.sequentialRead {
  292. for {
  293. if line, err := Readln(r); err == nil {
  294. fileIdLineChan <- string(line)
  295. } else {
  296. break
  297. }
  298. }
  299. } else {
  300. lines := make([]string, 0, readStats.total)
  301. for {
  302. if line, err := Readln(r); err == nil {
  303. lines = append(lines, string(line))
  304. } else {
  305. break
  306. }
  307. }
  308. if len(lines) > 0 {
  309. for i := 0; i < readStats.total; i++ {
  310. fileIdLineChan <- lines[random.Intn(len(lines))]
  311. }
  312. }
  313. }
  314. close(fileIdLineChan)
  315. }
  316. const (
  317. benchResolution = 10000 //0.1 microsecond
  318. benchBucket = 1000000000 / benchResolution
  319. )
  320. // An efficient statics collecting and rendering
  321. type stats struct {
  322. data []int
  323. overflow []int
  324. localStats []stat
  325. start time.Time
  326. end time.Time
  327. total int
  328. }
  329. type stat struct {
  330. completed int
  331. failed int
  332. total int
  333. transferred int64
  334. }
  335. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  336. func newStats(n int) *stats {
  337. return &stats{
  338. data: make([]int, benchResolution),
  339. overflow: make([]int, 0),
  340. localStats: make([]stat, n),
  341. }
  342. }
  343. func (s *stats) addSample(d time.Duration) {
  344. index := int(d / benchBucket)
  345. if index < 0 {
  346. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  347. } else if index < len(s.data) {
  348. s.data[int(d/benchBucket)]++
  349. } else {
  350. s.overflow = append(s.overflow, index)
  351. }
  352. }
  353. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  354. fmt.Printf("\n------------ %s ----------\n", testName)
  355. ticker := time.Tick(time.Second)
  356. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  357. for {
  358. select {
  359. case <-finishChan:
  360. wait.Done()
  361. return
  362. case t := <-ticker:
  363. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  364. for _, localStat := range s.localStats {
  365. completed += localStat.completed
  366. transferred += localStat.transferred
  367. total += localStat.total
  368. }
  369. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  370. completed, total, float64(completed)*100/float64(total),
  371. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  372. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  373. )
  374. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  375. }
  376. }
  377. }
  378. func (s *stats) printStats() {
  379. completed, failed, transferred, total := 0, 0, int64(0), s.total
  380. for _, localStat := range s.localStats {
  381. completed += localStat.completed
  382. failed += localStat.failed
  383. transferred += localStat.transferred
  384. total += localStat.total
  385. }
  386. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  387. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  388. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  389. fmt.Printf("Complete requests: %d\n", completed)
  390. fmt.Printf("Failed requests: %d\n", failed)
  391. fmt.Printf("Total transferred: %d bytes\n", transferred)
  392. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  393. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  394. n, sum := 0, 0
  395. min, max := 10000000, 0
  396. for i := 0; i < len(s.data); i++ {
  397. n += s.data[i]
  398. sum += s.data[i] * i
  399. if s.data[i] > 0 {
  400. if min > i {
  401. min = i
  402. }
  403. if max < i {
  404. max = i
  405. }
  406. }
  407. }
  408. n += len(s.overflow)
  409. for i := 0; i < len(s.overflow); i++ {
  410. sum += s.overflow[i]
  411. if min > s.overflow[i] {
  412. min = s.overflow[i]
  413. }
  414. if max < s.overflow[i] {
  415. max = s.overflow[i]
  416. }
  417. }
  418. avg := float64(sum) / float64(n)
  419. varianceSum := 0.0
  420. for i := 0; i < len(s.data); i++ {
  421. if s.data[i] > 0 {
  422. d := float64(i) - avg
  423. varianceSum += d * d * float64(s.data[i])
  424. }
  425. }
  426. for i := 0; i < len(s.overflow); i++ {
  427. d := float64(s.overflow[i]) - avg
  428. varianceSum += d * d
  429. }
  430. std := math.Sqrt(varianceSum / float64(n))
  431. fmt.Printf("\nConnection Times (ms)\n")
  432. fmt.Printf(" min avg max std\n")
  433. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  434. //printing percentiles
  435. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  436. percentiles := make([]int, len(percentages))
  437. for i := 0; i < len(percentages); i++ {
  438. percentiles[i] = n * percentages[i] / 100
  439. }
  440. percentiles[len(percentiles)-1] = n
  441. percentileIndex := 0
  442. currentSum := 0
  443. for i := 0; i < len(s.data); i++ {
  444. currentSum += s.data[i]
  445. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  446. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  447. percentileIndex++
  448. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  449. percentileIndex++
  450. }
  451. }
  452. }
  453. sort.Ints(s.overflow)
  454. for i := 0; i < len(s.overflow); i++ {
  455. currentSum++
  456. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  457. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  458. percentileIndex++
  459. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  460. percentileIndex++
  461. }
  462. }
  463. }
  464. }
  465. // a fake reader to generate content to upload
  466. type FakeReader struct {
  467. id uint64 // an id number
  468. size int64 // max bytes
  469. }
  470. func (l *FakeReader) Read(p []byte) (n int, err error) {
  471. if l.size <= 0 {
  472. return 0, io.EOF
  473. }
  474. if int64(len(p)) > l.size {
  475. n = int(l.size)
  476. } else {
  477. n = len(p)
  478. }
  479. if n >= 8 {
  480. for i := 0; i < 8; i++ {
  481. p[i] = byte(l.id >> uint(i*8))
  482. }
  483. }
  484. l.size -= int64(n)
  485. return
  486. }
  487. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  488. size := int(l.size)
  489. bufferSize := len(sharedBytes)
  490. for size > 0 {
  491. tempBuffer := sharedBytes
  492. if size < bufferSize {
  493. tempBuffer = sharedBytes[0:size]
  494. }
  495. count, e := w.Write(tempBuffer)
  496. if e != nil {
  497. return int64(size), e
  498. }
  499. size -= count
  500. }
  501. return l.size, nil
  502. }
  503. func Readln(r *bufio.Reader) ([]byte, error) {
  504. var (
  505. isPrefix = true
  506. err error
  507. line, ln []byte
  508. )
  509. for isPrefix && err == nil {
  510. line, isPrefix, err = r.ReadLine()
  511. ln = append(ln, line...)
  512. }
  513. return ln, err
  514. }