diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 41aadc6db..7f132892e 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -41,7 +41,6 @@ type BenchmarkOptions struct { grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient fsync *bool - useTcp *bool } var ( @@ -68,7 +67,6 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") - b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") sharedBytes = make([]byte, 1024) } @@ -225,8 +223,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) - volumeTcpClient := wdclient.NewVolumeTcpClient() - for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) @@ -247,15 +243,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if *b.useTcp { - if uploadByTcp(volumeTcpClient, fp) { - fileIdLineChan <- fp.Fid - s.completed++ - s.transferred += fileSize - } else { - s.failed++ - } - } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { + if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -341,17 +329,6 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b } } -func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool { - - err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) - if err != nil { - glog.Errorf("upload chunk err: %v", err) - return false - } - - return true -} - func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. if err != nil { diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go deleted file mode 100644 index d7ea81d64..000000000 --- a/weed/wdclient/volume_tcp_client.go +++ /dev/null @@ -1,97 +0,0 @@ -package wdclient - -import ( - "bufio" - "bytes" - "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/wdclient/net2" - "io" - "net" - "time" -) - -// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication -type VolumeTcpClient struct { - cp net2.ConnectionPool -} - -type VolumeTcpConn struct { - net.Conn - bufWriter *bufio.Writer - bufReader *bufio.Reader -} - -func NewVolumeTcpClient() *VolumeTcpClient { - MaxIdleTime := 10 * time.Second - return &VolumeTcpClient{ - cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{ - MaxActiveConnections: 16, - MaxIdleConnections: 1, - MaxIdleTime: &MaxIdleTime, - DialMaxConcurrency: 0, - Dial: func(network string, address string) (net.Conn, error) { - conn, err := net.Dial(network, address) - return &VolumeTcpConn{ - conn, - bufio.NewWriter(conn), - bufio.NewReader(conn), - }, err - }, - NowFunc: nil, - ReadTimeout: 0, - WriteTimeout: 0, - }), - } -} -func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) { - - tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000) - if parseErr != nil { - return parseErr - } - - c.cp.Register("tcp", tcpAddress) - tcpConn, getErr := c.cp.Get("tcp", tcpAddress) - if getErr != nil { - return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr) - } - conn := tcpConn.RawConn().(*VolumeTcpConn) - defer func() { - if err != nil { - tcpConn.DiscardConnection() - } else { - tcpConn.ReleaseConnection() - } - }() - - buf := []byte("+" + fileId + "\n") - _, err = conn.bufWriter.Write([]byte(buf)) - if err != nil { - return - } - util.Uint32toBytes(buf[0:4], fileSize) - _, err = conn.bufWriter.Write(buf[0:4]) - if err != nil { - return - } - _, err = io.Copy(conn.bufWriter, fileReader) - if err != nil { - return - } - conn.bufWriter.Write([]byte("!\n")) - conn.bufWriter.Flush() - - ret, _, err := conn.bufReader.ReadLine() - if err != nil { - glog.V(0).Infof("upload by tcp: %v", err) - return - } - if !bytes.HasPrefix(ret, []byte("+OK")) { - glog.V(0).Infof("upload by tcp: %v", string(ret)) - } - - return nil -}