diff --git a/weed/storage/memory_map_windows.go b/weed/storage/memory_map/memory_map_windows.go similarity index 83% rename from weed/storage/memory_map_windows.go rename to weed/storage/memory_map/memory_map_windows.go index a6c96caaf..e293be515 100644 --- a/weed/storage/memory_map_windows.go +++ b/weed/storage/memory_map/memory_map_windows.go @@ -1,6 +1,6 @@ // +build windows -package storage +package memory_map import ( "reflect" @@ -13,19 +13,20 @@ import ( type DWORD = uint32 type WORD = uint16 -type memory_buffer struct { +type MemoryBuffer struct { aligned_length uint64 length uint64 aligned_ptr uintptr ptr uintptr - buffer []byte + Buffer []byte } -type memory_map struct { +type MemoryMap struct { file_handle windows.Handle file_memory_map_handle windows.Handle - write_map_views []memory_buffer + write_map_views []MemoryBuffer max_length uint64 + End_Of_File int64 // read_map_views []memory_buffer } @@ -33,13 +34,15 @@ var ( procGetSystemInfo = syscall.NewLazyDLL("kernel32.dll").NewProc("GetSystemInfo") ) +var FileMemoryMap = make(map[string]MemoryMap) + var system_info, err = getSystemInfo() var chunk_size = uint64(system_info.dwAllocationGranularity) * 512 -func CreateMemoryMap(hFile windows.Handle, maxlength uint64) memory_map { +func CreateMemoryMap(hFile windows.Handle, maxlength uint64) MemoryMap { - mem_map := memory_map{} + mem_map := MemoryMap{} maxlength_high := uint32(maxlength >> 32) maxlength_low := uint32(maxlength & 0xFFFFFFFF) file_memory_map_handle, err := windows.CreateFileMapping(hFile, nil, windows.PAGE_READWRITE, maxlength_high, maxlength_low, nil) @@ -48,12 +51,13 @@ func CreateMemoryMap(hFile windows.Handle, maxlength uint64) memory_map { mem_map.file_handle = hFile mem_map.file_memory_map_handle = file_memory_map_handle mem_map.max_length = maxlength + mem_map.End_Of_File = -1 } return mem_map } -func DeleteFileAndMemoryMap(mem_map memory_map) { +func DeleteFileAndMemoryMap(mem_map MemoryMap) { windows.CloseHandle(mem_map.file_memory_map_handle) windows.CloseHandle(mem_map.file_handle) @@ -72,7 +76,7 @@ func min(x, y uint64) uint64 { return y } -func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte) { +func WriteMemory(mem_map MemoryMap, offset uint64, length uint64, data []byte) { for { if ((offset+length)/chunk_size)+1 > uint64(len(mem_map.write_map_views)) { @@ -89,7 +93,7 @@ func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte) for { write_end := min(remaining_length, chunk_size) - copy(mem_map.write_map_views[slice_index].buffer[slice_offset:write_end], data[data_offset:]) + copy(mem_map.write_map_views[slice_index].Buffer[slice_offset:write_end], data[data_offset:]) remaining_length -= (write_end - slice_offset) data_offset += (write_end - slice_offset) @@ -100,23 +104,27 @@ func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte) break } } + + if mem_map.End_Of_File < int64(offset+length) { + mem_map.End_Of_File = int64(offset + length) + } } -func ReadMemory(mem_map memory_map, offset uint64, length uint64) (memory_buffer, error) { +func ReadMemory(mem_map MemoryMap, offset uint64, length uint64) (MemoryBuffer, error) { return allocate(mem_map.file_memory_map_handle, offset, length, false) } -func ReleaseMemory(mem_buffer memory_buffer) { +func ReleaseMemory(mem_buffer MemoryBuffer) { windows.UnmapViewOfFile(mem_buffer.aligned_ptr) mem_buffer.ptr = 0 mem_buffer.aligned_ptr = 0 mem_buffer.length = 0 mem_buffer.aligned_length = 0 - mem_buffer.buffer = nil + mem_buffer.Buffer = nil } -func allocateChunk(mem_map memory_map) { +func allocateChunk(mem_map MemoryMap) { start := uint64(len(mem_map.write_map_views)-1) * chunk_size mem_buffer, err := allocate(mem_map.file_memory_map_handle, start, chunk_size, true) @@ -126,9 +134,9 @@ func allocateChunk(mem_map memory_map) { } } -func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (memory_buffer, error) { +func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (MemoryBuffer, error) { - mem_buffer := memory_buffer{} + mem_buffer := MemoryBuffer{} dwSysGran := system_info.dwAllocationGranularity @@ -160,7 +168,7 @@ func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) mem_buffer.ptr = addr_ptr + uintptr(diff) mem_buffer.length = length - slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mem_buffer.buffer)) + slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mem_buffer.Buffer)) slice_header.Data = addr_ptr + uintptr(diff) slice_header.Len = int(length) slice_header.Cap = int(length) diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 75aefdf16..828447bbe 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/joeslay/seaweedfs/weed/storage/memory_map" ) const ( @@ -29,39 +30,25 @@ func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } -func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) { - if end, e := w.Seek(0, io.SeekEnd); e == nil { - defer func(w *os.File, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) - } - } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } +func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) { + + writeBytes := make([]byte, 0) + switch version { case Version1: header := make([]byte, NeedleHeaderSize) CookieToBytes(header[0:CookieSize], n.Cookie) NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) n.Size = uint32(len(n.Data)) - size = n.Size util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } - actualSize = NeedleHeaderSize + int64(n.Size) + size := n.Size + actualSize := NeedleHeaderSize + int64(n.Size) + writeBytes = append(writeBytes, header...) + writeBytes = append(writeBytes, n.Data...) padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) + return writeBytes, size, actualSize, nil case Version2, Version3: header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation CookieToBytes(header[0:CookieSize], n.Cookie) @@ -92,82 +79,103 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32 } else { n.Size = 0 } - size = n.DataSize util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...) if n.DataSize > 0 { util.Uint32toBytes(header[0:4], n.DataSize) - if _, err = w.Write(header[0:4]); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } + writeBytes = append(writeBytes, header[0:4]...) + writeBytes = append(writeBytes, n.Data...) util.Uint8toBytes(header[0:1], n.Flags) - if _, err = w.Write(header[0:1]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) if n.HasName() { util.Uint8toBytes(header[0:1], n.NameSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Name[:n.NameSize]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) + writeBytes = append(writeBytes, n.Name[:n.NameSize]...) } if n.HasMime() { util.Uint8toBytes(header[0:1], n.MimeSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Mime); err != nil { - return - } + writeBytes = append(writeBytes, header[0:1]...) + writeBytes = append(writeBytes, n.Mime...) } if n.HasLastModifiedDate() { util.Uint64toBytes(header[0:8], n.LastModified) - if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil { - return - } + writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...) } if n.HasTtl() && n.Ttl != nil { n.Ttl.ToBytes(header[0:TtlBytesLength]) - if _, err = w.Write(header[0:TtlBytesLength]); err != nil { - return - } + writeBytes = append(writeBytes, header[0:TtlBytesLength]...) } if n.HasPairs() { util.Uint16toBytes(header[0:2], n.PairsSize) - if _, err = w.Write(header[0:2]); err != nil { - return - } - if _, err = w.Write(n.Pairs); err != nil { - return - } + writeBytes = append(writeBytes, header[0:2]...) + writeBytes = append(writeBytes, n.Pairs...) } } padding := PaddingLength(n.Size, version) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) if version == Version2 { - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...) } else { // version3 util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs) - _, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding]) + writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...) } - return offset, n.DataSize, GetActualSize(n.Size, version), err + return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil } - return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) + + return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) { + + mem_map, exists := memory_map.FileMemoryMap[w.Name()] + if !exists { + if end, e := w.Seek(0, io.SeekEnd); e == nil { + defer func(w *os.File, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + } else { + offset = uint64(mem_map.End_Of_File + 1) + } + + bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) + + if err == nil { + if exists { + memory_map.WriteMemory(mem_map, offset, uint64(len(bytesToWrite)), bytesToWrite) + } else { + _, err = w.Write(bytesToWrite) + } + } + + return offset, size, actualSize, err } func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) { - dataSlice = make([]byte, int(GetActualSize(size, version))) - _, err = r.ReadAt(dataSlice, offset) - return dataSlice, err + + dataSize := GetActualSize(size, version) + dataSlice = make([]byte, dataSize) + + mem_map, exists := memory_map.FileMemoryMap[r.Name()] + if exists { + mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), uint64(dataSize)) + copy(dataSlice, mem_buffer.Buffer) + memory_map.ReleaseMemory(mem_buffer) + return dataSlice, err + } else { + _, err = r.ReadAt(dataSlice, offset) + return dataSlice, err + } } // ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set. @@ -280,14 +288,27 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt n = new(Needle) if version == Version1 || version == Version2 || version == Version3 { bytes = make([]byte, NeedleHeaderSize) - var count int - count, err = r.ReadAt(bytes, offset) - if count <= 0 || err != nil { - return nil, bytes, 0, err + + mem_map, exists := memory_map.FileMemoryMap[r.Name()] + if exists { + mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), NeedleHeaderSize) + copy(bytes, mem_buffer.Buffer) + memory_map.ReleaseMemory(mem_buffer) + + if err != nil { + return nil, bytes, 0, err + } + } else { + var count int + count, err = r.ReadAt(bytes, offset) + if count <= 0 || err != nil { + return nil, bytes, 0, err + } } n.ParseNeedleHeader(bytes) bodyLength = NeedleBodyLength(n.Size, version) } + return } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index ae05331a4..c7ba28e53 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/joeslay/seaweedfs/weed/storage/memory_map" ) var ErrorNotFound = errors.New("not found") @@ -48,6 +49,11 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } + mem_map, exists := memory_map.FileMemoryMap[v.FileName()] + if exists { + memory_map.DeleteFileAndMemoryMap(mem_map) + } + v.Close() os.Remove(v.FileName() + ".dat") os.Remove(v.FileName() + ".idx") diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 164c887e1..9ef615cb3 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -4,6 +4,8 @@ import ( "fmt" "os" + "github.com/joeslay/seaweedfs/weed/storage/memory_map" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -70,24 +72,34 @@ func (s *SuperBlock) Bytes() []byte { } func (v *Volume) maybeWriteSuperBlock() error { - stat, e := v.dataFile.Stat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) - return e - } - if stat.Size() == 0 { - v.SuperBlock.version = needle.CurrentVersion - _, e = v.dataFile.Write(v.SuperBlock.Bytes()) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { - if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { - v.readOnly = false + + mem_map, exists := memory_map.FileMemoryMap[v.FileName()] + if exists { + if mem_map.End_Of_File == -1 { + v.SuperBlock.version = needle.CurrentVersion + memory_map.WriteMemory(mem_map, 0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes()) + } + return nil + } else { + stat, e := v.dataFile.Stat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) + return e + } + if stat.Size() == 0 { + v.SuperBlock.version = needle.CurrentVersion + _, e = v.dataFile.Write(v.SuperBlock.Bytes()) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { + if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { + v.readOnly = false + } } } } + return e } - return e } func (v *Volume) readSuperBlock() (err error) { @@ -97,15 +109,28 @@ func (v *Volume) readSuperBlock() (err error) { // ReadSuperBlock reads from data file and load it into volume's super block func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { - if _, err = dataFile.Seek(0, 0); err != nil { - err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err) - return - } + header := make([]byte, _SuperBlockSize) - if _, e := dataFile.Read(header); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) - return + mem_map, exists := memory_map.FileMemoryMap[dataFile.Name()] + if exists { + mem_buffer, e := memory_map.ReadMemory(mem_map, 0, _SuperBlockSize) + if err != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) + return + } + copy(header, mem_buffer.Buffer) + memory_map.ReleaseMemory(mem_buffer) + } else { + if _, err = dataFile.Seek(0, 0); err != nil { + err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err) + return + } + if _, e := dataFile.Read(header); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) + return + } } + superBlock.version = needle.Version(header[0]) if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error())