You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
353 lines
10 KiB
353 lines
10 KiB
package compression
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestCompressionCodec_String tests the string representation of compression codecs
|
|
func TestCompressionCodec_String(t *testing.T) {
|
|
tests := []struct {
|
|
codec CompressionCodec
|
|
expected string
|
|
}{
|
|
{None, "none"},
|
|
{Gzip, "gzip"},
|
|
{Snappy, "snappy"},
|
|
{Lz4, "lz4"},
|
|
{Zstd, "zstd"},
|
|
{CompressionCodec(99), "unknown(99)"},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.expected, func(t *testing.T) {
|
|
assert.Equal(t, test.expected, test.codec.String())
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestCompressionCodec_IsValid tests codec validation
|
|
func TestCompressionCodec_IsValid(t *testing.T) {
|
|
tests := []struct {
|
|
codec CompressionCodec
|
|
valid bool
|
|
}{
|
|
{None, true},
|
|
{Gzip, true},
|
|
{Snappy, true},
|
|
{Lz4, true},
|
|
{Zstd, true},
|
|
{CompressionCodec(-1), false},
|
|
{CompressionCodec(5), false},
|
|
{CompressionCodec(99), false},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.codec.String(), func(t *testing.T) {
|
|
assert.Equal(t, test.valid, test.codec.IsValid())
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestExtractCompressionCodec tests extracting compression codec from attributes
|
|
func TestExtractCompressionCodec(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
attributes int16
|
|
expected CompressionCodec
|
|
}{
|
|
{"None", 0x0000, None},
|
|
{"Gzip", 0x0001, Gzip},
|
|
{"Snappy", 0x0002, Snappy},
|
|
{"Lz4", 0x0003, Lz4},
|
|
{"Zstd", 0x0004, Zstd},
|
|
{"Gzip with transactional", 0x0011, Gzip}, // Bit 4 set (transactional)
|
|
{"Snappy with control", 0x0022, Snappy}, // Bit 5 set (control)
|
|
{"Lz4 with both flags", 0x0033, Lz4}, // Both flags set
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
codec := ExtractCompressionCodec(test.attributes)
|
|
assert.Equal(t, test.expected, codec)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSetCompressionCodec tests setting compression codec in attributes
|
|
func TestSetCompressionCodec(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
attributes int16
|
|
codec CompressionCodec
|
|
expected int16
|
|
}{
|
|
{"Set None", 0x0000, None, 0x0000},
|
|
{"Set Gzip", 0x0000, Gzip, 0x0001},
|
|
{"Set Snappy", 0x0000, Snappy, 0x0002},
|
|
{"Set Lz4", 0x0000, Lz4, 0x0003},
|
|
{"Set Zstd", 0x0000, Zstd, 0x0004},
|
|
{"Replace Gzip with Snappy", 0x0001, Snappy, 0x0002},
|
|
{"Set Gzip preserving transactional", 0x0010, Gzip, 0x0011},
|
|
{"Set Lz4 preserving control", 0x0020, Lz4, 0x0023},
|
|
{"Set Zstd preserving both flags", 0x0030, Zstd, 0x0034},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
result := SetCompressionCodec(test.attributes, test.codec)
|
|
assert.Equal(t, test.expected, result)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestCompress_None tests compression with None codec
|
|
func TestCompress_None(t *testing.T) {
|
|
data := []byte("Hello, World!")
|
|
|
|
compressed, err := Compress(None, data)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, data, compressed, "None codec should return original data")
|
|
}
|
|
|
|
// TestCompress_Gzip tests gzip compression
|
|
func TestCompress_Gzip(t *testing.T) {
|
|
data := []byte("Hello, World! This is a test message for gzip compression.")
|
|
|
|
compressed, err := Compress(Gzip, data)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, data, compressed, "Gzip should compress data")
|
|
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
|
|
}
|
|
|
|
// TestCompress_Snappy tests snappy compression
|
|
func TestCompress_Snappy(t *testing.T) {
|
|
data := []byte("Hello, World! This is a test message for snappy compression.")
|
|
|
|
compressed, err := Compress(Snappy, data)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, data, compressed, "Snappy should compress data")
|
|
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
|
|
}
|
|
|
|
// TestCompress_Lz4 tests lz4 compression
|
|
func TestCompress_Lz4(t *testing.T) {
|
|
data := []byte("Hello, World! This is a test message for lz4 compression.")
|
|
|
|
compressed, err := Compress(Lz4, data)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, data, compressed, "Lz4 should compress data")
|
|
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
|
|
}
|
|
|
|
// TestCompress_Zstd tests zstd compression
|
|
func TestCompress_Zstd(t *testing.T) {
|
|
data := []byte("Hello, World! This is a test message for zstd compression.")
|
|
|
|
compressed, err := Compress(Zstd, data)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, data, compressed, "Zstd should compress data")
|
|
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
|
|
}
|
|
|
|
// TestCompress_InvalidCodec tests compression with invalid codec
|
|
func TestCompress_InvalidCodec(t *testing.T) {
|
|
data := []byte("Hello, World!")
|
|
|
|
_, err := Compress(CompressionCodec(99), data)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "unsupported compression codec")
|
|
}
|
|
|
|
// TestDecompress_None tests decompression with None codec
|
|
func TestDecompress_None(t *testing.T) {
|
|
data := []byte("Hello, World!")
|
|
|
|
decompressed, err := Decompress(None, data)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, data, decompressed, "None codec should return original data")
|
|
}
|
|
|
|
// TestRoundTrip tests compression and decompression round trip for all codecs
|
|
func TestRoundTrip(t *testing.T) {
|
|
testData := [][]byte{
|
|
[]byte("Hello, World!"),
|
|
[]byte(""),
|
|
[]byte("A"),
|
|
[]byte(string(bytes.Repeat([]byte("Test data for compression round trip. "), 100))),
|
|
[]byte("Special characters: àáâãäåæçèéêëìíîïðñòóôõö÷øùúûüýþÿ"),
|
|
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0xFF}, 256), // Binary data
|
|
}
|
|
|
|
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
|
|
|
|
for _, codec := range codecs {
|
|
t.Run(codec.String(), func(t *testing.T) {
|
|
for i, data := range testData {
|
|
t.Run(fmt.Sprintf("data_%d", i), func(t *testing.T) {
|
|
// Compress
|
|
compressed, err := Compress(codec, data)
|
|
require.NoError(t, err, "Compression should succeed")
|
|
|
|
// Decompress
|
|
decompressed, err := Decompress(codec, compressed)
|
|
require.NoError(t, err, "Decompression should succeed")
|
|
|
|
// Verify round trip
|
|
assert.Equal(t, data, decompressed, "Round trip should preserve data")
|
|
})
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestDecompress_InvalidCodec tests decompression with invalid codec
|
|
func TestDecompress_InvalidCodec(t *testing.T) {
|
|
data := []byte("Hello, World!")
|
|
|
|
_, err := Decompress(CompressionCodec(99), data)
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "unsupported compression codec")
|
|
}
|
|
|
|
// TestDecompress_CorruptedData tests decompression with corrupted data
|
|
func TestDecompress_CorruptedData(t *testing.T) {
|
|
corruptedData := []byte("This is not compressed data")
|
|
|
|
codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
|
|
|
|
for _, codec := range codecs {
|
|
t.Run(codec.String(), func(t *testing.T) {
|
|
_, err := Decompress(codec, corruptedData)
|
|
assert.Error(t, err, "Decompression of corrupted data should fail")
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestCompressRecordBatch tests record batch compression
|
|
func TestCompressRecordBatch(t *testing.T) {
|
|
recordsData := []byte("Record batch data for compression testing")
|
|
|
|
t.Run("None codec", func(t *testing.T) {
|
|
compressed, attributes, err := CompressRecordBatch(None, recordsData)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, recordsData, compressed)
|
|
assert.Equal(t, int16(0), attributes)
|
|
})
|
|
|
|
t.Run("Gzip codec", func(t *testing.T) {
|
|
compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, recordsData, compressed)
|
|
assert.Equal(t, int16(1), attributes)
|
|
})
|
|
|
|
t.Run("Snappy codec", func(t *testing.T) {
|
|
compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
|
|
require.NoError(t, err)
|
|
assert.NotEqual(t, recordsData, compressed)
|
|
assert.Equal(t, int16(2), attributes)
|
|
})
|
|
}
|
|
|
|
// TestDecompressRecordBatch tests record batch decompression
|
|
func TestDecompressRecordBatch(t *testing.T) {
|
|
recordsData := []byte("Record batch data for decompression testing")
|
|
|
|
t.Run("None codec", func(t *testing.T) {
|
|
attributes := int16(0) // No compression
|
|
decompressed, err := DecompressRecordBatch(attributes, recordsData)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, recordsData, decompressed)
|
|
})
|
|
|
|
t.Run("Round trip with Gzip", func(t *testing.T) {
|
|
// Compress
|
|
compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
|
|
require.NoError(t, err)
|
|
|
|
// Decompress
|
|
decompressed, err := DecompressRecordBatch(attributes, compressed)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, recordsData, decompressed)
|
|
})
|
|
|
|
t.Run("Round trip with Snappy", func(t *testing.T) {
|
|
// Compress
|
|
compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
|
|
require.NoError(t, err)
|
|
|
|
// Decompress
|
|
decompressed, err := DecompressRecordBatch(attributes, compressed)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, recordsData, decompressed)
|
|
})
|
|
}
|
|
|
|
// TestCompressionEfficiency tests compression efficiency for different codecs
|
|
func TestCompressionEfficiency(t *testing.T) {
|
|
// Create highly compressible data
|
|
data := bytes.Repeat([]byte("This is a repeated string for compression testing. "), 100)
|
|
|
|
codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
|
|
|
|
for _, codec := range codecs {
|
|
t.Run(codec.String(), func(t *testing.T) {
|
|
compressed, err := Compress(codec, data)
|
|
require.NoError(t, err)
|
|
|
|
compressionRatio := float64(len(compressed)) / float64(len(data))
|
|
t.Logf("Codec: %s, Original: %d bytes, Compressed: %d bytes, Ratio: %.2f",
|
|
codec.String(), len(data), len(compressed), compressionRatio)
|
|
|
|
// All codecs should achieve some compression on this highly repetitive data
|
|
assert.Less(t, len(compressed), len(data), "Compression should reduce data size")
|
|
})
|
|
}
|
|
}
|
|
|
|
// BenchmarkCompression benchmarks compression performance for different codecs
|
|
func BenchmarkCompression(b *testing.B) {
|
|
data := bytes.Repeat([]byte("Benchmark data for compression testing. "), 1000)
|
|
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
|
|
|
|
for _, codec := range codecs {
|
|
b.Run(fmt.Sprintf("Compress_%s", codec.String()), func(b *testing.B) {
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := Compress(codec, data)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// BenchmarkDecompression benchmarks decompression performance for different codecs
|
|
func BenchmarkDecompression(b *testing.B) {
|
|
data := bytes.Repeat([]byte("Benchmark data for decompression testing. "), 1000)
|
|
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
|
|
|
|
for _, codec := range codecs {
|
|
// Pre-compress the data
|
|
compressed, err := Compress(codec, data)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
|
|
b.Run(fmt.Sprintf("Decompress_%s", codec.String()), func(b *testing.B) {
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := Decompress(codec, compressed)
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|