Browse Source

s3 and filer transport using unix domain socket instead of tcp

pull/2733/head
chrislu 3 years ago
parent
commit
da3d330616
  1. 21
      weed/command/filer.go
  2. 2
      weed/command/s3.go
  3. 2
      weed/command/server.go
  4. 15
      weed/s3api/s3api_object_handlers.go
  5. 18
      weed/s3api/s3api_server.go

21
weed/command/filer.go

@ -2,6 +2,7 @@ package command
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"os" "os"
"time" "time"
@ -51,6 +52,7 @@ type FilerOptions struct {
concurrentUploadLimitMB *int concurrentUploadLimitMB *int
debug *bool debug *bool
debugPort *int debugPort *int
localSocket *string
} }
func init() { func init() {
@ -76,6 +78,7 @@ func init() {
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
// start s3 on filer // start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@ -139,11 +142,14 @@ func runFiler(cmd *Command, args []string) bool {
if *filerStartS3 { if *filerStartS3 {
filerS3Options.filer = &filerAddress filerS3Options.filer = &filerAddress
filerS3Options.bindIp = f.bindIp filerS3Options.bindIp = f.bindIp
filerS3Options.localFilerSocket = f.localSocket
go func() { go func() {
time.Sleep(startDelay * time.Second) time.Sleep(startDelay * time.Second)
filerS3Options.startS3Server() filerS3Options.startS3Server()
}() }()
startDelay++ startDelay++
} else {
f.localSocket = nil
} }
if *filerStartWebDav { if *filerStartWebDav {
@ -230,6 +236,18 @@ func (fo *FilerOptions) startFiler() {
glog.Fatalf("Filer listener error: %v", e) glog.Fatalf("Filer listener error: %v", e)
} }
// start on local unix socket
if *fo.localSocket == "" {
*fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
}
}
filerSocketListener, err := net.Listen("unix", *fo.localSocket)
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
}
// starting grpc server // starting grpc server
grpcPort := *fo.portGrpc grpcPort := *fo.portGrpc
grpcL, err := util.NewListener(util.JoinHostPort(*fo.bindIp, grpcPort), 0) grpcL, err := util.NewListener(util.JoinHostPort(*fo.bindIp, grpcPort), 0)
@ -242,6 +260,9 @@ func (fo *FilerOptions) startFiler() {
go grpcS.Serve(grpcL) go grpcS.Serve(grpcL)
httpS := &http.Server{Handler: defaultMux} httpS := &http.Server{Handler: defaultMux}
go func() {
httpS.Serve(filerSocketListener)
}()
if err := httpS.Serve(filerListener); err != nil { if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e) glog.Fatalf("Filer Fail to serve: %v", e)
} }

2
weed/command/s3.go

@ -34,6 +34,7 @@ type S3Options struct {
metricsHttpPort *int metricsHttpPort *int
allowEmptyFolder *bool allowEmptyFolder *bool
auditLogConfig *string auditLogConfig *string
localFilerSocket *string
} }
func init() { func init() {
@ -184,6 +185,7 @@ func (s3opt *S3Options) startS3Server() bool {
BucketsPath: filerBucketsPath, BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder, AllowEmptyFolder: *s3opt.allowEmptyFolder,
LocalFilerSocket: s3opt.localFilerSocket,
}) })
if s3ApiServer_err != nil { if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

2
weed/command/server.go

@ -112,6 +112,7 @@ func init() {
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
@ -245,6 +246,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingS3 { if *isStartingS3 {
go func() { go func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
s3Options.localFilerSocket = filerOptions.localSocket
s3Options.startS3Server() s3Options.startS3Server()
}() }()
} }

15
weed/s3api/s3api_object_handlers.go

@ -27,17 +27,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var (
client *http.Client
)
func init() {
client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
mimeBuffer := make([]byte, 512) mimeBuffer := make([]byte, 512)
size, _ := dataReader.Read(mimeBuffer) size, _ := dataReader.Read(mimeBuffer)
@ -335,7 +324,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
// ensure that the Authorization header is overriding any previous // ensure that the Authorization header is overriding any previous
// Authorization header which might be already present in proxyReq // Authorization header which might be already present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite) s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
resp, postErr := client.Do(proxyReq)
resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil { if postErr != nil {
glog.Errorf("post to filer: %v", postErr) glog.Errorf("post to filer: %v", postErr)
@ -401,7 +390,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
// ensure that the Authorization header is overriding any previous // ensure that the Authorization header is overriding any previous
// Authorization header which might be already present in proxyReq // Authorization header which might be already present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, true) s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
resp, postErr := client.Do(proxyReq)
resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil { if postErr != nil {
glog.Errorf("post to filer: %v", postErr) glog.Errorf("post to filer: %v", postErr)

18
weed/s3api/s3api_server.go

@ -1,7 +1,9 @@
package s3api package s3api
import ( import (
"context"
"fmt" "fmt"
"net"
"net/http" "net/http"
"strings" "strings"
"time" "time"
@ -24,6 +26,7 @@ type S3ApiServerOption struct {
BucketsPath string BucketsPath string
GrpcDialOption grpc.DialOption GrpcDialOption grpc.DialOption
AllowEmptyFolder bool AllowEmptyFolder bool
LocalFilerSocket *string
} }
type S3ApiServer struct { type S3ApiServer struct {
@ -31,6 +34,7 @@ type S3ApiServer struct {
iam *IdentityAccessManagement iam *IdentityAccessManagement
randomClientId int32 randomClientId int32
filerGuard *security.Guard filerGuard *security.Guard
client *http.Client
} }
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@ -49,6 +53,20 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
randomClientId: util.RandomInt32(), randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
} }
if option.LocalFilerSocket == nil {
s3ApiServer.client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
} else {
s3ApiServer.client = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", *option.LocalFilerSocket)
},
},
}
}
s3ApiServer.registerRouter(router) s3ApiServer.registerRouter(router)

Loading…
Cancel
Save