Chris Lu
5 years ago
4 changed files with 95 additions and 27 deletions
-
5weed/filer2/filer.go
-
33weed/filer2/filer_deletion.go
-
45weed/util/queue_unbounded.go
-
25weed/util/queue_unbounded_test.go
@ -0,0 +1,45 @@ |
|||
package util |
|||
|
|||
import "sync" |
|||
|
|||
type UnboundedQueue struct { |
|||
outbound []string |
|||
outboundLock sync.RWMutex |
|||
inbound []string |
|||
inboundLock sync.RWMutex |
|||
} |
|||
|
|||
func NewUnboundedQueue() *UnboundedQueue { |
|||
q := &UnboundedQueue{} |
|||
return q |
|||
} |
|||
|
|||
func (q *UnboundedQueue) EnQueue(items ...string) { |
|||
q.inboundLock.Lock() |
|||
defer q.inboundLock.Unlock() |
|||
|
|||
q.outbound = append(q.outbound, items...) |
|||
|
|||
} |
|||
|
|||
func (q *UnboundedQueue) Consume(fn func([]string)) { |
|||
q.outboundLock.Lock() |
|||
defer q.outboundLock.Unlock() |
|||
|
|||
if len(q.outbound) == 0 { |
|||
q.inboundLock.Lock() |
|||
inbountLen := len(q.inbound) |
|||
if inbountLen > 0 { |
|||
t := q.outbound |
|||
q.outbound = q.inbound |
|||
q.inbound = t |
|||
} |
|||
q.inboundLock.Unlock() |
|||
} |
|||
|
|||
if len(q.outbound) > 0 { |
|||
fn(q.outbound) |
|||
q.outbound = q.outbound[:0] |
|||
} |
|||
|
|||
} |
@ -0,0 +1,25 @@ |
|||
package util |
|||
|
|||
import "testing" |
|||
|
|||
func TestEnqueueAndConsume(t *testing.T) { |
|||
|
|||
q := NewUnboundedQueue() |
|||
|
|||
q.EnQueue("1", "2", "3") |
|||
|
|||
f := func(items []string) { |
|||
for _, t := range items { |
|||
println(t) |
|||
} |
|||
println("-----------------------") |
|||
} |
|||
q.Consume(f) |
|||
|
|||
q.Consume(f) |
|||
|
|||
q.EnQueue("4", "5") |
|||
q.EnQueue("6", "7") |
|||
q.Consume(f) |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue