Browse Source

add tcp read

Performance not so good. Could need some optimization.

Concurrency Level:      16
Time taken for tests:   33.575 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106753375 bytes
Requests per second:    31230.86 [#/sec]
Transfer rate:          32191.03 [Kbytes/sec]

vs  normal http

Concurrency Level:      16
Time taken for tests:   24.829 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106761259 bytes
Requests per second:    42231.10 [#/sec]
Transfer rate:          43529.78 [Kbytes/sec]
tcp_read
Chris Lu 5 years ago
parent
commit
be415f4e3c
  1. 47
      weed/command/benchmark.go
  2. 1
      weed/command/server.go
  3. 32
      weed/command/volume.go
  4. 73
      weed/operation/grpc_client.go
  5. 4
      weed/pb/volume_server.proto
  6. 567
      weed/pb/volume_server_pb/volume_server.pb.go
  7. 150
      weed/server/volume_tcp_file.go
  8. 56
      weed/util/http_util.go

47
weed/command/benchmark.go

@ -7,6 +7,7 @@ import (
"io"
"math"
"math/rand"
"net"
"os"
"runtime"
"runtime/pprof"
@ -41,7 +42,8 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
grpcRead *bool
readByGrpc *bool
readByTcp *bool
}
var (
@ -66,7 +68,8 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
b.readByGrpc = cmdBenchmark.Flag.Bool("read.grpc", false, "use grpc API to read")
b.readByTcp = cmdBenchmark.Flag.Bool("read.tcp", false, "use tcp API to read")
sharedBytes = make([]byte, 1024)
}
@ -283,7 +286,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
if *b.grpcRead {
if *b.readByGrpc {
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
if err != nil {
s.failed++
@ -291,6 +294,15 @@ func readFiles(fileIdLineChan chan string, s *stat) {
continue
}
bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
} else if *b.readByTcp {
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
bytesRead, err = tcpFileGet(volumeServer, fid)
} else {
url, err := b.masterClient.LookupFileId(fid)
if err != nil {
@ -336,6 +348,35 @@ func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (byte
return
}
func tcpFileGet(volumeServer, fid string) (bytesRead int, err error) {
err = operation.WithVolumeServerTcpConnection(volumeServer, func(conn net.Conn) error {
// println("requesting", fid, "...")
if err := util.WriteMessage(conn, &volume_server_pb.TcpRequestHeader{
Get: &volume_server_pb.FileGetRequest{FileId: fid},
}); err != nil {
return err
}
for {
resp := &volume_server_pb.FileGetResponse{}
// println("reading...")
respErr := util.ReadMessage(conn, resp)
if respErr != nil {
if respErr == io.EOF {
return nil
}
// println("err:", respErr.Error())
return respErr
}
// println("resp size", len(resp.Data))
bytesRead += len(resp.Data)
}
})
return
}
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {

1
weed/command/server.go

@ -91,6 +91,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")

32
weed/command/volume.go

@ -50,6 +50,7 @@ type VolumeServerOptions struct {
memProfile *string
compactionMBPerSecond *int
fileSizeLimitMB *int
enableTcp *bool // temporary toggle
}
func init() {
@ -71,6 +72,7 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.enableTcp = cmdVolume.Flag.Bool("enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
}
var cmdVolume = &Command{
@ -168,6 +170,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
if v.enableTcp != nil && *v.enableTcp {
go v.startTcpServer(volumeServer)
}
// starting public http server
var publicHttpDown httpdown.Server
if v.isSeparatedPublicPort() {
@ -245,6 +251,32 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
return grpcS
}
func (v VolumeServerOptions) startTcpServer(vs *weed_server.VolumeServer) {
tcpPort := *v.port + 20000
tcpL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(tcpPort), 0)
if err != nil {
glog.Fatalf("failed to listen on tcp port %d: %v", tcpPort, err)
}
defer tcpL.Close()
for {
c, err := tcpL.Accept()
if err!= nil {
glog.V(0).Infof("accept tcp connection: %v", err)
continue
}
go func() {
for {
if err := vs.HandleTcpConnection(c); err != nil {
glog.V(0).Infof("handle tcp remote %s: %v", c.RemoteAddr(), err)
return
}
}
}()
}
}
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)

73
weed/operation/grpc_client.go

@ -3,13 +3,22 @@ package operation
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"strconv"
"strings"
)
var (
connectionPool = make(map[string]*sync.Pool)
connectionPoolLock sync.Mutex
)
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
@ -38,6 +47,64 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) error) error {
tcpAddress, err := toVolumeServerTcpAddress(volumeServer)
if err != nil {
return err
}
conn := getConnection(tcpAddress)
defer releaseConnection(conn, tcpAddress)
err = fn(conn)
return err
}
func getConnection(tcpAddress string) net.Conn {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
pool = &sync.Pool{New: func() interface{} {
conn, err := net.Dial("tcp", tcpAddress)
if err != nil {
glog.Errorf("failed to connect to %s: %v", tcpAddress, err)
return conn
}
// println("connected", tcpAddress, "=>", conn.LocalAddr().String())
return conn
}}
connectionPool[tcpAddress] = pool
}
conn := pool.Get().(net.Conn)
// println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
return conn
}
func releaseConnection(conn net.Conn, tcpAddress string) {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
// println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
return
}
pool.Put(conn)
// println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
}
func toVolumeServerTcpAddress(volumeServer string) (grpcAddress string, err error) {
sepIndex := strings.LastIndex(volumeServer, ":")
port, err := strconv.Atoi(volumeServer[sepIndex+1:])
if err != nil {
glog.Errorf("failed to parse volume server address: %v", volumeServer)
return "", err
}
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+20000), nil
}
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
ctx := context.Background()

4
weed/pb/volume_server.proto

@ -104,6 +104,10 @@ message DeleteResult {
uint32 version = 5;
}
message TcpRequestHeader {
FileGetRequest get = 1;
}
message FileGetRequest {
string file_id = 1;
bool accept_gzip = 2;

567
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

150
weed/server/volume_tcp_file.go

@ -0,0 +1,150 @@
package weed_server
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) HandleTcpConnection(conn net.Conn) error {
// println("handle tcp conn", conn.RemoteAddr())
tcpMessage := &volume_server_pb.TcpRequestHeader{}
if err := util.ReadMessage(conn, tcpMessage); err != nil {
return fmt.Errorf("read message: %v", err)
}
if tcpMessage.Get != nil {
vs.handleFileGet(conn, tcpMessage.Get)
}
err := util.WriteMessageEOF(conn)
// println("processed", tcpMessage.Get.FileId)
return err
}
func (vs *VolumeServer) handleFileGet(conn net.Conn, req *volume_server_pb.FileGetRequest) error {
headResponse := &volume_server_pb.FileGetResponse{}
n := new(needle.Needle)
commaIndex := strings.LastIndex(req.FileId, ",")
vid := req.FileId[:commaIndex]
fid := req.FileId[commaIndex+1:]
volumeId, err := needle.NewVolumeId(vid)
if err != nil {
headResponse.ErrorCode = http.StatusBadRequest
return util.WriteMessage(conn, headResponse)
}
err = n.ParsePath(fid)
if err != nil {
headResponse.ErrorCode = http.StatusBadRequest
return util.WriteMessage(conn, headResponse)
}
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
headResponse.ErrorCode = http.StatusMovedPermanently
return util.WriteMessage(conn, headResponse)
}
cookie := n.Cookie
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
}
if err != nil || count < 0 {
headResponse.ErrorCode = http.StatusNotFound
return util.WriteMessage(conn, headResponse)
}
if n.Cookie != cookie {
headResponse.ErrorCode = http.StatusNotFound
return util.WriteMessage(conn, headResponse)
}
if n.LastModified != 0 {
headResponse.LastModified = n.LastModified
}
headResponse.Etag = n.Etag()
if n.HasPairs() {
pairMap := make(map[string]string)
err = json.Unmarshal(n.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
headResponse.Headers = pairMap
}
/*
// skip this, no redirection
if vs.tryHandleChunkedFile(n, filename, w, r) {
return
}
*/
if n.NameSize > 0 {
headResponse.Filename = string(n.Name)
}
mtype := ""
if n.MimeSize > 0 {
mt := string(n.Mime)
if !strings.HasPrefix(mt, "application/octet-stream") {
mtype = mt
}
}
headResponse.ContentType = mtype
headResponse.IsGzipped = n.IsGzipped()
if n.IsGzipped() && req.AcceptGzip {
if n.Data, err = util.UnGzipData(n.Data); err != nil {
glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
}
}
headResponse.ContentLength = uint32(len(n.Data))
bytesToRead := len(n.Data)
bytesRead := 0
t := headResponse
for bytesRead < bytesToRead {
stopIndex := bytesRead + BufferSizeLimit
if stopIndex > bytesToRead {
stopIndex = bytesToRead
}
if t == nil {
t = &volume_server_pb.FileGetResponse{}
}
t.Data = n.Data[bytesRead:stopIndex]
err = util.WriteMessage(conn, t)
t = nil
if err != nil {
return err
}
bytesRead = stopIndex
}
return nil
}

