13 changed files with 9 additions and 264 deletions
			
			
		- 
					1weed/command/server.go
- 
					3weed/command/volume.go
- 
					4weed/server/volume_server.go
- 
					1weed/server/volume_server_handlers_read.go
- 
					5weed/server/volume_server_handlers_sync.go
- 
					4weed/server/volume_server_handlers_write.go
- 
					2weed/storage/needle.go
- 
					75weed/storage/needle_byte_cache.go
- 
					5weed/storage/needle_read_write.go
- 
					2weed/storage/volume_read_write.go
- 
					3weed/storage/volume_vacuum.go
- 
					127weed/util/bytes_pool.go
- 
					41weed/util/bytes_pool_test.go
| @ -1,80 +1,11 @@ | |||
| package storage | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| 	"os" | |||
| 	"sync/atomic" | |||
| 
 | |||
| 	"github.com/hashicorp/golang-lru" | |||
| 
 | |||
| 	"github.com/chrislusf/seaweedfs/weed/util" | |||
| ) | |||
| 
 | |||
| var ( | |||
| 	EnableBytesCache = true | |||
| 	bytesCache       *lru.Cache | |||
| 	bytesPool        *util.BytesPool | |||
| ) | |||
| 
 | |||
| /* | |||
| There are one level of caching, and one level of pooling. | |||
| 
 | |||
| In pooling, all []byte are fetched and returned to the pool bytesPool. | |||
| 
 | |||
| In caching, the string~[]byte mapping is cached | |||
| */ | |||
| func init() { | |||
| 	bytesPool = util.NewBytesPool() | |||
| 	bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { | |||
| 		value.(*Block).decreaseReference() | |||
| 	}) | |||
| } | |||
| 
 | |||
| type Block struct { | |||
| 	Bytes    []byte | |||
| 	refCount int32 | |||
| } | |||
| 
 | |||
| func (block *Block) decreaseReference() { | |||
| 	if atomic.AddInt32(&block.refCount, -1) == 0 { | |||
| 		bytesPool.Put(block.Bytes) | |||
| 	} | |||
| } | |||
| func (block *Block) increaseReference() { | |||
| 	atomic.AddInt32(&block.refCount, 1) | |||
| } | |||
| 
 | |||
| // get bytes from the LRU cache of []byte first, then from the bytes pool
 | |||
| // when []byte in LRU cache is evicted, it will be put back to the bytes pool
 | |||
| func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { | |||
| 	// check cache, return if found
 | |||
| 	cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) | |||
| 	if EnableBytesCache { | |||
| 		if obj, found := bytesCache.Get(cacheKey); found { | |||
| 			block = obj.(*Block) | |||
| 			block.increaseReference() | |||
| 			dataSlice = block.Bytes[0:readSize] | |||
| 			return dataSlice, block, nil | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	// get the []byte from pool
 | |||
| 	b := bytesPool.Get(readSize) | |||
| 	// refCount = 2, one by the bytesCache, one by the actual needle object
 | |||
| 	block = &Block{Bytes: b, refCount: 2} | |||
| 	dataSlice = block.Bytes[0:readSize] | |||
| func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, err error) { | |||
| 	dataSlice = make([]byte, readSize) | |||
| 	_, err = r.ReadAt(dataSlice, offset) | |||
| 	if EnableBytesCache { | |||
| 		bytesCache.Add(cacheKey, block) | |||
| 	} | |||
| 	return dataSlice, block, err | |||
| } | |||
| 
 | |||
| func (n *Needle) ReleaseMemory() { | |||
| 	if n.rawBlock != nil { | |||
| 		n.rawBlock.decreaseReference() | |||
| 	} | |||
| } | |||
| func ReleaseBytes(b []byte) { | |||
| 	bytesPool.Put(b) | |||
| 	return dataSlice, err | |||
| } | |||
| @ -1,127 +0,0 @@ | |||
| package util | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"fmt" | |||
| 	"sync" | |||
| 	"sync/atomic" | |||
| 	"time" | |||
| ) | |||
| 
 | |||
| var ( | |||
| 	ChunkSizes = []int{ | |||
| 		1 << 4,  // index 0, 16 bytes, inclusive
 | |||
| 		1 << 6,  // index 1, 64 bytes
 | |||
| 		1 << 8,  // index 2, 256 bytes
 | |||
| 		1 << 10, // index 3, 1K bytes
 | |||
| 		1 << 12, // index 4, 4K bytes
 | |||
| 		1 << 14, // index 5, 16K bytes
 | |||
| 		1 << 16, // index 6, 64K bytes
 | |||
| 		1 << 18, // index 7, 256K bytes
 | |||
| 		1 << 20, // index 8, 1M bytes
 | |||
| 		1 << 22, // index 9, 4M bytes
 | |||
| 		1 << 24, // index 10, 16M bytes
 | |||
| 		1 << 26, // index 11, 64M bytes
 | |||
| 		1 << 28, // index 12, 128M bytes
 | |||
| 	} | |||
| 
 | |||
| 	_DEBUG = false | |||
| ) | |||
| 
 | |||
| type BytesPool struct { | |||
| 	chunkPools []*byteChunkPool | |||
| } | |||
| 
 | |||
