chrislu
2 years ago
4 changed files with 0 additions and 270 deletions
-
1weed/command/server.go
-
26weed/command/volume.go
-
138weed/server/volume_server_tcp_handlers_write.go
-
105weed/storage/volume_stream_write.go
@ -1,138 +0,0 @@ |
|||||
package weed_server |
|
||||
|
|
||||
import ( |
|
||||
"bufio" |
|
||||
"fmt" |
|
||||
"io" |
|
||||
"net" |
|
||||
"strings" |
|
||||
|
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/util" |
|
||||
) |
|
||||
|
|
||||
func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { |
|
||||
defer c.Close() |
|
||||
|
|
||||
glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) |
|
||||
|
|
||||
bufReader := bufio.NewReaderSize(c, 1024*1024) |
|
||||
bufWriter := bufio.NewWriterSize(c, 1024*1024) |
|
||||
|
|
||||
for { |
|
||||
cmd, err := bufReader.ReadString('\n') |
|
||||
if err != nil { |
|
||||
if err != io.EOF { |
|
||||
glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) |
|
||||
} |
|
||||
return |
|
||||
} |
|
||||
cmd = cmd[:len(cmd)-1] |
|
||||
switch cmd[0] { |
|
||||
case '+': |
|
||||
fileId := cmd[1:] |
|
||||
err = vs.handleTcpPut(fileId, bufReader) |
|
||||
if err == nil { |
|
||||
bufWriter.Write([]byte("+OK\n")) |
|
||||
} else { |
|
||||
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) |
|
||||
} |
|
||||
case '-': |
|
||||
fileId := cmd[1:] |
|
||||
err = vs.handleTcpDelete(fileId) |
|
||||
if err == nil { |
|
||||
bufWriter.Write([]byte("+OK\n")) |
|
||||
} else { |
|
||||
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) |
|
||||
} |
|
||||
case '?': |
|
||||
fileId := cmd[1:] |
|
||||
err = vs.handleTcpGet(fileId, bufWriter) |
|
||||
case '!': |
|
||||
bufWriter.Flush() |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { |
|
||||
|
|
||||
volumeId, n, err2 := vs.parseFileId(fileId) |
|
||||
if err2 != nil { |
|
||||
return err2 |
|
||||
} |
|
||||
|
|
||||
volume := vs.store.GetVolume(volumeId) |
|
||||
if volume == nil { |
|
||||
return fmt.Errorf("volume %d not found", volumeId) |
|
||||
} |
|
||||
|
|
||||
err = volume.StreamRead(n, writer) |
|
||||
if err != nil { |
|
||||
return err |
|
||||
} |
|
||||
|
|
||||
return nil |
|
||||
} |
|
||||
|
|
||||
func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { |
|
||||
|
|
||||
volumeId, n, err2 := vs.parseFileId(fileId) |
|
||||
if err2 != nil { |
|
||||
return err2 |
|
||||
} |
|
||||
|
|
||||
volume := vs.store.GetVolume(volumeId) |
|
||||
if volume == nil { |
|
||||
return fmt.Errorf("volume %d not found", volumeId) |
|
||||
} |
|
||||
|
|
||||
sizeBuf := make([]byte, 4) |
|
||||
if _, err = bufReader.Read(sizeBuf); err != nil { |
|
||||
return err |
|
||||
} |
|
||||
dataSize := util.BytesToUint32(sizeBuf) |
|
||||
|
|
||||
err = volume.StreamWrite(n, bufReader, dataSize) |
|
||||
if err != nil { |
|
||||
return err |
|
||||
} |
|
||||
|
|
||||
return nil |
|
||||
} |
|
||||
|
|
||||
func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { |
|
||||
|
|
||||
volumeId, n, err2 := vs.parseFileId(fileId) |
|
||||
if err2 != nil { |
|
||||
return err2 |
|
||||
} |
|
||||
|
|
||||
_, err = vs.store.DeleteVolumeNeedle(volumeId, n) |
|
||||
if err != nil { |
|
||||
return err |
|
||||
} |
|
||||
|
|
||||
return nil |
|
||||
} |
|
||||
|
|
||||
func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { |
|
||||
|
|
||||
commaIndex := strings.LastIndex(fileId, ",") |
|
||||
if commaIndex <= 0 { |
|
||||
return 0, nil, fmt.Errorf("unknown fileId %s", fileId) |
|
||||
} |
|
||||
|
|
||||
vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] |
|
||||
|
|
||||
volumeId, ve := needle.NewVolumeId(vid) |
|
||||
if ve != nil { |
|
||||
return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) |
|
||||
} |
|
||||
|
|
||||
n := new(needle.Needle) |
|
||||
n.ParsePath(fid) |
|
||||
return volumeId, n, nil |
|
||||
} |
|
@ -1,105 +0,0 @@ |
|||||
package storage |
|
||||
|
|
||||
import ( |
|
||||
"bufio" |
|
||||
"fmt" |
|
||||
"io" |
|
||||
"time" |
|
||||
|
|
||||
"github.com/seaweedfs/seaweedfs/weed/util" |
|
||||
|
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle" |
|
||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types" |
|
||||
) |
|
||||
|
|
||||
func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { |
|
||||
|
|
||||
v.dataFileAccessLock.Lock() |
|
||||
defer v.dataFileAccessLock.Unlock() |
|
||||
|
|
||||
df, ok := v.DataBackend.(*backend.DiskFile) |
|
||||
if !ok { |
|
||||
return fmt.Errorf("unexpected volume backend") |
|
||||
} |
|
||||
offset, _, _ := v.DataBackend.GetStat() |
|
||||
|
|
||||
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
|
|
||||
CookieToBytes(header[0:CookieSize], n.Cookie) |
|
||||
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) |
|
||||
n.Size = 4 + Size(dataSize) + 1 |
|
||||
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) |
|
||||
|
|
||||
n.DataSize = dataSize |
|
||||
|
|
||||
// needle header
|
|
||||
df.Write(header[0:NeedleHeaderSize]) |
|
||||
|
|
||||
// data size and data
|
|
||||
util.Uint32toBytes(header[0:4], n.DataSize) |
|
||||
df.Write(header[0:4]) |
|
||||
// write and calculate CRC
|
|
||||
crcWriter := needle.NewCRCwriter(df) |
|
||||
io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) |
|
||||
|
|
||||
// flags
|
|
||||
util.Uint8toBytes(header[0:1], n.Flags) |
|
||||
df.Write(header[0:1]) |
|
||||
|
|
||||
// data checksum
|
|
||||
util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) |
|
||||
// write timestamp, padding
|
|
||||
n.AppendAtNs = uint64(time.Now().UnixNano()) |
|
||||
util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) |
|
||||
padding := needle.PaddingLength(n.Size, needle.Version3) |
|
||||
df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) |
|
||||
|
|
||||
// add to needle map
|
|
||||
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { |
|
||||
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) |
|
||||
} |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { |
|
||||
|
|
||||
v.dataFileAccessLock.Lock() |
|
||||
defer v.dataFileAccessLock.Unlock() |
|
||||
|
|
||||
nv, ok := v.nm.Get(n.Id) |
|
||||
if !ok || nv.Offset.IsZero() { |
|
||||
return ErrorNotFound |
|
||||
} |
|
||||
|
|
||||
sr := &StreamReader{ |
|
||||
readerAt: v.DataBackend, |
|
||||
offset: nv.Offset.ToActualOffset(), |
|
||||
} |
|
||||
bufReader := bufio.NewReader(sr) |
|
||||
bufReader.Discard(NeedleHeaderSize) |
|
||||
sizeBuf := make([]byte, 4) |
|
||||
bufReader.Read(sizeBuf) |
|
||||
if _, err = writer.Write(sizeBuf); err != nil { |
|
||||
return err |
|
||||
} |
|
||||
dataSize := util.BytesToUint32(sizeBuf) |
|
||||
|
|
||||
_, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) |
|
||||
|
|
||||
return |
|
||||
} |
|
||||
|
|
||||
type StreamReader struct { |
|
||||
offset int64 |
|
||||
readerAt io.ReaderAt |
|
||||
} |
|
||||
|
|
||||
func (sr *StreamReader) Read(p []byte) (n int, err error) { |
|
||||
n, err = sr.readerAt.ReadAt(p, sr.offset) |
|
||||
if err != nil { |
|
||||
return |
|
||||
} |
|
||||
sr.offset += int64(n) |
|
||||
return |
|
||||
} |
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue