Browse Source

switch to idle timeout instead of read timeout

pull/2/head
Chris Lu 11 years ago
parent
commit
7c5c94785c
  1. 65
      go/util/net_timeout.go
  2. 17
      go/weed/master.go
  3. 32
      go/weed/server.go
  4. 18
      go/weed/volume.go
  5. 4
      go/weed/weed_server/volume_server_handlers.go

65
go/util/net_timeout.go

@ -0,0 +1,65 @@
package util
import (
"net"
"time"
)
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
type Listener struct {
net.Listener
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (l *Listener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
}
return tc, nil
}
// Conn wraps a net.Conn, and sets a deadline for every read
// and write operation.
type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (c *Conn) Read(b []byte) (int, error) {
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
if err != nil {
return 0, err
}
return c.Conn.Read(b)
}
func (c *Conn) Write(b []byte) (int, error) {
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
if err != nil {
return 0, err
}
return c.Conn.Write(b)
}
func NewListener(addr string, timeout time.Duration) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
tl := &Listener{
Listener: l,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
return tl, nil
}

17
go/weed/master.go

@ -35,7 +35,7 @@ var (
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
mReadTimeout = cmdMaster.Flag.Int("readTimeout", 30, "connection read timeout in seconds")
mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds")
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@ -62,10 +62,12 @@ func runMaster(cmd *Command, args []string) bool {
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
srv := &http.Server{
Addr: *masterIp + ":" + strconv.Itoa(*mport),
Handler: r,
ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
listener, e := util.NewListener(
*masterIp+":"+strconv.Itoa(*mport),
time.Duration(*mTimeout)*time.Second,
)
if e != nil {
glog.Fatalf(e.Error())
}
go func() {
@ -78,9 +80,8 @@ func runMaster(cmd *Command, args []string) bool {
ms.SetRaftServer(raftServer)
}()
e := srv.ListenAndServe()
if e != nil {
glog.Fatalf("Fail to start:%s", e)
if e := http.Serve(listener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
}
return true
}

32
go/weed/server.go

@ -38,7 +38,7 @@ var cmdServer = &Command{
var (
serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
serverReadTimeout = cmdServer.Flag.Int("readTimeout", 30, "connection read timeout in seconds. Increase this if uploading large files.")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@ -109,10 +109,12 @@ func runServer(cmd *Command, args []string) bool {
)
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort))
masterServer := &http.Server{
Addr: *serverIp + ":" + strconv.Itoa(*masterPort),
Handler: r,
ReadTimeout: time.Duration(*serverReadTimeout) * time.Second,
masterListener, e := util.NewListener(
*serverIp+":"+strconv.Itoa(*masterPort),
time.Duration(*serverTimeout)*time.Second,
)
if e != nil {
glog.Fatalf(e.Error())
}
go func() {
@ -128,9 +130,8 @@ func runServer(cmd *Command, args []string) bool {
}()
raftWaitForMaster.Done()
e := masterServer.ListenAndServe()
if e != nil {
glog.Fatalf("Fail to start master:%s", e)
if e := http.Serve(masterListener, r); e != nil {
glog.Fatalf("Master Fail to serve:%s", e.Error())
}
}()
@ -142,14 +143,15 @@ func runServer(cmd *Command, args []string) bool {
)
glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*serverIp+":"+strconv.Itoa(*volumePort))
volumeServer := &http.Server{
Addr: *serverIp + ":" + strconv.Itoa(*volumePort),
Handler: r,
ReadTimeout: (time.Duration(*serverReadTimeout) * time.Second),
}
e := volumeServer.ListenAndServe()
volumeListener, e := util.NewListener(
*serverIp+":"+strconv.Itoa(*volumePort),
time.Duration(*serverTimeout)*time.Second,
)
if e != nil {
glog.Fatalf("Fail to start volume:%s", e.Error())
glog.Fatalf(e.Error())
}
if e := http.Serve(volumeListener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
}
return true

18
go/weed/volume.go

@ -32,7 +32,7 @@ var (
publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 30, "connection read timeout in seconds. Increase this if uploading large files.")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
@ -79,14 +79,16 @@ func runVolume(cmd *Command, args []string) bool {
)
glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
srv := &http.Server{
Addr: *ip + ":" + strconv.Itoa(*vport),
Handler: r,
ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
}
e := srv.ListenAndServe()
listener, e := util.NewListener(
*ip+":"+strconv.Itoa(*vport),
time.Duration(*vTimeout)*time.Second,
)
if e != nil {
glog.Fatalf("Fail to start:%s", e.Error())
glog.Fatalf(e.Error())
}
if e := http.Serve(listener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
}
return true
}

4
go/weed/weed_server/volume_server_handlers.go

@ -134,7 +134,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
if n.Cookie != cookie {
glog.V(0).Infoln("request with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
w.WriteHeader(http.StatusNotFound)
return
}
@ -235,7 +235,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
if n.Cookie != cookie {
glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
return
}

Loading…
Cancel
Save