diff --git a/go.mod b/go.mod index 6cce5cff3..47fed5f2a 100644 --- a/go.mod +++ b/go.mod @@ -139,6 +139,7 @@ require ( github.com/hashicorp/raft-boltdb/v2 v2.3.1 github.com/hashicorp/vault/api v1.22.0 github.com/jhump/protoreflect v1.17.0 + github.com/lesismal/nbio v1.6.7 github.com/lib/pq v1.10.9 github.com/linkedin/goavro/v2 v2.14.1 github.com/mattn/go-sqlite3 v1.14.32 @@ -198,6 +199,7 @@ require ( github.com/jaegertracing/jaeger v1.47.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect + github.com/lesismal/llib v1.2.2 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/parquet-go/bitpack v1.0.0 // indirect diff --git a/go.sum b/go.sum index 90276eb64..2521c9ce6 100644 --- a/go.sum +++ b/go.sum @@ -1358,6 +1358,10 @@ github.com/lanrat/extsort v1.4.0 h1:jysS/Tjnp7mBwJ6NG8SY+XYFi8HF3LujGbqY9jOWjco= github.com/lanrat/extsort v1.4.0/go.mod h1:hceP6kxKPKebjN1RVrDBXMXXECbaI41Y94tt6MDazc4= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lesismal/llib v1.2.2 h1:ZoVgP9J58Ju3Yue5jtj8ybWl+BKqoVmdRaN1mNwG5Gc= +github.com/lesismal/llib v1.2.2/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg= +github.com/lesismal/nbio v1.6.7 h1:EeiH0Vn0v5NG7masYNWugibPdNZZYYBPa0pGj4GOrbg= +github.com/lesismal/nbio v1.6.7/go.mod h1:mBn1rSIZ+cmOILhvP+/1Mb/JimgA+1LQudlHJUb/aNA= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -1909,6 +1913,7 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -2030,6 +2035,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= diff --git a/weed/util/http/nbio_benchmark_test.go b/weed/util/http/nbio_benchmark_test.go new file mode 100644 index 000000000..2813a8b4a --- /dev/null +++ b/weed/util/http/nbio_benchmark_test.go @@ -0,0 +1,439 @@ +package http + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/lesismal/nbio/nbhttp" +) + +// BenchmarkHTTPServerMemory compares memory usage between standard net/http +// and nbio HTTP server implementations with many concurrent connections. +// This is exploratory testing for issue #3884 - evaluating nbio for memory optimization. + +const ( + testPayload = "Hello, World!" + warmupConnections = 100 + connectionHoldTime = 100 * time.Millisecond +) + +// getMemStats returns the current memory allocation in bytes +func getMemStats() uint64 { + var m runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m) + return m.Alloc +} + +// findFreePort finds an available port on localhost +func findFreePort() (int, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port, nil +} + +// TestNbioVsStdHTTPMemory is an exploratory test that compares memory usage +// between standard net/http and nbio for handling many HTTP connections. +func TestNbioVsStdHTTPMemory(t *testing.T) { + if testing.Short() { + t.Skip("Skipping memory comparison test in short mode") + } + + t.Run("StandardHTTP", func(t *testing.T) { + memBefore := getMemStats() + testStandardHTTPConnections(t, warmupConnections) + memAfter := getMemStats() + if memAfter >= memBefore { + t.Logf("Standard HTTP: %d connections, memory delta: +%d KB", + warmupConnections, (memAfter-memBefore)/1024) + } else { + t.Logf("Standard HTTP: %d connections, memory delta: -%d KB", + warmupConnections, (memBefore-memAfter)/1024) + } + }) + + t.Run("NbioHTTP", func(t *testing.T) { + memBefore := getMemStats() + testNbioHTTPConnections(t, warmupConnections) + memAfter := getMemStats() + if memAfter >= memBefore { + t.Logf("Nbio HTTP: %d connections, memory delta: +%d KB", + warmupConnections, (memAfter-memBefore)/1024) + } else { + t.Logf("Nbio HTTP: %d connections, memory delta: -%d KB", + warmupConnections, (memBefore-memAfter)/1024) + } + }) +} + +// BenchmarkStandardHTTPConnections benchmarks standard net/http server memory usage +func BenchmarkStandardHTTPConnections(b *testing.B) { + benchmarkHTTPConnections(b, false) +} + +// BenchmarkNbioHTTPConnections benchmarks nbio HTTP server memory usage +func BenchmarkNbioHTTPConnections(b *testing.B) { + benchmarkHTTPConnections(b, true) +} + +func benchmarkHTTPConnections(b *testing.B, useNbio bool) { + connectionCounts := []int{100, 500, 1000} + + for _, connCount := range connectionCounts { + name := fmt.Sprintf("Connections_%d", connCount) + b.Run(name, func(b *testing.B) { + var totalMemDelta uint64 + for i := 0; i < b.N; i++ { + runtime.GC() + memBefore := getMemStats() + + if useNbio { + testNbioHTTPConnections(b, connCount) + } else { + testStandardHTTPConnections(b, connCount) + } + + runtime.GC() + memAfter := getMemStats() + + if memAfter > memBefore { + totalMemDelta += memAfter - memBefore + } + } + + avgMemDelta := totalMemDelta / uint64(b.N) + b.ReportMetric(float64(avgMemDelta)/1024, "KB/op") + b.ReportMetric(float64(avgMemDelta)/float64(connCount), "bytes/conn") + }) + } +} + +// BenchmarkHTTPServerMemoryComparison provides a side-by-side comparison +func BenchmarkHTTPServerMemoryComparison(b *testing.B) { + connCounts := []int{100, 500, 1000} + + for _, count := range connCounts { + b.Run(fmt.Sprintf("StdHTTP_%d_conns", count), func(b *testing.B) { + for i := 0; i < b.N; i++ { + testStandardHTTPConnections(b, count) + } + }) + + b.Run(fmt.Sprintf("NbioHTTP_%d_conns", count), func(b *testing.B) { + for i := 0; i < b.N; i++ { + testNbioHTTPConnections(b, count) + } + }) + } +} + +func testStandardHTTPConnections(tb testing.TB, numConnections int) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + tb.Fatalf("Failed to create listener: %v", err) + } + + server := &http.Server{Handler: mux} + serverDone := make(chan struct{}) + go func() { + server.Serve(listener) + close(serverDone) + }() + + addr := listener.Addr().String() + + makeConnectionsToServer(tb, addr, numConnections) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + server.Shutdown(ctx) + <-serverDone +} + +func testNbioHTTPConnections(tb testing.TB, numConnections int) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + port, err := findFreePort() + if err != nil { + tb.Fatalf("Failed to find free port: %v", err) + } + + addr := fmt.Sprintf("127.0.0.1:%d", port) + + engine := nbhttp.NewEngine(nbhttp.Config{ + Network: "tcp", + Addrs: []string{addr}, + Handler: mux, + }) + + if err := engine.Start(); err != nil { + tb.Fatalf("Failed to start nbio engine: %v", err) + } + + time.Sleep(50 * time.Millisecond) + + makeConnectionsToServer(tb, addr, numConnections) + + engine.Stop() +} + +func makeConnectionsToServer(tb testing.TB, addr string, numConnections int) { + var wg sync.WaitGroup + var successCount int64 + var errorCount int64 + + semaphore := make(chan struct{}, 50) + transport := &http.Transport{ + DisableKeepAlives: false, + MaxIdleConns: numConnections, + MaxIdleConnsPerHost: numConnections, + IdleConnTimeout: 90 * time.Second, + } + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: transport, + } + defer transport.CloseIdleConnections() + + for i := 0; i < numConnections; i++ { + wg.Add(1) + go func() { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + resp, err := client.Get("http://" + addr + "/") + if err != nil { + atomic.AddInt64(&errorCount, 1) + return + } + defer resp.Body.Close() + + _, err = io.ReadAll(resp.Body) + if err != nil { + atomic.AddInt64(&errorCount, 1) + return + } + + atomic.AddInt64(&successCount, 1) + + time.Sleep(connectionHoldTime) + }() + } + + wg.Wait() + + if testing.Verbose() { + if t, ok := tb.(*testing.T); ok { + t.Logf("Connections: success=%d, errors=%d", successCount, errorCount) + } + } +} + +// BenchmarkNbioHTTPThroughput benchmarks request throughput with nbio +func BenchmarkNbioHTTPThroughput(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + port, err := findFreePort() + if err != nil { + b.Fatalf("Failed to find free port: %v", err) + } + + addr := fmt.Sprintf("127.0.0.1:%d", port) + + engine := nbhttp.NewEngine(nbhttp.Config{ + Network: "tcp", + Addrs: []string{addr}, + Handler: mux, + }) + + if err := engine.Start(); err != nil { + b.Fatalf("Failed to start nbio engine: %v", err) + } + + time.Sleep(50 * time.Millisecond) + + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + DisableKeepAlives: false, + MaxIdleConns: 100, + }, + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + resp, err := client.Get("http://" + addr + "/") + if err != nil { + continue + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + + b.StopTimer() + engine.Stop() +} + +// BenchmarkStandardHTTPThroughput benchmarks request throughput with standard net/http +func BenchmarkStandardHTTPThroughput(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + b.Fatalf("Failed to create listener: %v", err) + } + + server := &http.Server{Handler: mux} + go server.Serve(listener) + + addr := listener.Addr().String() + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + DisableKeepAlives: false, + MaxIdleConns: 100, + }, + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + resp, err := client.Get("http://" + addr + "/") + if err != nil { + continue + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + }) + + b.StopTimer() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + server.Shutdown(ctx) +} + +// BenchmarkIdleConnectionMemory measures memory overhead of idle connections +func BenchmarkIdleConnectionMemory(b *testing.B) { + b.Run("StdHTTP_IdleConns", func(b *testing.B) { + benchmarkIdleConnectionsStd(b) + }) + + b.Run("Nbio_IdleConns", func(b *testing.B) { + benchmarkIdleConnectionsNbio(b) + }) +} + +func benchmarkIdleConnectionsStd(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + b.Fatalf("Failed to create listener: %v", err) + } + + server := &http.Server{Handler: mux} + go server.Serve(listener) + addr := listener.Addr().String() + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + server.Shutdown(ctx) + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + conns := make([]net.Conn, 0, 100) + for j := 0; j < 100; j++ { + conn, err := net.Dial("tcp", addr) + if err != nil { + continue + } + conns = append(conns, conn) + } + + time.Sleep(10 * time.Millisecond) + + for _, conn := range conns { + conn.Close() + } + } +} + +func benchmarkIdleConnectionsNbio(b *testing.B) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(testPayload)) + }) + + port, err := findFreePort() + if err != nil { + b.Fatalf("Failed to find free port: %v", err) + } + + addr := fmt.Sprintf("127.0.0.1:%d", port) + + engine := nbhttp.NewEngine(nbhttp.Config{ + Network: "tcp", + Addrs: []string{addr}, + Handler: mux, + }) + + if err := engine.Start(); err != nil { + b.Fatalf("Failed to start nbio engine: %v", err) + } + + time.Sleep(50 * time.Millisecond) + + defer engine.Stop() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + conns := make([]net.Conn, 0, 100) + for j := 0; j < 100; j++ { + conn, err := net.Dial("tcp", addr) + if err != nil { + continue + } + conns = append(conns, conn) + } + + time.Sleep(10 * time.Millisecond) + + for _, conn := range conns { + conn.Close() + } + } +}