9 changed files with 216 additions and 7 deletions
-
11go/storage/needle.go
-
26go/storage/needle_read_write.go
-
3go/storage/volume.go
-
127go/util/bytes_pool.go
-
41go/util/bytes_pool_test.go
-
3go/weed/signal_handling.go
-
1go/weed/weed_server/volume_server_handlers_read.go
-
3go/weed/weed_server/volume_server_handlers_sync.go
-
8go/weed/weed_server/volume_server_handlers_write.go
@ -0,0 +1,127 @@ |
|||||
|
package util |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"fmt" |
||||
|
"sync" |
||||
|
"sync/atomic" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
ChunkSizes = []int{ |
||||
|
1 << 4, // index 0, 16 bytes, inclusive
|
||||
|
1 << 6, // index 1, 64 bytes
|
||||
|
1 << 8, // index 2, 256 bytes
|
||||
|
1 << 10, // index 3, 1K bytes
|
||||
|
1 << 12, // index 4, 4K bytes
|
||||
|
1 << 14, // index 5, 16K bytes
|
||||
|
1 << 16, // index 6, 64K bytes
|
||||
|
1 << 18, // index 7, 256K bytes
|
||||
|
1 << 20, // index 8, 1M bytes
|
||||
|
1 << 22, // index 9, 4M bytes
|
||||
|
1 << 24, // index 10, 16M bytes
|
||||
|
1 << 26, // index 11, 64M bytes
|
||||
|
1 << 28, // index 12, 128M bytes
|
||||
|
} |
||||
|
|
||||
|
_DEBUG = false |
||||
|
) |
||||
|
|
||||
|
type BytesPool struct { |
||||
|
chunkPools []*byteChunkPool |
||||
|
} |
||||
|
|
||||
|
func NewBytesPool() *BytesPool { |
||||
|
var bp BytesPool |
||||
|
for _, size := range ChunkSizes { |
||||
|
bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size)) |
||||
|
} |
||||
|
ret := &bp |
||||
|
if _DEBUG { |
||||
|
t := time.NewTicker(10 * time.Second) |
||||
|
go func() { |
||||
|
for { |
||||
|
println("buffer:", ret.String()) |
||||
|
<-t.C |
||||
|
} |
||||
|
}() |
||||
|
} |
||||
|
return ret |
||||
|
} |
||||
|
|
||||
|
func (m *BytesPool) String() string { |
||||
|
var buf bytes.Buffer |
||||
|
for index, size := range ChunkSizes { |
||||
|
if m.chunkPools[index].count > 0 { |
||||
|
buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count)) |
||||
|
} |
||||
|
} |
||||
|
return buf.String() |
||||
|
} |
||||
|
|
||||
|
func findChunkPoolIndex(size int) int { |
||||
|
if size <= 0 { |
||||
|
return -1 |
||||
|
} |
||||
|
size = (size - 1) >> 4 |
||||
|
ret := 0 |
||||
|
for size > 0 { |
||||
|
size = size >> 2 |
||||
|
ret = ret + 1 |
||||
|
} |
||||
|
if ret >= len(ChunkSizes) { |
||||
|
return -1 |
||||
|
} |
||||
|
return ret |
||||
|
} |
||||
|
|
||||
|
func (m *BytesPool) Get(size int) []byte { |
||||
|
index := findChunkPoolIndex(size) |
||||
|
// println("get index:", index)
|
||||
|
if index < 0 { |
||||
|
return make([]byte, size) |
||||
|
} |
||||
|
return m.chunkPools[index].Get() |
||||
|
} |
||||
|
|
||||
|
func (m *BytesPool) Put(b []byte) { |
||||
|
index := findChunkPoolIndex(len(b)) |
||||
|
// println("put index:", index)
|
||||
|
if index < 0 { |
||||
|
return |
||||
|
} |
||||
|
m.chunkPools[index].Put(b) |
||||
|
} |
||||
|
|
||||
|
// a pool of fix-sized []byte chunks. The pool size is managed by Go GC
|
||||
|
type byteChunkPool struct { |
||||
|
sync.Pool |
||||
|
chunkSizeLimit int |
||||
|
count int64 |
||||
|
} |
||||
|
|
||||
|
var count int |
||||
|
|
||||
|
func newByteChunkPool(chunkSizeLimit int) *byteChunkPool { |
||||
|
var m byteChunkPool |
||||
|
m.chunkSizeLimit = chunkSizeLimit |
||||
|
m.Pool.New = func() interface{} { |
||||
|
count++ |
||||
|
// println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
|
||||
|
return make([]byte, m.chunkSizeLimit) |
||||
|
} |
||||
|
return &m |
||||
|
} |
||||
|
|
||||
|
func (m *byteChunkPool) Get() []byte { |
||||
|
// println("before get size:", m.chunkSizeLimit, "count:", m.count)
|
||||
|
atomic.AddInt64(&m.count, 1) |
||||
|
return m.Pool.Get().([]byte) |
||||
|
} |
||||
|
|
||||
|
func (m *byteChunkPool) Put(b []byte) { |
||||
|
atomic.AddInt64(&m.count, -1) |
||||
|
// println("after put get size:", m.chunkSizeLimit, "count:", m.count)
|
||||
|
m.Pool.Put(b) |
||||
|
} |
@ -0,0 +1,41 @@ |
|||||
|
package util |
||||
|
|
||||
|
import ( |
||||
|
"testing" |
||||
|
) |
||||
|
|
||||
|
func TestTTLReadWrite(t *testing.T) { |
||||
|
var tests = []struct { |
||||
|
n int // input
|
||||
|
expected int // expected result
|
||||
|
}{ |
||||
|
{0, -1}, |
||||
|
{1, 0}, |
||||
|
{1 << 4, 0}, |
||||
|
{1 << 6, 1}, |
||||
|
{1 << 8, 2}, |
||||
|
{1 << 10, 3}, |
||||
|
{1 << 12, 4}, |
||||
|
{1 << 14, 5}, |
||||
|
{1 << 16, 6}, |
||||
|
{1 << 18, 7}, |
||||
|
{1<<4 + 1, 1}, |
||||
|
{1<<6 + 1, 2}, |
||||
|
{1<<8 + 1, 3}, |
||||
|
{1<<10 + 1, 4}, |
||||
|
{1<<12 + 1, 5}, |
||||
|
{1<<14 + 1, 6}, |
||||
|
{1<<16 + 1, 7}, |
||||
|
{1<<18 + 1, 8}, |
||||
|
{1<<28 - 1, 12}, |
||||
|
{1 << 28, 12}, |
||||
|
{1<<28 + 2134, -1}, |
||||
|
{1080, 4}, |
||||
|
} |
||||
|
for _, tt := range tests { |
||||
|
actual := findChunkPoolIndex(tt.n) |
||||
|
if actual != tt.expected { |
||||
|
t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual) |
||||
|
} |
||||
|
} |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue