From e12600cc8eea9a36115d465d2a7511b1b30aad07 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 13 Mar 2021 14:48:28 -0800 Subject: [PATCH] still not working --- weed/command/volume.go | 2 +- weed/server/volume_server_udp_handlers.go | 97 ++++++++--------------- weed/wdclient/volume_udp_client.go | 51 +++++------- 3 files changed, 55 insertions(+), 95 deletions(-) diff --git a/weed/command/volume.go b/weed/command/volume.go index 1468aa2ee..ab3c63a9a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -417,7 +417,7 @@ func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeSer conn, err := listener.Accept() if err == nil { glog.V(0).Infof("Client from %s", conn.RemoteAddr()) - go volumeServer.HandleTcpConnection(conn) + go volumeServer.HandleUdpConnection(conn) } else if isTemporaryError(err) { continue } else { diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go index 2f7563f4c..b1f95446c 100644 --- a/weed/server/volume_server_udp_handlers.go +++ b/weed/server/volume_server_udp_handlers.go @@ -1,81 +1,48 @@ package weed_server import ( + "bufio" "github.com/chrislusf/seaweedfs/weed/glog" - "pack.ag/tftp" + "io" + "net" ) -func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) { +func (vs *VolumeServer) HandleUdpConnection(c net.Conn) { + defer c.Close() - filename := r.Name() + glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) - volumeId, n, err := vs.parseFileId(filename) - if err != nil { - glog.Errorf("parse file id %s: %v", filename, err) - return - } - - hasVolume := vs.store.HasVolume(volumeId) - _, hasEcVolume := vs.store.FindEcVolume(volumeId) - - if hasVolume { - if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { - glog.Errorf("ReadVolumeNeedle %s: %v", filename, err) - return - } - } - if hasEcVolume { - if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil { - glog.Errorf("ReadEcShardNeedle %s: %v", filename, err) - return - } - } - - if _, err = r.Write(n.Data); err != nil { - glog.Errorf("UDP Write data %s: %v", filename, err) - return - } - -} - -func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) { - - filename := w.Name() - println("+ ", filename) + bufReader := bufio.NewReaderSize(c, 1024*1024) + bufWriter := bufio.NewWriterSize(c, 1024*1024) - // Get the file size - size, err := w.Size() - - // Note: The size value is sent by the client, the client could send more data than - // it indicated in the size option. To be safe we'd want to allocate a buffer - // with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll. - - if filename[0] == '-' { - err = vs.handleTcpDelete(filename[1:]) + for { + cmd, err := bufReader.ReadString('\n') if err != nil { - glog.Errorf("handleTcpDelete %s: %v", filename, err) + if err != io.EOF { + glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + } return } - } - - volumeId, n, err := vs.parseFileId(filename) - if err != nil { - glog.Errorf("parse file id %s: %v", filename, err) - return - } - - volume := vs.store.GetVolume(volumeId) - if volume == nil { - glog.Errorf("volume %d not found", volumeId) - return - } + cmd = cmd[:len(cmd)-1] + switch cmd[0] { + case '+': + fileId := cmd[1:] + err = vs.handleTcpPut(fileId, bufReader) + if err != nil { + glog.Errorf("put %s: %v", fileId, err) + } + case '-': + fileId := cmd[1:] + err = vs.handleTcpDelete(fileId) + if err != nil { + glog.Errorf("del %s: %v", fileId, err) + } + case '?': + fileId := cmd[1:] + err = vs.handleTcpGet(fileId, bufWriter) + case '!': + } - err = volume.StreamWrite(n, w, uint32(size)) - if err != nil { - glog.Errorf("StreamWrite %s: %v", filename, err) - return } - println("- ", filename) - } diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go index 93fd2b227..470d7a82d 100644 --- a/weed/wdclient/volume_udp_client.go +++ b/weed/wdclient/volume_udp_client.go @@ -2,9 +2,6 @@ package wdclient import ( "bufio" - "bytes" - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/udptransfer" "github.com/chrislusf/seaweedfs/weed/util" @@ -70,45 +67,41 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string return parseErr } - c.cp.Register("udp", udpAddress) - udpConn, getErr := c.cp.Get("udp", udpAddress) - if getErr != nil { - return fmt.Errorf("get connection to %s: %v", udpAddress, getErr) + listener, err := udptransfer.NewEndpoint(&udptransfer.Params{ + LocalAddr: "", + Bandwidth: 100, + FastRetransmit: true, + FlatTraffic: true, + IsServ: false, + }) + if err != nil { + return err } - conn := udpConn.RawConn().(*VolumeUdpConn) - defer func() { - if err != nil { - udpConn.DiscardConnection() - } else { - udpConn.ReleaseConnection() - } - }() - buf := []byte("+" + fileId + "\n") - _, err = conn.bufWriter.Write([]byte(buf)) + conn, err := listener.Dial(udpAddress) if err != nil { - return + return err } - util.Uint32toBytes(buf[0:4], fileSize) - _, err = conn.bufWriter.Write(buf[0:4]) + defer conn.Close() + + bufWriter := bufio.NewWriter(conn) + + buf := []byte("+" + fileId + "\n") + _, err = bufWriter.Write([]byte(buf)) if err != nil { return } - _, err = io.Copy(conn.bufWriter, fileReader) + util.Uint32toBytes(buf[0:4], fileSize) + _, err = bufWriter.Write(buf[0:4]) if err != nil { return } - conn.bufWriter.Write([]byte("!\n")) - conn.bufWriter.Flush() - - ret, _, err := conn.bufReader.ReadLine() + _, err = io.Copy(bufWriter, fileReader) if err != nil { - glog.V(0).Infof("upload by udp: %v", err) return } - if !bytes.HasPrefix(ret, []byte("+OK")) { - glog.V(0).Infof("upload by udp: %v", string(ret)) - } + bufWriter.Write([]byte("!\n")) + bufWriter.Flush() return nil }