diff --git a/go.mod b/go.mod index 7969c4c89..08b41cb09 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/olivere/elastic/v7 v7.0.19 github.com/peterh/liner v1.1.0 github.com/pierrec/lz4 v2.2.7+incompatible // indirect + github.com/pin/tftp v2.1.0+incompatible // indirect github.com/prometheus/client_golang v1.3.0 github.com/rakyll/statik v0.1.7 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect diff --git a/go.sum b/go.sum index 223d290af..e66b0ecb9 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.2.7+incompatible h1:Eerk9aiqeZo2QzsbWOAsELUf9ddvAxEdMY9LYze/DEc= github.com/pierrec/lz4 v2.2.7+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pin/tftp v2.1.0+incompatible h1:Yng4J7jv6lOc6IF4XoB5mnd3P7ZrF60XQq+my3FAMus= +github.com/pin/tftp v2.1.0+incompatible/go.mod h1:xVpZOMCXTy+A5QMjEVN0Glwa1sUvaJhFXbr/aAxuxGY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index af0793c70..d40a6cfe0 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -42,6 +42,7 @@ type BenchmarkOptions struct { masterClient *wdclient.MasterClient fsync *bool useTcp *bool + useUdp *bool } var ( @@ -68,7 +69,8 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") - b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") + b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "write data via tcp") + b.useUdp = cmdBenchmark.Flag.Bool("useUdp", false, "write data via udp") sharedBytes = make([]byte, 1024) } @@ -226,6 +228,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) volumeTcpClient := wdclient.NewVolumeTcpClient() + volumeUdpClient := wdclient.NewVolumeUdpClient() for id := range idChan { start := time.Now() @@ -255,6 +258,14 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { } else { s.failed++ } + } else if *b.useUdp { + if uploadByUdp(volumeUdpClient, fp) { + fileIdLineChan <- fp.Fid + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + } } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ @@ -352,6 +363,17 @@ func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePa return true } +func uploadByUdp(volumeUdpClient *wdclient.VolumeUdpClient, fp *operation.FilePart) bool { + + err := volumeUdpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) + if err != nil { + glog.Errorf("upload chunk err: %v", err) + return false + } + + return true +} + func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. if err != nil { diff --git a/weed/command/volume.go b/weed/command/volume.go index 0e8224dbc..002227a10 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -29,6 +29,7 @@ import ( stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/pin/tftp" ) var ( @@ -256,6 +257,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // starting tcp server if *v.enableTcp { go v.startTcpService(volumeServer) + go v.startUdpService(volumeServer) } // starting the cluster http server @@ -378,10 +380,10 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000) - glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) + glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "TCP at", listeningAddress) listener, e := util.NewListener(listeningAddress, 0) if e != nil { - glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e) + glog.Fatalf("Volume server TCP on %s:%v", listeningAddress, e) } defer listener.Close() @@ -394,3 +396,13 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer go volumeServer.HandleTcpConnection(c) } } + +func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) { + tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler) + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001) + + glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress) + if e:= tftpServer.ListenAndServe(listeningAddress); e != nil { + glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e) + } +} diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go new file mode 100644 index 000000000..99c43e3f4 --- /dev/null +++ b/weed/server/volume_server_udp_handlers.go @@ -0,0 +1,65 @@ +package weed_server + +import ( + "bytes" + "fmt" + "io" +) + +func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error { + + volumeId, n, err := vs.parseFileId(filename) + if err != nil { + return err + } + + hasVolume := vs.store.HasVolume(volumeId) + _, hasEcVolume := vs.store.FindEcVolume(volumeId) + + if hasVolume { + if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + return err + } + } + if hasEcVolume { + if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil { + return err + } + } + + if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error { + + if filename[0] == '-' { + return vs.handleTcpDelete(filename[1:]) + } + + volumeId, n, err := vs.parseFileId(filename) + if err != nil { + return err + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + var buf bytes.Buffer + written, err := wt.WriteTo(&buf) + if err != nil { + return err + } + + err = volume.StreamWrite(n, &buf, uint32(written)) + if err != nil { + return err + } + + return nil +} diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go new file mode 100644 index 000000000..0b7e8cf6d --- /dev/null +++ b/weed/wdclient/volume_udp_client.go @@ -0,0 +1,42 @@ +package wdclient + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/pin/tftp" + "io" +) + +// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication +type VolumeUdpClient struct { + udpClient *tftp.Client +} + +func NewVolumeUdpClient() *VolumeUdpClient { + return &VolumeUdpClient{ + } +} + +func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) { + + udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001) + if parseErr != nil { + return parseErr + } + + if c.udpClient == nil { + c.udpClient, err = tftp.NewClient(udpAddress) + if err != nil { + return + } + } + rf, err := c.udpClient.Send(fileId, "octet") + if err != nil { + return + } + _, err = rf.ReadFrom(fileReader) + if err != nil { + return + } + + return +}