Chris Lu
4 years ago
6 changed files with 287 additions and 2 deletions
-
22weed/command/volume.go
-
133weed/server/volume_server_tcp_handlers_write.go
-
2weed/storage/backend/disk_file.go
-
24weed/storage/needle/crc.go
-
106weed/storage/volume_stream_write.go
-
2weed/storage/volume_vacuum.go
@ -0,0 +1,133 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"bufio" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"net" |
|||
"strings" |
|||
) |
|||
|
|||
func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { |
|||
defer c.Close() |
|||
|
|||
glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) |
|||
|
|||
bufReader := bufio.NewReaderSize(c, 1024*1024) |
|||
bufWriter := bufio.NewWriterSize(c, 1024*1024) |
|||
|
|||
for { |
|||
cmd, err := bufReader.ReadString('\n') |
|||
if err != nil { |
|||
glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) |
|||
return |
|||
} |
|||
switch cmd[0] { |
|||
case '+': |
|||
fileId := cmd[1:] |
|||
err = vs.handleTcpPut(fileId, bufReader) |
|||
if err == nil { |
|||
bufWriter.Write([]byte("+OK\n")) |
|||
} else { |
|||
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) |
|||
} |
|||
case '-': |
|||
fileId := cmd[1:] |
|||
err = vs.handleTcpDelete(fileId) |
|||
if err == nil { |
|||
bufWriter.Write([]byte("+OK\n")) |
|||
} else { |
|||
bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) |
|||
} |
|||
case '?': |
|||
fileId := cmd[1:] |
|||
err = vs.handleTcpGet(fileId, bufWriter) |
|||
case '!': |
|||
bufWriter.Flush() |
|||
} |
|||
|
|||
} |
|||
|
|||
} |
|||
|
|||
func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { |
|||
|
|||
volumeId, n, err2 := vs.parseFileId(fileId) |
|||
if err2 != nil { |
|||
return err2 |
|||
} |
|||
|
|||
volume := vs.store.GetVolume(volumeId) |
|||
if volume == nil { |
|||
return fmt.Errorf("volume %d not found", volumeId) |
|||
} |
|||
|
|||
err = volume.StreamRead(n, writer) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { |
|||
|
|||
volumeId, n, err2 := vs.parseFileId(fileId) |
|||
if err2 != nil { |
|||
return err2 |
|||
} |
|||
|
|||
volume := vs.store.GetVolume(volumeId) |
|||
if volume == nil { |
|||
return fmt.Errorf("volume %d not found", volumeId) |
|||
} |
|||
|
|||
sizeBuf := make([]byte, 4) |
|||
if _, err = bufReader.Read(sizeBuf); err != nil { |
|||
return err |
|||
} |
|||
dataSize := util.BytesToUint32(sizeBuf) |
|||
|
|||
err = volume.StreamWrite(n, bufReader, dataSize) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { |
|||
|
|||
volumeId, n, err2 := vs.parseFileId(fileId) |
|||
if err2 != nil { |
|||
return err2 |
|||
} |
|||
|
|||
_, err = vs.store.DeleteVolumeNeedle(volumeId, n) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { |
|||
|
|||
commaIndex := strings.LastIndex(fileId, ",") |
|||
if commaIndex <= 0 { |
|||
return 0, nil, fmt.Errorf("unknown fileId %s", fileId) |
|||
} |
|||
|
|||
vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] |
|||
|
|||
volumeId, ve := needle.NewVolumeId(vid) |
|||
if ve != nil { |
|||
return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) |
|||
} |
|||
|
|||
n := new(needle.Needle) |
|||
n.ParsePath(fid) |
|||
return volumeId, n, nil |
|||
} |
@ -0,0 +1,106 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"bufio" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"io" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/storage/backend" |
|||
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
|||
. "github.com/chrislusf/seaweedfs/weed/storage/types" |
|||
) |
|||
|
|||
func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { |
|||
|
|||
v.dataFileAccessLock.Lock() |
|||
defer v.dataFileAccessLock.Unlock() |
|||
|
|||
df, ok := v.DataBackend.(*backend.DiskFile) |
|||
if !ok { |
|||
return fmt.Errorf("unexpected volume backend") |
|||
} |
|||
offset, _, _ := v.DataBackend.GetStat() |
|||
|
|||
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
|
|||
CookieToBytes(header[0:CookieSize], n.Cookie) |
|||
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) |
|||
n.Size = 4 + Size(dataSize) + 1 |
|||
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) |
|||
|
|||
n.DataSize = dataSize |
|||
|
|||
// needle header
|
|||
df.Write(header[0:NeedleHeaderSize]) |
|||
|
|||
// data size and data
|
|||
util.Uint32toBytes(header[0:4], n.DataSize) |
|||
df.Write(header[0:4]) |
|||
// write and calculate CRC
|
|||
crcWriter := needle.NewCRCwriter(df) |
|||
io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) |
|||
|
|||
// flags
|
|||
util.Uint8toBytes(header[0:1], n.Flags) |
|||
df.Write(header[0:1]) |
|||
|
|||
// data checksum
|
|||
util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) |
|||
df.Write(header[0:needle.NeedleChecksumSize]) |
|||
|
|||
// write timestamp, padding
|
|||
n.AppendAtNs = uint64(time.Now().UnixNano()) |
|||
util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) |
|||
padding := needle.PaddingLength(n.Size, needle.Version3) |
|||
df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) |
|||
|
|||
// add to needle map
|
|||
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { |
|||
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { |
|||
|
|||
v.dataFileAccessLock.Lock() |
|||
defer v.dataFileAccessLock.Unlock() |
|||
|
|||
nv, ok := v.nm.Get(n.Id) |
|||
if !ok || nv.Offset.IsZero() { |
|||
return ErrorNotFound |
|||
} |
|||
|
|||
sr := &StreamReader{ |
|||
readerAt: v.DataBackend, |
|||
offset: nv.Offset.ToActualOffset(), |
|||
} |
|||
bufReader := bufio.NewReader(sr) |
|||
bufReader.Discard(NeedleHeaderSize) |
|||
sizeBuf := make([]byte, 4) |
|||
bufReader.Read(sizeBuf) |
|||
if _, err = writer.Write(sizeBuf); err != nil { |
|||
return err |
|||
} |
|||
dataSize := util.BytesToUint32(sizeBuf) |
|||
|
|||
_, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) |
|||
|
|||
return |
|||
} |
|||
|
|||
type StreamReader struct { |
|||
offset int64 |
|||
readerAt io.ReaderAt |
|||
} |
|||
|
|||
func (sr *StreamReader) Read(p []byte) (n int, err error) { |
|||
n, err = sr.readerAt.ReadAt(p, sr.offset) |
|||
if err != nil { |
|||
return |
|||
} |
|||
sr.offset += int64(n) |
|||
return |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue