You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							157 lines
						
					
					
						
							4.2 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							157 lines
						
					
					
						
							4.2 KiB
						
					
					
				| package filesys | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"github.com/chrislusf/seaweedfs/weed/glog" | |
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | |
| 	"io" | |
| 	"os" | |
| 	"sync" | |
| 	"time" | |
| ) | |
| 
 | |
| type TempFileDirtyPages struct { | |
| 	f                *File | |
| 	tf               *os.File | |
| 	writtenIntervals *WrittenContinuousIntervals | |
| 	writeOnly        bool | |
| 	writeWaitGroup   sync.WaitGroup | |
| 	pageAddLock      sync.Mutex | |
| 	chunkAddLock     sync.Mutex | |
| 	lastErr          error | |
| 	collection       string | |
| 	replication      string | |
| } | |
| 
 | |
| func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages { | |
| 
 | |
| 	tempFile := &TempFileDirtyPages{ | |
| 		f:                file, | |
| 		writeOnly:        writeOnly, | |
| 		writtenIntervals: &WrittenContinuousIntervals{}, | |
| 	} | |
| 
 | |
| 	return tempFile | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { | |
| 
 | |
| 	pages.pageAddLock.Lock() | |
| 	defer pages.pageAddLock.Unlock() | |
| 
 | |
| 	if pages.tf == nil { | |
| 		tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "") | |
| 		if err != nil { | |
| 			glog.Errorf("create temp file: %v", err) | |
| 			pages.lastErr = err | |
| 			return | |
| 		} | |
| 		pages.tf = tf | |
| 		pages.writtenIntervals.tempFile = tf | |
| 		pages.writtenIntervals.lastOffset = 0 | |
| 	} | |
| 
 | |
| 	writtenOffset := pages.writtenIntervals.lastOffset | |
| 	dataSize := int64(len(data)) | |
| 
 | |
| 	// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) | |
|  | |
| 	if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil { | |
| 		pages.lastErr = err | |
| 	} else { | |
| 		pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) | |
| 		pages.writtenIntervals.lastOffset += dataSize | |
| 	} | |
| 
 | |
| 	// pages.writtenIntervals.debug() | |
|  | |
| 	return | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) FlushData() error { | |
| 
 | |
| 	pages.saveExistingPagesToStorage() | |
| 	pages.writeWaitGroup.Wait() | |
| 	if pages.lastErr != nil { | |
| 		return fmt.Errorf("flush data: %v", pages.lastErr) | |
| 	} | |
| 	pages.pageAddLock.Lock() | |
| 	defer pages.pageAddLock.Unlock() | |
| 	if pages.tf != nil { | |
| 
 | |
| 		pages.writtenIntervals.tempFile = nil | |
| 		pages.writtenIntervals.lists = nil | |
| 
 | |
| 		pages.tf.Close() | |
| 		os.Remove(pages.tf.Name()) | |
| 		pages.tf = nil | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { | |
| 
 | |
| 	pageSize := pages.f.wfs.option.ChunkSizeLimit | |
| 
 | |
| 	// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) | |
|  | |
| 	for _, list := range pages.writtenIntervals.lists { | |
| 		listStopOffset := list.Offset() + list.Size() | |
| 		for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { | |
| 			start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) | |
| 			if start >= stop { | |
| 				continue | |
| 			} | |
| 			// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists)) | |
| 			pages.saveToStorage(list.ToReader(start, stop), start, stop-start) | |
| 		} | |
| 	} | |
| 
 | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { | |
| 
 | |
| 	mtime := time.Now().UnixNano() | |
| 	pages.writeWaitGroup.Add(1) | |
| 	writer := func() { | |
| 		defer pages.writeWaitGroup.Done() | |
| 
 | |
| 		reader = io.LimitReader(reader, size) | |
| 		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) | |
| 		if err != nil { | |
| 			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) | |
| 			pages.lastErr = err | |
| 			return | |
| 		} | |
| 		chunk.Mtime = mtime | |
| 		pages.collection, pages.replication = collection, replication | |
| 		pages.chunkAddLock.Lock() | |
| 		defer pages.chunkAddLock.Unlock() | |
| 		pages.f.addChunks([]*filer_pb.FileChunk{chunk}) | |
| 		glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) | |
| 	} | |
| 
 | |
| 	if pages.f.wfs.concurrentWriters != nil { | |
| 		pages.f.wfs.concurrentWriters.Execute(writer) | |
| 	} else { | |
| 		go writer() | |
| 	} | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { | |
| 	return pages.writtenIntervals.ReadDataAt(data, startOffset) | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { | |
| 	return pages.collection, pages.replication | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) { | |
| 	if pages.writeOnly { | |
| 		pages.writeOnly = writeOnly | |
| 	} | |
| } | |
| 
 | |
| func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) { | |
| 	return pages.writeOnly | |
| }
 |