56
weed/util/http_util.go

@ -8,10 +8,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strings"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@ -312,3 +316,55 @@ func CloseResponse(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
func WriteMessage(conn net.Conn, message proto.Message) error {
data, err := proto.Marshal(message)
if err != nil {
glog.Fatalf("marshal: %v", err)
}
messageSizeBytes := make([]byte, 4)
Uint32toBytes(messageSizeBytes, uint32(len(data)))
_, err = conn.Write(messageSizeBytes)
if err != nil {
return err
}
_, err = conn.Write(data)
return err
}
func WriteMessageEOF(conn net.Conn) error {
messageSizeBytes := make([]byte, 4)
Uint32toBytes(messageSizeBytes, math.MaxUint32)
_, err := conn.Write(messageSizeBytes)
return err
}
func ReadMessage(conn net.Conn, message proto.Message) error {
messageSizeBuffer := make([]byte, 4)
n, err := conn.Read(messageSizeBuffer)
if err != nil {
if err == io.EOF {
// println("unexpected eof")
return err
}
return fmt.Errorf("read message size byte length: %d %v", n, err)
}
if n != 4 {
return fmt.Errorf("unexpected message size byte length: %d", n)
}
messageSize := BytesToUint32(messageSizeBuffer)
if messageSize == math.MaxUint32 {
// println("marked eof")
return io.EOF
}
messageBytes := make([]byte, messageSize)
readMessageLength, err := conn.Read(messageBytes)
if readMessageLength != int(messageSize) {
return fmt.Errorf("message size:%d, expected:%d", readMessageLength, messageSize)
}
if err := proto.Unmarshal(messageBytes, message); err != nil {
return err
}
return nil
}
Loading…
Cancel
Save