| func NewBytesPool() *BytesPool { | |||
| 	var bp BytesPool | |||
| 	for _, size := range ChunkSizes { | |||
| 		bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size)) | |||
| 	} | |||
| 	ret := &bp | |||
| 	if _DEBUG { | |||
| 		t := time.NewTicker(10 * time.Second) | |||
| 		go func() { | |||
| 			for { | |||
| 				println("buffer:", ret.String()) | |||
| 				<-t.C | |||
| 			} | |||
| 		}() | |||
| 	} | |||
| 	return ret | |||
| } | |||
| 
 | |||
| func (m *BytesPool) String() string { | |||
| 	var buf bytes.Buffer | |||
| 	for index, size := range ChunkSizes { | |||
| 		if m.chunkPools[index].count > 0 { | |||
| 			buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count)) | |||
| 		} | |||
| 	} | |||
| 	return buf.String() | |||
| } | |||
| 
 | |||
| func findChunkPoolIndex(size int) int { | |||
| 	if size <= 0 { | |||
| 		return -1 | |||
| 	} | |||
| 	size = (size - 1) >> 4 | |||
| 	ret := 0 | |||
| 	for size > 0 { | |||
| 		size = size >> 2 | |||
| 		ret = ret + 1 | |||
| 	} | |||
| 	if ret >= len(ChunkSizes) { | |||
| 		return -1 | |||
| 	} | |||
| 	return ret | |||
| } | |||
| 
 | |||
| func (m *BytesPool) Get(size int) []byte { | |||
| 	index := findChunkPoolIndex(size) | |||
| 	// println("get index:", index)
 | |||
| 	if index < 0 { | |||
| 		return make([]byte, size) | |||
| 	} | |||
| 	return m.chunkPools[index].Get() | |||
| } | |||
| 
 | |||
| func (m *BytesPool) Put(b []byte) { | |||
| 	index := findChunkPoolIndex(len(b)) | |||
| 	// println("put index:", index)
 | |||
| 	if index < 0 { | |||
| 		return | |||
| 	} | |||
| 	m.chunkPools[index].Put(b) | |||
| } | |||
| 
 | |||
| // a pool of fix-sized []byte chunks. The pool size is managed by Go GC
 | |||
| type byteChunkPool struct { | |||
| 	sync.Pool | |||
| 	chunkSizeLimit int | |||
| 	count          int64 | |||
| } | |||
| 
 | |||
| var count int | |||
| 
 | |||
| func newByteChunkPool(chunkSizeLimit int) *byteChunkPool { | |||
| 	var m byteChunkPool | |||
| 	m.chunkSizeLimit = chunkSizeLimit | |||
| 	m.Pool.New = func() interface{} { | |||
| 		count++ | |||
| 		// println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
 | |||
| 		return make([]byte, m.chunkSizeLimit) | |||
| 	} | |||
| 	return &m | |||
| } | |||
| 
 | |||
| func (m *byteChunkPool) Get() []byte { | |||
| 	// println("before get size:", m.chunkSizeLimit, "count:", m.count)
 | |||
| 	atomic.AddInt64(&m.count, 1) | |||
| 	return m.Pool.Get().([]byte) | |||
| } | |||
| 
 | |||
| func (m *byteChunkPool) Put(b []byte) { | |||
| 	atomic.AddInt64(&m.count, -1) | |||
| 	// println("after put get size:", m.chunkSizeLimit, "count:", m.count)
 | |||
| 	m.Pool.Put(b) | |||
| } | |||
| @ -1,41 +0,0 @@ | |||
| package util | |||
| 
 | |||
| import ( | |||
| 	"testing" | |||
| ) | |||
| 
 | |||
| func TestTTLReadWrite(t *testing.T) { | |||
| 	var tests = []struct { | |||
| 		n        int // input
 | |||
| 		expected int // expected result
 | |||
| 	}{ | |||
| 		{0, -1}, | |||
| 		{1, 0}, | |||
| 		{1 << 4, 0}, | |||
| 		{1 << 6, 1}, | |||
| 		{1 << 8, 2}, | |||
| 		{1 << 10, 3}, | |||
| 		{1 << 12, 4}, | |||
| 		{1 << 14, 5}, | |||
| 		{1 << 16, 6}, | |||
| 		{1 << 18, 7}, | |||
| 		{1<<4 + 1, 1}, | |||
| 		{1<<6 + 1, 2}, | |||
| 		{1<<8 + 1, 3}, | |||
| 		{1<<10 + 1, 4}, | |||
| 		{1<<12 + 1, 5}, | |||
| 		{1<<14 + 1, 6}, | |||
| 		{1<<16 + 1, 7}, | |||
| 		{1<<18 + 1, 8}, | |||
| 		{1<<28 - 1, 12}, | |||
| 		{1 << 28, 12}, | |||
| 		{1<<28 + 2134, -1}, | |||
| 		{1080, 4}, | |||
| 	} | |||
| 	for _, tt := range tests { | |||
| 		actual := findChunkPoolIndex(tt.n) | |||
| 		if actual != tt.expected { | |||
| 			t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual) | |||
| 		} | |||
| 	} | |||
| } | |||
						Write
						Preview
					
					
					Loading…
					
					Cancel
						Save
					
		Reference in new issue