From 2b955c171345334a4034888c69547662150ceb91 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 13 Feb 2022 22:50:44 -0800 Subject: [PATCH] support read --- weed/mount/dirty_pages_chunked.go | 99 ++++++++++ weed/mount/filehandle.go | 94 +++++++++ weed/mount/filehandle_map.go | 26 ++- weed/mount/filehandle_read.go | 114 +++++++++++ weed/mount/page_writer.go | 95 +++++++++ weed/mount/page_writer/chunk_interval_list.go | 115 +++++++++++ .../page_writer/chunk_interval_list_test.go | 49 +++++ weed/mount/page_writer/dirty_pages.go | 30 +++ weed/mount/page_writer/page_chunk.go | 16 ++ weed/mount/page_writer/page_chunk_mem.go | 69 +++++++ weed/mount/page_writer/page_chunk_swapfile.go | 121 ++++++++++++ weed/mount/page_writer/upload_pipeline.go | 182 ++++++++++++++++++ .../mount/page_writer/upload_pipeline_lock.go | 63 ++++++ .../mount/page_writer/upload_pipeline_test.go | 47 +++++ weed/mount/weedfs.go | 42 +++- weed/mount/weedfs_file_read.go | 29 ++- weed/mount/weedfs_filehandle.go | 6 +- weed/mount/weedfs_write.go | 84 ++++++++ 18 files changed, 1257 insertions(+), 24 deletions(-) create mode 100644 weed/mount/dirty_pages_chunked.go create mode 100644 weed/mount/filehandle.go create mode 100644 weed/mount/filehandle_read.go create mode 100644 weed/mount/page_writer.go create mode 100644 weed/mount/page_writer/chunk_interval_list.go create mode 100644 weed/mount/page_writer/chunk_interval_list_test.go create mode 100644 weed/mount/page_writer/dirty_pages.go create mode 100644 weed/mount/page_writer/page_chunk.go create mode 100644 weed/mount/page_writer/page_chunk_mem.go create mode 100644 weed/mount/page_writer/page_chunk_swapfile.go create mode 100644 weed/mount/page_writer/upload_pipeline.go create mode 100644 weed/mount/page_writer/upload_pipeline_lock.go create mode 100644 weed/mount/page_writer/upload_pipeline_test.go create mode 100644 weed/mount/weedfs_write.go diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go new file mode 100644 index 000000000..5ffcff83a --- /dev/null +++ b/weed/mount/dirty_pages_chunked.go @@ -0,0 +1,99 @@ +package mount + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "sync" + "time" +) + +type ChunkedDirtyPages struct { + fh *FileHandle + writeWaitGroup sync.WaitGroup + lastErr error + collection string + replication string + uploadPipeline *page_writer.UploadPipeline + hasWrites bool +} + +var ( + _ = page_writer.DirtyPages(&ChunkedDirtyPages{}) +) + +func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { + + dirtyPages := &ChunkedDirtyPages{ + fh: fh, + } + + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters) + + return dirtyPages +} + +func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) { + pages.hasWrites = true + + glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh, offset, offset+int64(len(data))) + pages.uploadPipeline.SaveDataAt(data, offset) + + return +} + +func (pages *ChunkedDirtyPages) FlushData() error { + if !pages.hasWrites { + return nil + } + pages.uploadPipeline.FlushAll() + if pages.lastErr != nil { + return fmt.Errorf("flush data: %v", pages.lastErr) + } + return nil +} + +func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + if !pages.hasWrites { + return + } + return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) +} + +func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) { + return pages.collection, pages.replication +} + +func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { + + mtime := time.Now().UnixNano() + defer cleanupFn() + + fileFullPath := pages.fh.FullPath() + fileName := fileFullPath.Name() + chunk, collection, replication, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset) + if err != nil { + glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err) + pages.lastErr = err + return + } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.fh.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) + +} + +func (pages ChunkedDirtyPages) Destroy() { + pages.uploadPipeline.Shutdown() +} + +func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.LockForRead(startOffset, stopOffset) +} +func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) +} diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go new file mode 100644 index 000000000..0d5481b30 --- /dev/null +++ b/weed/mount/filehandle.go @@ -0,0 +1,94 @@ +package mount + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "sort" + "sync" +) + +type FileHandleId uint64 + +type FileHandle struct { + fh FileHandleId + counter int64 + entry *filer_pb.Entry + chunkAddLock sync.Mutex + inode uint64 + wfs *WFS + + // cache file has been written to + dirtyPages *PageWriter + entryViewCache []filer.VisibleInterval + reader io.ReaderAt + contentType string + handle uint64 + sync.Mutex + + isDeleted bool +} + +func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle { + fh := &FileHandle{ + fh: handleId, + counter: 1, + inode: inode, + wfs: wfs, + } + // dirtyPages: newContinuousDirtyPages(file, writeOnly), + fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit) + if entry != nil { + entry.Attributes.FileSize = filer.FileSize(entry) + } + + return fh +} + +func (fh *FileHandle) FullPath() util.FullPath { + return fh.wfs.inodeToPath.GetPath(fh.inode) +} + +func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) { + + // find the earliest incoming chunk + newChunks := chunks + earliestChunk := newChunks[0] + for i := 1; i < len(newChunks); i++ { + if lessThan(earliestChunk, newChunks[i]) { + earliestChunk = newChunks[i] + } + } + + if fh.entry == nil { + return + } + + // pick out-of-order chunks from existing chunks + for _, chunk := range fh.entry.Chunks { + if lessThan(earliestChunk, chunk) { + chunks = append(chunks, chunk) + } + } + + // sort incoming chunks + sort.Slice(chunks, func(i, j int) bool { + return lessThan(chunks[i], chunks[j]) + }) + + glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks)) + + fh.chunkAddLock.Lock() + fh.entry.Chunks = append(fh.entry.Chunks, newChunks...) + fh.entryViewCache = nil + fh.chunkAddLock.Unlock() +} + +func lessThan(a, b *filer_pb.FileChunk) bool { + if a.Mtime == b.Mtime { + return a.Fid.FileKey < b.Fid.FileKey + } + return a.Mtime < b.Mtime +} diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go index ca010dabb..50ca6bcea 100644 --- a/weed/mount/filehandle_map.go +++ b/weed/mount/filehandle_map.go @@ -5,20 +5,12 @@ import ( "sync" ) -type FileHandleId uint64 - type FileHandleToInode struct { sync.RWMutex nextFh FileHandleId inode2fh map[uint64]*FileHandle fh2inode map[FileHandleId]uint64 } -type FileHandle struct { - fh FileHandleId - counter int64 - entry *filer_pb.Entry - inode uint64 -} func NewFileHandleToInode() *FileHandleToInode { return &FileHandleToInode{ @@ -28,16 +20,22 @@ func NewFileHandleToInode() *FileHandleToInode { } } -func (i *FileHandleToInode) GetFileHandle(inode uint64) *FileHandle { +func (i *FileHandleToInode) GetFileHandle(fh FileHandleId) *FileHandle { + i.RLock() + defer i.RUnlock() + inode, found := i.fh2inode[fh] + if found { + return i.inode2fh[inode] + } + return nil +} + +func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *filer_pb.Entry) *FileHandle { i.Lock() defer i.Unlock() fh, found := i.inode2fh[inode] if !found { - fh = &FileHandle{ - fh: i.nextFh, - counter: 1, - inode: inode, - } + fh = newFileHandle(wfs, i.nextFh, inode, entry) i.nextFh++ i.inode2fh[inode] = fh i.fh2inode[fh.fh] = inode diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go new file mode 100644 index 000000000..71166169e --- /dev/null +++ b/weed/mount/filehandle_read.go @@ -0,0 +1,114 @@ +package mount + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "math" +) + +func (fh *FileHandle) lockForRead(startOffset int64, size int) { + fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size)) +} +func (fh *FileHandle) unlockForRead(startOffset int64, size int) { + fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) +} + +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { + maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) + return +} + +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { + + fileFullPath := fh.FullPath() + + entry := fh.entry + if entry == nil { + return 0, io.EOF + } + + if entry.IsInRemoteOnly() { + glog.V(4).Infof("download remote entry %s", fileFullPath) + newEntry, err := fh.downloadRemoteEntry(entry) + if err != nil { + glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err) + return 0, err + } + entry = newEntry + } + + fileSize := int64(filer.FileSize(entry)) + + if fileSize == 0 { + glog.V(1).Infof("empty fh %v", fileFullPath) + return 0, io.EOF + } + + if offset+int64(len(buff)) <= int64(len(entry.Content)) { + totalRead := copy(buff, entry.Content[offset:]) + glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) + return int64(totalRead), nil + } + + var chunkResolveErr error + if fh.entryViewCache == nil { + fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) + if chunkResolveErr != nil { + return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) + } + fh.reader = nil + } + + reader := fh.reader + if reader == nil { + chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) + glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews)) + for _, chunkView := range chunkViews { + glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) + } + reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize) + } + fh.reader = reader + + totalRead, err := reader.ReadAt(buff, offset) + + if err != nil && err != io.EOF { + glog.Errorf("file handle read %s: %v", fileFullPath, err) + } + + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) + + return int64(totalRead), err +} + +func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { + + fileFullPath := fh.FullPath() + dir, _ := fileFullPath.DirAndName() + + err := fh.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{ + Directory: string(dir), + Name: entry.Name, + } + + glog.V(4).Infof("download entry: %v", request) + resp, err := client.CacheRemoteObjectToLocalCluster(context.Background(), request) + if err != nil { + return fmt.Errorf("CacheRemoteObjectToLocalCluster file %s: %v", fileFullPath, err) + } + + entry = resp.Entry + + fh.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry)) + + return nil + }) + + return entry, err +} diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go new file mode 100644 index 000000000..eaf1fc176 --- /dev/null +++ b/weed/mount/page_writer.go @@ -0,0 +1,95 @@ +package mount + +import ( + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type PageWriter struct { + fh *FileHandle + collection string + replication string + chunkSize int64 + + randomWriter page_writer.DirtyPages +} + +var ( + _ = page_writer.DirtyPages(&PageWriter{}) +) + +func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { + pw := &PageWriter{ + fh: fh, + chunkSize: chunkSize, + randomWriter: newMemoryChunkPages(fh, chunkSize), + // randomWriter: newTempFileDirtyPages(fh.f, chunkSize), + } + return pw +} + +func (pw *PageWriter) AddPage(offset int64, data []byte) { + + glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh, offset, offset+int64(len(data))) + + chunkIndex := offset / pw.chunkSize + for i := chunkIndex; len(data) > 0; i++ { + writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) + pw.addToOneChunk(i, offset, data[:writeSize]) + offset += writeSize + data = data[writeSize:] + } +} + +func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { + pw.randomWriter.AddPage(offset, data) +} + +func (pw *PageWriter) FlushData() error { + return pw.randomWriter.FlushData() +} + +func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh, offset, offset+int64(len(data))) + + chunkIndex := offset / pw.chunkSize + for i := chunkIndex; len(data) > 0; i++ { + readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) + + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) + + offset += readSize + data = data[readSize:] + } + + return +} + +func (pw *PageWriter) GetStorageOptions() (collection, replication string) { + return pw.randomWriter.GetStorageOptions() +} + +func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { + pw.randomWriter.LockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { + pw.randomWriter.UnlockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) Destroy() { + pw.randomWriter.Destroy() +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go new file mode 100644 index 000000000..e6dc5d1f5 --- /dev/null +++ b/weed/mount/page_writer/chunk_interval_list.go @@ -0,0 +1,115 @@ +package page_writer + +import "math" + +// ChunkWrittenInterval mark one written interval within one page chunk +type ChunkWrittenInterval struct { + StartOffset int64 + stopOffset int64 + prev *ChunkWrittenInterval + next *ChunkWrittenInterval +} + +func (interval *ChunkWrittenInterval) Size() int64 { + return interval.stopOffset - interval.StartOffset +} + +func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool { + return interval.stopOffset-interval.StartOffset == chunkSize +} + +// ChunkWrittenIntervalList mark written intervals within one page chunk +type ChunkWrittenIntervalList struct { + head *ChunkWrittenInterval + tail *ChunkWrittenInterval +} + +func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { + list := &ChunkWrittenIntervalList{ + head: &ChunkWrittenInterval{ + StartOffset: -1, + stopOffset: -1, + }, + tail: &ChunkWrittenInterval{ + StartOffset: math.MaxInt64, + stopOffset: math.MaxInt64, + }, + } + list.head.next = list.tail + list.tail.prev = list.head + return list +} + +func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { + interval := &ChunkWrittenInterval{ + StartOffset: startOffset, + stopOffset: stopOffset, + } + list.addInterval(interval) +} + +func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool { + return list.size() == 1 && list.head.next.isComplete(chunkSize) +} +func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { + for t := list.head; t != nil; t = t.next { + writtenByteCount += t.Size() + } + return +} + +func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { + + p := list.head + for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { + } + q := list.tail + for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { + } + + if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { + // merge p and q together + p.stopOffset = q.stopOffset + unlinkNodesBetween(p, q.next) + return + } + if interval.StartOffset <= p.stopOffset { + // merge new interval into p + p.stopOffset = interval.stopOffset + unlinkNodesBetween(p, q) + return + } + if q.StartOffset <= interval.stopOffset { + // merge new interval into q + q.StartOffset = interval.StartOffset + unlinkNodesBetween(p, q) + return + } + + // add the new interval between p and q + unlinkNodesBetween(p, q) + p.next = interval + interval.prev = p + q.prev = interval + interval.next = q + +} + +// unlinkNodesBetween remove all nodes after start and before stop, exclusive +func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) { + if start.next == stop { + return + } + start.next.prev = nil + start.next = stop + stop.prev.next = nil + stop.prev = start +} + +func (list *ChunkWrittenIntervalList) size() int { + var count int + for t := list.head; t != nil; t = t.next { + count++ + } + return count - 2 +} diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go new file mode 100644 index 000000000..b22f5eb5d --- /dev/null +++ b/weed/mount/page_writer/chunk_interval_list_test.go @@ -0,0 +1,49 @@ +package page_writer + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_PageChunkWrittenIntervalList(t *testing.T) { + list := newChunkWrittenIntervalList() + + assert.Equal(t, 0, list.size(), "empty list") + + list.MarkWritten(0, 5) + assert.Equal(t, 1, list.size(), "one interval") + + list.MarkWritten(0, 5) + assert.Equal(t, 1, list.size(), "duplicated interval2") + + list.MarkWritten(95, 100) + assert.Equal(t, 2, list.size(), "two intervals") + + list.MarkWritten(50, 60) + assert.Equal(t, 3, list.size(), "three intervals") + + list.MarkWritten(50, 55) + assert.Equal(t, 3, list.size(), "three intervals merge") + + list.MarkWritten(40, 50) + assert.Equal(t, 3, list.size(), "three intervals grow forward") + + list.MarkWritten(50, 65) + assert.Equal(t, 3, list.size(), "three intervals grow backward") + + list.MarkWritten(70, 80) + assert.Equal(t, 4, list.size(), "four intervals") + + list.MarkWritten(60, 70) + assert.Equal(t, 3, list.size(), "three intervals merged") + + list.MarkWritten(59, 71) + assert.Equal(t, 3, list.size(), "covered three intervals") + + list.MarkWritten(5, 59) + assert.Equal(t, 2, list.size(), "covered two intervals") + + list.MarkWritten(70, 99) + assert.Equal(t, 1, list.size(), "covered one intervals") + +} diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go new file mode 100644 index 000000000..25b747fad --- /dev/null +++ b/weed/mount/page_writer/dirty_pages.go @@ -0,0 +1,30 @@ +package page_writer + +type DirtyPages interface { + AddPage(offset int64, data []byte) + FlushData() error + ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) + GetStorageOptions() (collection, replication string) + Destroy() + LockForRead(startOffset, stopOffset int64) + UnlockForRead(startOffset, stopOffset int64) +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} +func minInt(x, y int) int { + if x < y { + return x + } + return y +} diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go new file mode 100644 index 000000000..4e8f31425 --- /dev/null +++ b/weed/mount/page_writer/page_chunk.go @@ -0,0 +1,16 @@ +package page_writer + +import ( + "io" +) + +type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) + +type PageChunk interface { + FreeResource() + WriteDataAt(src []byte, offset int64) (n int) + ReadDataAt(p []byte, off int64) (maxStop int64) + IsComplete() bool + WrittenSize() int64 + SaveContent(saveFn SaveToStorageFunc) +} diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go new file mode 100644 index 000000000..dfd54c19e --- /dev/null +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -0,0 +1,69 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" +) + +var ( + _ = PageChunk(&MemChunk{}) +) + +type MemChunk struct { + buf []byte + usage *ChunkWrittenIntervalList + chunkSize int64 + logicChunkIndex LogicChunkIndex +} + +func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { + return &MemChunk{ + logicChunkIndex: logicChunkIndex, + chunkSize: chunkSize, + buf: mem.Allocate(int(chunkSize)), + usage: newChunkWrittenIntervalList(), + } +} + +func (mc *MemChunk) FreeResource() { + mem.Free(mc.buf) +} + +func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { + innerOffset := offset % mc.chunkSize + n = copy(mc.buf[innerOffset:], src) + mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) + return +} + +func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { + memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize + for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { + logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) + if logicStart < logicStop { + copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (mc *MemChunk) IsComplete() bool { + return mc.usage.IsComplete(mc.chunkSize) +} + +func (mc *MemChunk) WrittenSize() int64 { + return mc.usage.WrittenSize() +} + +func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { + if saveFn == nil { + return + } + for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { + reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) + saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() { + }) + } +} diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go new file mode 100644 index 000000000..486557629 --- /dev/null +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -0,0 +1,121 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "os" +) + +var ( + _ = PageChunk(&SwapFileChunk{}) +) + +type ActualChunkIndex int + +type SwapFile struct { + dir string + file *os.File + logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex + chunkSize int64 +} + +type SwapFileChunk struct { + swapfile *SwapFile + usage *ChunkWrittenIntervalList + logicChunkIndex LogicChunkIndex + actualChunkIndex ActualChunkIndex +} + +func NewSwapFile(dir string, chunkSize int64) *SwapFile { + return &SwapFile{ + dir: dir, + file: nil, + logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), + chunkSize: chunkSize, + } +} +func (sf *SwapFile) FreeResource() { + if sf.file != nil { + sf.file.Close() + os.Remove(sf.file.Name()) + } +} + +func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { + if sf.file == nil { + var err error + sf.file, err = os.CreateTemp(sf.dir, "") + if err != nil { + glog.Errorf("create swap file: %v", err) + return nil + } + } + actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] + if !found { + actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) + sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex + } + + return &SwapFileChunk{ + swapfile: sf, + usage: newChunkWrittenIntervalList(), + logicChunkIndex: logicChunkIndex, + actualChunkIndex: actualChunkIndex, + } +} + +func (sc *SwapFileChunk) FreeResource() { +} + +func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { + innerOffset := offset % sc.swapfile.chunkSize + var err error + n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) + if err == nil { + sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) + } else { + glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) + } + return +} + +func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { + chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize + for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { + logicStart := max(off, chunkStartOffset+t.StartOffset) + logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) + if logicStart < logicStop { + actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize + if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) + break + } + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (sc *SwapFileChunk) IsComplete() bool { + return sc.usage.IsComplete(sc.swapfile.chunkSize) +} + +func (sc *SwapFileChunk) WrittenSize() int64 { + return sc.usage.WrittenSize() +} + +func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { + if saveFn == nil { + return + } + for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { + data := mem.Allocate(int(t.Size())) + sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) + reader := util.NewBytesReader(data) + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { + }) + mem.Free(data) + } + sc.usage = newChunkWrittenIntervalList() +} diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go new file mode 100644 index 000000000..53641e66d --- /dev/null +++ b/weed/mount/page_writer/upload_pipeline.go @@ -0,0 +1,182 @@ +package page_writer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + "sync/atomic" + "time" +) + +type LogicChunkIndex int + +type UploadPipeline struct { + filepath util.FullPath + ChunkSize int64 + writableChunks map[LogicChunkIndex]PageChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + uploaders *util.LimitedConcurrentExecutor + uploaderCount int32 + uploaderCountCond *sync.Cond + saveToStorageFn SaveToStorageFunc + activeReadChunks map[LogicChunkIndex]int + activeReadChunksLock sync.Mutex + bufferChunkLimit int +} + +type SealedChunk struct { + chunk PageChunk + referenceCounter int // track uploading or reading processes +} + +func (sc *SealedChunk) FreeReference(messageOnFree string) { + sc.referenceCounter-- + if sc.referenceCounter == 0 { + glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) + sc.chunk.FreeResource() + } +} + +func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { + return &UploadPipeline{ + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]PageChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + activeReadChunks: make(map[LogicChunkIndex]int), + bufferChunkLimit: bufferChunkLimit, + } +} + +func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + + logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + + memChunk, found := up.writableChunks[logicChunkIndex] + if !found { + if len(up.writableChunks) < up.bufferChunkLimit { + memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + } else { + fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) + for lci, mc := range up.writableChunks { + chunkFullness := mc.WrittenSize() + if fullness < chunkFullness { + fullestChunkIndex = lci + fullness = chunkFullness + } + } + up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) + delete(up.writableChunks, fullestChunkIndex) + fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness) + memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + } + up.writableChunks[logicChunkIndex] = memChunk + } + n = memChunk.WriteDataAt(p, off) + up.maybeMoveToSealed(memChunk, logicChunkIndex) + + return +} + +func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { + logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + + // read from sealed chunks first + up.sealedChunksLock.Lock() + sealedChunk, found := up.sealedChunks[logicChunkIndex] + if found { + sealedChunk.referenceCounter++ + } + up.sealedChunksLock.Unlock() + if found { + maxStop = sealedChunk.chunk.ReadDataAt(p, off) + glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) + sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) + } + + // read from writable chunks last + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + writableChunk, found := up.writableChunks[logicChunkIndex] + if !found { + return + } + writableMaxStop := writableChunk.ReadDataAt(p, off) + glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) + maxStop = max(maxStop, writableMaxStop) + + return +} + +func (up *UploadPipeline) FlushAll() { + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + + for logicChunkIndex, memChunk := range up.writableChunks { + up.moveToSealed(memChunk, logicChunkIndex) + } + + up.waitForCurrentWritersToComplete() +} + +func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { + if memChunk.IsComplete() { + up.moveToSealed(memChunk, logicChunkIndex) + } +} + +func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { + atomic.AddInt32(&up.uploaderCount, 1) + glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) + + up.sealedChunksLock.Lock() + + if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { + oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) + } + sealedChunk := &SealedChunk{ + chunk: memChunk, + referenceCounter: 1, // default 1 is for uploading process + } + up.sealedChunks[logicChunkIndex] = sealedChunk + delete(up.writableChunks, logicChunkIndex) + + up.sealedChunksLock.Unlock() + + up.uploaders.Execute(func() { + // first add to the file chunks + sealedChunk.chunk.SaveContent(up.saveToStorageFn) + + // notify waiting process + atomic.AddInt32(&up.uploaderCount, -1) + glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) + // Lock and Unlock are not required, + // but it may signal multiple times during one wakeup, + // and the waiting goroutine may miss some of them! + up.uploaderCountCond.L.Lock() + up.uploaderCountCond.Broadcast() + up.uploaderCountCond.L.Unlock() + + // wait for readers + for up.IsLocked(logicChunkIndex) { + time.Sleep(59 * time.Millisecond) + } + + // then remove from sealed chunks + up.sealedChunksLock.Lock() + defer up.sealedChunksLock.Unlock() + delete(up.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) + + }) +} + +func (up *UploadPipeline) Shutdown() { +} diff --git a/weed/mount/page_writer/upload_pipeline_lock.go b/weed/mount/page_writer/upload_pipeline_lock.go new file mode 100644 index 000000000..47a40ba37 --- /dev/null +++ b/weed/mount/page_writer/upload_pipeline_lock.go @@ -0,0 +1,63 @@ +package page_writer + +import ( + "sync/atomic" +) + +func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) + if stopOffset%up.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := up.activeReadChunks[i]; found { + up.activeReadChunks[i] = count + 1 + } else { + up.activeReadChunks[i] = 1 + } + } +} + +func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) + if stopOffset%up.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := up.activeReadChunks[i]; found { + if count == 1 { + delete(up.activeReadChunks, i) + } else { + up.activeReadChunks[i] = count - 1 + } + } + } +} + +func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { + up.activeReadChunksLock.Lock() + defer up.activeReadChunksLock.Unlock() + if count, found := up.activeReadChunks[logicChunkIndex]; found { + return count > 0 + } + return false +} + +func (up *UploadPipeline) waitForCurrentWritersToComplete() { + up.uploaderCountCond.L.Lock() + t := int32(100) + for { + t = atomic.LoadInt32(&up.uploaderCount) + if t <= 0 { + break + } + up.uploaderCountCond.Wait() + } + up.uploaderCountCond.L.Unlock() +} diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go new file mode 100644 index 000000000..816fb228b --- /dev/null +++ b/weed/mount/page_writer/upload_pipeline_test.go @@ -0,0 +1,47 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "testing" +) + +func TestUploadPipeline(t *testing.T) { + + uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16) + + writeRange(uploadPipeline, 0, 131072) + writeRange(uploadPipeline, 131072, 262144) + writeRange(uploadPipeline, 262144, 1025536) + + confirmRange(t, uploadPipeline, 0, 1025536) + + writeRange(uploadPipeline, 1025536, 1296896) + + confirmRange(t, uploadPipeline, 1025536, 1296896) + + writeRange(uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) +} + +// startOff and stopOff must be divided by 4 +func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff / 4; i < stopOff/4; i += 4 { + util.Uint32toBytes(p, uint32(i)) + uploadPipeline.SaveDataAt(p, i) + } +} + +func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff; i < stopOff/4; i += 4 { + uploadPipeline.MaybeReadDataAt(p, i) + x := util.BytesToUint32(p) + if x != uint32(i) { + t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) + } + } +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 1e9f07df9..b7f50cd13 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -2,14 +2,18 @@ package mount import ( "context" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/hanwen/go-fuse/v2/fuse" "google.golang.org/grpc" + "math/rand" "os" "path" "path/filepath" @@ -54,13 +58,15 @@ type WFS struct { // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go fuse.RawFileSystem fs.Inode - option *Option - metaCache *meta_cache.MetaCache - stats statsCache - root Directory - signature int32 - inodeToPath *InodeToPath - fhmap *FileHandleToInode + option *Option + metaCache *meta_cache.MetaCache + stats statsCache + root Directory + chunkCache *chunk_cache.TieredChunkCache + signature int32 + concurrentWriters *util.LimitedConcurrentExecutor + inodeToPath *InodeToPath + fhmap *FileHandleToInode } func NewSeaweedFileSystem(option *Option) *WFS { @@ -79,12 +85,21 @@ func NewSeaweedFileSystem(option *Option) *WFS { parent: nil, } + wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses)) + wfs.option.setupUniqueCacheDirectory() + if option.CacheSizeMB > 0 { + wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024) + } + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) { }) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() }) + if wfs.option.ConcurrentWriters > 0 { + wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) + } return wfs } @@ -132,6 +147,19 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St return cachedEntry.ToProtoEntry(), fuse.OK } +func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { + if wfs.option.VolumeServerAccess == "filerProxy" { + return func(fileId string) (targetUrls []string, err error) { + return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil + } + } + return filer.LookupFn(wfs) +} + +func (wfs *WFS) getCurrentFiler() pb.ServerAddress { + return wfs.option.FilerAddresses[wfs.option.filerIndex] +} + func (option *Option) setupUniqueCacheDirectory() { cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8] option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index d9ad1f4ea..00143a5b4 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -1,7 +1,9 @@ package mount import ( + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/hanwen/go-fuse/v2/fuse" + "io" ) /** @@ -29,6 +31,29 @@ import ( * @param off offset to read from * @param fi file information */ -func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buf []byte) (fuse.ReadResult, fuse.Status) { - return nil, fuse.ENOSYS +func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) { + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return nil, fuse.ENOENT + } + + offset := int64(in.Offset) + fh.lockForRead(offset, len(buff)) + defer fh.unlockForRead(offset, len(buff)) + + totalRead, err := fh.readFromChunks(buff, offset) + if err == nil || err == io.EOF { + maxStop := fh.readFromDirtyPages(buff, offset) + totalRead = max(maxStop-offset, totalRead) + } + if err == io.EOF { + err = nil + } + + if err != nil { + glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err) + return nil, fuse.EIO + } + + return fuse.ReadResultData(buff[:totalRead]), fuse.OK } diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go index 551394262..03f72282e 100644 --- a/weed/mount/weedfs_filehandle.go +++ b/weed/mount/weedfs_filehandle.go @@ -5,7 +5,7 @@ import "github.com/hanwen/go-fuse/v2/fuse" func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHandle, code fuse.Status) { _, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { - fileHandle = wfs.fhmap.GetFileHandle(inode) + fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry) fileHandle.entry = entry } return @@ -14,3 +14,7 @@ func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHa func (wfs *WFS) ReleaseHandle(handleId FileHandleId) { wfs.fhmap.ReleaseByHandle(handleId) } + +func (wfs *WFS) GetHandle(handleId FileHandleId) *FileHandle { + return wfs.fhmap.GetFileHandle(handleId) +} diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go new file mode 100644 index 000000000..723ce9c34 --- /dev/null +++ b/weed/mount/weedfs_write.go @@ -0,0 +1,84 @@ +package mount + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { + var fileId, host string + var auth security.EncodedJwt + + if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return util.Retry("assignVolume", func() error { + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DiskType: string(wfs.option.DiskType), + DataCenter: wfs.option.DataCenter, + Path: string(fullPath), + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } + + fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) + loc := resp.Location + host = wfs.AdjustedUrl(loc) + collection, replication = resp.Collection, resp.Replication + + return nil + }) + }); err != nil { + return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.VolumeServerAccess == "filerProxy" { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) + } + uploadOption := &operation.UploadOption{ + UploadUrl: fileUrl, + Filename: filename, + Cipher: wfs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, err, data := operation.Upload(reader, uploadOption) + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + if offset == 0 { + wfs.chunkCache.SetChunk(fileId, data) + } + + chunk = uploadResult.ToPbFileChunk(fileId, offset) + return chunk, collection, replication, nil + } +}