From 5c8f1467a1c2e6db6025f387d779ecec222fb9b5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 25 Sep 2022 13:45:55 -0700 Subject: [PATCH] ordered execution async wait --- weed/util/limited_async_pool.go | 44 +++++++++++++++++++++ weed/util/limited_async_pool_test.go | 58 ++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 weed/util/limited_async_pool.go create mode 100644 weed/util/limited_async_pool_test.go diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go new file mode 100644 index 000000000..c78de158b --- /dev/null +++ b/weed/util/limited_async_pool.go @@ -0,0 +1,44 @@ +package util + +// initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg + +import "context" + +type Future interface { + Await() interface{} +} + +type future struct { + await func(ctx context.Context) interface{} +} + +func (f future) Await() interface{} { + return f.await(context.Background()) +} + +type LimitedAsyncExecutor struct { + executor *LimitedConcurrentExecutor +} + +func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor { + return &LimitedAsyncExecutor{ + executor: NewLimitedConcurrentExecutor(limit), + } +} + +func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future { + var result interface{} + c := make(chan struct{}) + ae.executor.Execute(func() { + defer close(c) + result = job() + }) + return future{await: func(ctx context.Context) interface{} { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c: + return result + } + }} +} diff --git a/weed/util/limited_async_pool_test.go b/weed/util/limited_async_pool_test.go new file mode 100644 index 000000000..29a3d0498 --- /dev/null +++ b/weed/util/limited_async_pool_test.go @@ -0,0 +1,58 @@ +package util + +import ( + "fmt" + "testing" + "time" +) + +func TestAsyncPool(t *testing.T) { + p := NewLimitedAsyncExecutor(3) + var results []Future + + results = append(results, p.Execute(FirstFunc)) + results = append(results, p.Execute(SecondFunc)) + results = append(results, p.Execute(ThirdFunc)) + results = append(results, p.Execute(FourthFunc)) + results = append(results, p.Execute(FifthFunc)) + + for _, r := range results { + x := r.Await().(int) + println(x) + } +} + +func FirstFunc() any { + fmt.Println("-- Executing first function --") + time.Sleep(7 * time.Second) + fmt.Println("-- First Function finished --") + return 1 +} + +func SecondFunc() any { + fmt.Println("-- Executing second function --") + time.Sleep(5 * time.Second) + fmt.Println("-- Second Function finished --") + return 2 +} + +func ThirdFunc() any { + fmt.Println("-- Executing third function --") + time.Sleep(2 * time.Second) + fmt.Println("-- Third Function finished --") + return 3 +} + +func FourthFunc() any { + fmt.Println("-- Executing fourth function --") + time.Sleep(10 * time.Second) + fmt.Println("-- Fourth Function finished --") + return 4 +} + +func FifthFunc() any { + fmt.Println("-- Executing fifth function --") + time.Sleep(4 * time.Second) + fmt.Println("-- Fourth fifth finished --") + return 5 +}