You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

563 lines
15 KiB

10 years ago
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/server"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "io"
  10. "math"
  11. "math/rand"
  12. "os"
  13. "runtime"
  14. "runtime/pprof"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/operation"
  21. "github.com/chrislusf/seaweedfs/weed/security"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/wdclient"
  24. )
  25. type BenchmarkOptions struct {
  26. masters *string
  27. concurrency *int
  28. numberOfFiles *int
  29. fileSize *int
  30. idListFile *string
  31. write *bool
  32. deletePercentage *int
  33. read *bool
  34. sequentialRead *bool
  35. collection *string
  36. replication *string
  37. cpuprofile *string
  38. maxCpu *int
  39. grpcDialOption grpc.DialOption
  40. }
  41. var (
  42. b BenchmarkOptions
  43. sharedBytes []byte
  44. masterClient *wdclient.MasterClient
  45. isSecure bool
  46. )
  47. func init() {
  48. cmdBenchmark.Run = runBenchmark // break init cycle
  49. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  50. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  51. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  52. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  53. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  54. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  55. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  56. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  57. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  58. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  59. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  60. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  61. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  62. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  63. sharedBytes = make([]byte, 1024)
  64. }
  65. var cmdBenchmark = &Command{
  66. UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
  67. Short: "benchmark on writing millions of files and read out",
  68. Long: `benchmark on an empty SeaweedFS file system.
  69. Two tests during benchmark:
  70. 1) write lots of small files to the system
  71. 2) read the files out
  72. The file content is mostly zero, but no compression is done.
  73. You can choose to only benchmark read or write.
  74. During write, the list of uploaded file ids is stored in "-list" specified file.
  75. You can also use your own list of file ids to run read test.
  76. Write speed and read speed will be collected.
  77. The numbers are used to get a sense of the system.
  78. Usually your network or the hard drive is the real bottleneck.
  79. Another thing to watch is whether the volumes are evenly distributed
  80. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  81. to servers with free slots, it's highly possible some servers have uneven amount of
  82. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  83. before starting the benchmark command:
  84. http://localhost:9333/vol/grow?collection=benchmark&count=5
  85. After benchmarking, you can clean up the written data by deleting the benchmark collection
  86. http://localhost:9333/col/delete?collection=benchmark
  87. `,
  88. }
  89. var (
  90. wait sync.WaitGroup
  91. writeStats *stats
  92. readStats *stats
  93. )
  94. func runBenchmark(cmd *Command, args []string) bool {
  95. weed_server.LoadConfiguration("security", false)
  96. b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
  97. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  98. if *b.maxCpu < 1 {
  99. *b.maxCpu = runtime.NumCPU()
  100. }
  101. runtime.GOMAXPROCS(*b.maxCpu)
  102. if *b.cpuprofile != "" {
  103. f, err := os.Create(*b.cpuprofile)
  104. if err != nil {
  105. glog.Fatal(err)
  106. }
  107. pprof.StartCPUProfile(f)
  108. defer pprof.StopCPUProfile()
  109. }
  110. masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
  111. go masterClient.KeepConnectedToMaster()
  112. masterClient.WaitUntilConnected()
  113. if *b.write {
  114. benchWrite()
  115. }
  116. if *b.read {
  117. benchRead()
  118. }
  119. return true
  120. }
  121. func benchWrite() {
  122. fileIdLineChan := make(chan string)
  123. finishChan := make(chan bool)
  124. writeStats = newStats(*b.concurrency)
  125. idChan := make(chan int)
  126. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  127. for i := 0; i < *b.concurrency; i++ {
  128. wait.Add(1)
  129. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  130. }
  131. writeStats.start = time.Now()
  132. writeStats.total = *b.numberOfFiles
  133. go writeStats.checkProgress("Writing Benchmark", finishChan)
  134. for i := 0; i < *b.numberOfFiles; i++ {
  135. idChan <- i
  136. }
  137. close(idChan)
  138. wait.Wait()
  139. writeStats.end = time.Now()
  140. wait.Add(2)
  141. finishChan <- true
  142. finishChan <- true
  143. wait.Wait()
  144. close(finishChan)
  145. writeStats.printStats()
  146. }
  147. func benchRead() {
  148. fileIdLineChan := make(chan string)
  149. finishChan := make(chan bool)
  150. readStats = newStats(*b.concurrency)
  151. go readFileIds(*b.idListFile, fileIdLineChan)
  152. readStats.start = time.Now()
  153. readStats.total = *b.numberOfFiles
  154. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  155. for i := 0; i < *b.concurrency; i++ {
  156. wait.Add(1)
  157. go readFiles(fileIdLineChan, &readStats.localStats[i])
  158. }
  159. wait.Wait()
  160. wait.Add(1)
  161. finishChan <- true
  162. wait.Wait()
  163. close(finishChan)
  164. readStats.end = time.Now()
  165. readStats.printStats()
  166. }
  167. type delayedFile struct {
  168. enterTime time.Time
  169. fp *operation.FilePart
  170. }
  171. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  172. defer wait.Done()
  173. delayedDeleteChan := make(chan *delayedFile, 100)
  174. var waitForDeletions sync.WaitGroup
  175. for i := 0; i < 7; i++ {
  176. waitForDeletions.Add(1)
  177. go func() {
  178. defer waitForDeletions.Done()
  179. for df := range delayedDeleteChan {
  180. if df.enterTime.After(time.Now()) {
  181. time.Sleep(df.enterTime.Sub(time.Now()))
  182. }
  183. var jwtAuthorization security.EncodedJwt
  184. if isSecure {
  185. jwtAuthorization = operation.LookupJwt(masterClient.GetMaster(), df.fp.Fid)
  186. }
  187. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  188. s.completed++
  189. } else {
  190. s.failed++
  191. }
  192. }
  193. }()
  194. }
  195. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  196. for id := range idChan {
  197. start := time.Now()
  198. fileSize := int64(*b.fileSize + random.Intn(64))
  199. fp := &operation.FilePart{
  200. Reader: &FakeReader{id: uint64(id), size: fileSize},
  201. FileSize: fileSize,
  202. MimeType: "image/bench", // prevent gzip benchmark content
  203. }
  204. ar := &operation.VolumeAssignRequest{
  205. Count: 1,
  206. Collection: *b.collection,
  207. Replication: *b.replication,
  208. }
  209. if assignResult, err := operation.Assign(masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
  210. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  211. if !isSecure && assignResult.Auth != "" {
  212. isSecure = true
  213. }
  214. if _, err := fp.Upload(0, masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
  215. if random.Intn(100) < *b.deletePercentage {
  216. s.total++
  217. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  218. } else {
  219. fileIdLineChan <- fp.Fid
  220. }
  221. s.completed++
  222. s.transferred += fileSize
  223. } else {
  224. s.failed++
  225. fmt.Printf("Failed to write with error:%v\n", err)
  226. }
  227. writeStats.addSample(time.Now().Sub(start))
  228. if *cmdBenchmark.IsDebug {
  229. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  230. }
  231. } else {
  232. s.failed++
  233. println("writing file error:", err.Error())
  234. }
  235. }
  236. close(delayedDeleteChan)
  237. waitForDeletions.Wait()
  238. }
  239. func readFiles(fileIdLineChan chan string, s *stat) {
  240. defer wait.Done()
  241. for fid := range fileIdLineChan {
  242. if len(fid) == 0 {
  243. continue
  244. }
  245. if fid[0] == '#' {
  246. continue
  247. }
  248. if *cmdBenchmark.IsDebug {
  249. fmt.Printf("reading file %s\n", fid)
  250. }
  251. start := time.Now()
  252. url, err := masterClient.LookupFileId(fid)
  253. if err != nil {
  254. s.failed++
  255. println("!!!! ", fid, " location not found!!!!!")
  256. continue
  257. }
  258. if bytesRead, err := util.Get(url); err == nil {
  259. s.completed++
  260. s.transferred += int64(len(bytesRead))
  261. readStats.addSample(time.Now().Sub(start))
  262. } else {
  263. s.failed++
  264. fmt.Printf("Failed to read %s error:%v\n", url, err)
  265. }
  266. }
  267. }
  268. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  269. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  270. if err != nil {
  271. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  272. }
  273. defer file.Close()
  274. for {
  275. select {
  276. case <-finishChan:
  277. wait.Done()
  278. return
  279. case line := <-fileIdLineChan:
  280. file.Write([]byte(line))
  281. file.Write([]byte("\n"))
  282. }
  283. }
  284. }
  285. func readFileIds(fileName string, fileIdLineChan chan string) {
  286. file, err := os.Open(fileName) // For read access.
  287. if err != nil {
  288. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  289. }
  290. defer file.Close()
  291. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  292. r := bufio.NewReader(file)
  293. if *b.sequentialRead {
  294. for {
  295. if line, err := Readln(r); err == nil {
  296. fileIdLineChan <- string(line)
  297. } else {
  298. break
  299. }
  300. }
  301. } else {
  302. lines := make([]string, 0, readStats.total)
  303. for {
  304. if line, err := Readln(r); err == nil {
  305. lines = append(lines, string(line))
  306. } else {
  307. break
  308. }
  309. }
  310. if len(lines) > 0 {
  311. for i := 0; i < readStats.total; i++ {
  312. fileIdLineChan <- lines[random.Intn(len(lines))]
  313. }
  314. }
  315. }
  316. close(fileIdLineChan)
  317. }
  318. const (
  319. benchResolution = 10000 //0.1 microsecond
  320. benchBucket = 1000000000 / benchResolution
  321. )
  322. // An efficient statics collecting and rendering
  323. type stats struct {
  324. data []int
  325. overflow []int
  326. localStats []stat
  327. start time.Time
  328. end time.Time
  329. total int
  330. }
  331. type stat struct {
  332. completed int
  333. failed int
  334. total int
  335. transferred int64
  336. }
  337. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  338. func newStats(n int) *stats {
  339. return &stats{
  340. data: make([]int, benchResolution),
  341. overflow: make([]int, 0),
  342. localStats: make([]stat, n),
  343. }
  344. }
  345. func (s *stats) addSample(d time.Duration) {
  346. index := int(d / benchBucket)
  347. if index < 0 {
  348. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  349. } else if index < len(s.data) {
  350. s.data[int(d/benchBucket)]++
  351. } else {
  352. s.overflow = append(s.overflow, index)
  353. }
  354. }
  355. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  356. fmt.Printf("\n------------ %s ----------\n", testName)
  357. ticker := time.Tick(time.Second)
  358. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  359. for {
  360. select {
  361. case <-finishChan:
  362. wait.Done()
  363. return
  364. case t := <-ticker:
  365. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  366. for _, localStat := range s.localStats {
  367. completed += localStat.completed
  368. transferred += localStat.transferred
  369. total += localStat.total
  370. }
  371. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  372. completed, total, float64(completed)*100/float64(total),
  373. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  374. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  375. )
  376. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  377. }
  378. }
  379. }
  380. func (s *stats) printStats() {
  381. completed, failed, transferred, total := 0, 0, int64(0), s.total
  382. for _, localStat := range s.localStats {
  383. completed += localStat.completed
  384. failed += localStat.failed
  385. transferred += localStat.transferred
  386. total += localStat.total
  387. }
  388. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  389. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  390. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  391. fmt.Printf("Complete requests: %d\n", completed)
  392. fmt.Printf("Failed requests: %d\n", failed)
  393. fmt.Printf("Total transferred: %d bytes\n", transferred)
  394. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  395. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  396. n, sum := 0, 0
  397. min, max := 10000000, 0
  398. for i := 0; i < len(s.data); i++ {
  399. n += s.data[i]
  400. sum += s.data[i] * i
  401. if s.data[i] > 0 {
  402. if min > i {
  403. min = i
  404. }
  405. if max < i {
  406. max = i
  407. }
  408. }
  409. }
  410. n += len(s.overflow)
  411. for i := 0; i < len(s.overflow); i++ {
  412. sum += s.overflow[i]
  413. if min > s.overflow[i] {
  414. min = s.overflow[i]
  415. }
  416. if max < s.overflow[i] {
  417. max = s.overflow[i]
  418. }
  419. }
  420. avg := float64(sum) / float64(n)
  421. varianceSum := 0.0
  422. for i := 0; i < len(s.data); i++ {
  423. if s.data[i] > 0 {
  424. d := float64(i) - avg
  425. varianceSum += d * d * float64(s.data[i])
  426. }
  427. }
  428. for i := 0; i < len(s.overflow); i++ {
  429. d := float64(s.overflow[i]) - avg
  430. varianceSum += d * d
  431. }
  432. std := math.Sqrt(varianceSum / float64(n))
  433. fmt.Printf("\nConnection Times (ms)\n")
  434. fmt.Printf(" min avg max std\n")
  435. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  436. //printing percentiles
  437. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  438. percentiles := make([]int, len(percentages))
  439. for i := 0; i < len(percentages); i++ {
  440. percentiles[i] = n * percentages[i] / 100
  441. }
  442. percentiles[len(percentiles)-1] = n
  443. percentileIndex := 0
  444. currentSum := 0
  445. for i := 0; i < len(s.data); i++ {
  446. currentSum += s.data[i]
  447. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  448. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  449. percentileIndex++
  450. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  451. percentileIndex++
  452. }
  453. }
  454. }
  455. sort.Ints(s.overflow)
  456. for i := 0; i < len(s.overflow); i++ {
  457. currentSum++
  458. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  459. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  460. percentileIndex++
  461. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  462. percentileIndex++
  463. }
  464. }
  465. }
  466. }
  467. // a fake reader to generate content to upload
  468. type FakeReader struct {
  469. id uint64 // an id number
  470. size int64 // max bytes
  471. }
  472. func (l *FakeReader) Read(p []byte) (n int, err error) {
  473. if l.size <= 0 {
  474. return 0, io.EOF
  475. }
  476. if int64(len(p)) > l.size {
  477. n = int(l.size)
  478. } else {
  479. n = len(p)
  480. }
  481. if n >= 8 {
  482. for i := 0; i < 8; i++ {
  483. p[i] = byte(l.id >> uint(i*8))
  484. }
  485. }
  486. l.size -= int64(n)
  487. return
  488. }
  489. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  490. size := int(l.size)
  491. bufferSize := len(sharedBytes)
  492. for size > 0 {
  493. tempBuffer := sharedBytes
  494. if size < bufferSize {
  495. tempBuffer = sharedBytes[0:size]
  496. }
  497. count, e := w.Write(tempBuffer)
  498. if e != nil {
  499. return int64(size), e
  500. }
  501. size -= count
  502. }
  503. return l.size, nil
  504. }
  505. func Readln(r *bufio.Reader) ([]byte, error) {
  506. var (
  507. isPrefix = true
  508. err error
  509. line, ln []byte
  510. )
  511. for isPrefix && err == nil {
  512. line, isPrefix, err = r.ReadLine()
  513. ln = append(ln, line...)
  514. }
  515. return ln, err
  516. }