Avoid a slice-bounds-out-of-range panic when saving dirty pages
This change avoids the following panic:
2018/09/04 16:27:14 fuse: panic in handler for Write [ID=0x27c0d Node=0x2 Uid=0 Gid=0 Pid=0] 0x1 131072 @10607788032 fl=WriteCache lock=0 ffl=OpenReadOnly: runtime error: slice bounds out of range
goroutine 211141 [running]:
bazil.org/fuse/fs.(*Server).serve.func2(0x10d3e60, 0xc00014be30, 0xc00052fef8, 0xc00052fe77)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:857 +0x1ac
panic(0xe2d080, 0x17f62b0)
/home/travis/.gimme/versions/go/src/runtime/panic.go:513 +0x1b9
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).saveToStorage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0xc0005dc000, 0x20000, 0x1000000, 0x276720000, 0xc0003feaa0, 0x0, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:142 +0x8ec
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).saveExistingPagesToStorage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0x0, 0x0, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:107 +0x6c
github.com/chrislusf/seaweedfs/weed/filesys.(*ContinuousDirtyPages).AddPage(0xc0000aca80, 0x10d7ba0, 0xc0003fcc00, 0x278460000, 0xc011966050, 0x20000, 0x20fb0, 0x6fc23ac00, 0x4a817c800, 0x0, ...)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/dirty_page.go:70 +0x8f
github.com/chrislusf/seaweedfs/weed/filesys.(*FileHandle).Write(0xc000548410, 0x10d7ba0, 0xc0003fcc00, 0xc00014be30, 0xc011946af8, 0x47fa01, 0x0)
/home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/filesys/filehandle.go:141 +0x245
bazil.org/fuse/fs.(*Server).handleRequest(0xc0002cc0c0, 0x10d7ba0, 0xc0003fcc00, 0x10cb020, 0xc000394140, 0xc0000acac0, 0x10d3e60, 0xc00014be30, 0xc00052fef8, 0x10ca6a0, ...)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:1265 +0x1599
bazil.org/fuse/fs.(*Server).serve(0xc0002cc0c0, 0x10d3e60, 0xc00014be30)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:878 +0x410
bazil.org/fuse/fs.(*Server).Serve.func1(0xc0002cc0c0, 0x10d3e60, 0xc00014be30)
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:425 +0x6e
created by bazil.org/fuse/fs.(*Server).Serve
/home/travis/gopath/src/bazil.org/fuse/fs/serve.go:423 +0x321
6 years ago |
|
package filesys
import ( "bytes" "context" "fmt" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "sync" )
type ContinuousDirtyPages struct { hasData bool Offset int64 Size int64 Data []byte f *File lock sync.Mutex }
// newDirtyPages returns an empty dirty-page buffer bound to file, with its
// Data slice allocated to the file system's configured ChunkSizeLimit.
func newDirtyPages(file *File) *ContinuousDirtyPages {
	pages := new(ContinuousDirtyPages)
	pages.Data = make([]byte, file.wfs.option.ChunkSizeLimit)
	pages.f = file
	return pages
}
func (pages *ContinuousDirtyPages) InitializeToFile(file *File) *ContinuousDirtyPages { if len(pages.Data) != int(file.wfs.option.ChunkSizeLimit) { pages.Data = make([]byte, file.wfs.option.ChunkSizeLimit) } pages.f = file return pages }
func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock() defer pages.lock.Unlock()
var chunk *filer_pb.FileChunk
if len(data) > len(pages.Data) { // this is more than what buffer can hold.
return pages.flushAndSave(ctx, offset, data) }
if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) { // if the data is out of range,
// or buffer is full if adding new data,
// flush current buffer and add new data
// println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) chunks = append(chunks, chunk) } } else { glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } pages.Offset = offset copy(pages.Data, data) pages.Size = int64(len(data)) return }
if offset != pages.Offset+pages.Size { // when this happens, debug shows the data overlapping with existing data is empty
// the data is not just append
if offset == pages.Offset && int(pages.Size) < len(data) { // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size)
copy(pages.Data[pages.Size:], data[pages.Size:]) } else { if pages.Size != 0 { glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data))) } return pages.flushAndSave(ctx, offset, data) } } else { copy(pages.Data[offset-pages.Offset:], data) }
pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
return }
func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
var chunk *filer_pb.FileChunk
// flush existing
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { if chunk != nil { glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) chunks = append(chunks, chunk) } } else { glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } pages.Size = 0 pages.Offset = 0
// flush the new page
if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { if chunk != nil { glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) chunks = append(chunks, chunk) } } else { glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return }
return }
func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
pages.lock.Lock() defer pages.lock.Unlock()
if pages.Size == 0 { return nil, nil }
if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { pages.Size = 0 pages.Offset = 0 if chunk != nil { glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) } } return }
// saveExistingPagesToStorage uploads the buffered bytes Data[:Size] as a
// chunk located at pages.Offset. With an empty buffer it does nothing and
// returns (nil, nil). It does not modify Offset or Size.
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
	if pages.Size != 0 {
		return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
	}
	return nil, nil
}
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
var fileId, host string
if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: pages.f.wfs.option.Replication, Collection: pages.f.wfs.option.Collection, TtlSec: pages.f.wfs.option.TtlSec, DataCenter: pages.f.wfs.option.DataCenter, }
resp, err := client.AssignVolume(ctx, request) if err != nil { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err }
fileId, host = resp.FileId, resp.Url
return nil }); err != nil { return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) }
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) bufReader := bytes.NewReader(buf) uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) } if uploadResult.Error != "" { glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload result: %v", uploadResult.Error) }
return &filer_pb.FileChunk{ FileId: fileId, Offset: offset, Size: uint64(len(buf)), Mtime: time.Now().UnixNano(), ETag: uploadResult.ETag, }, nil
}
// max returns the larger of x and y.
func max(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}
|