You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

353 lines
10 KiB

package compression
import (
"bytes"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestCompressionCodec_String tests the string representation of compression codecs
func TestCompressionCodec_String(t *testing.T) {
tests := []struct {
codec CompressionCodec
expected string
}{
{None, "none"},
{Gzip, "gzip"},
{Snappy, "snappy"},
{Lz4, "lz4"},
{Zstd, "zstd"},
{CompressionCodec(99), "unknown(99)"},
}
for _, test := range tests {
t.Run(test.expected, func(t *testing.T) {
assert.Equal(t, test.expected, test.codec.String())
})
}
}
// TestCompressionCodec_IsValid tests codec validation
func TestCompressionCodec_IsValid(t *testing.T) {
tests := []struct {
codec CompressionCodec
valid bool
}{
{None, true},
{Gzip, true},
{Snappy, true},
{Lz4, true},
{Zstd, true},
{CompressionCodec(-1), false},
{CompressionCodec(5), false},
{CompressionCodec(99), false},
}
for _, test := range tests {
t.Run(test.codec.String(), func(t *testing.T) {
assert.Equal(t, test.valid, test.codec.IsValid())
})
}
}
// TestExtractCompressionCodec tests extracting compression codec from attributes
func TestExtractCompressionCodec(t *testing.T) {
tests := []struct {
name string
attributes int16
expected CompressionCodec
}{
{"None", 0x0000, None},
{"Gzip", 0x0001, Gzip},
{"Snappy", 0x0002, Snappy},
{"Lz4", 0x0003, Lz4},
{"Zstd", 0x0004, Zstd},
{"Gzip with transactional", 0x0011, Gzip}, // Bit 4 set (transactional)
{"Snappy with control", 0x0022, Snappy}, // Bit 5 set (control)
{"Lz4 with both flags", 0x0033, Lz4}, // Both flags set
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
codec := ExtractCompressionCodec(test.attributes)
assert.Equal(t, test.expected, codec)
})
}
}
// TestSetCompressionCodec tests setting compression codec in attributes
func TestSetCompressionCodec(t *testing.T) {
tests := []struct {
name string
attributes int16
codec CompressionCodec
expected int16
}{
{"Set None", 0x0000, None, 0x0000},
{"Set Gzip", 0x0000, Gzip, 0x0001},
{"Set Snappy", 0x0000, Snappy, 0x0002},
{"Set Lz4", 0x0000, Lz4, 0x0003},
{"Set Zstd", 0x0000, Zstd, 0x0004},
{"Replace Gzip with Snappy", 0x0001, Snappy, 0x0002},
{"Set Gzip preserving transactional", 0x0010, Gzip, 0x0011},
{"Set Lz4 preserving control", 0x0020, Lz4, 0x0023},
{"Set Zstd preserving both flags", 0x0030, Zstd, 0x0034},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := SetCompressionCodec(test.attributes, test.codec)
assert.Equal(t, test.expected, result)
})
}
}
// TestCompress_None tests compression with None codec
func TestCompress_None(t *testing.T) {
data := []byte("Hello, World!")
compressed, err := Compress(None, data)
require.NoError(t, err)
assert.Equal(t, data, compressed, "None codec should return original data")
}
// TestCompress_Gzip tests gzip compression
func TestCompress_Gzip(t *testing.T) {
data := []byte("Hello, World! This is a test message for gzip compression.")
compressed, err := Compress(Gzip, data)
require.NoError(t, err)
assert.NotEqual(t, data, compressed, "Gzip should compress data")
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
}
// TestCompress_Snappy tests snappy compression
func TestCompress_Snappy(t *testing.T) {
data := []byte("Hello, World! This is a test message for snappy compression.")
compressed, err := Compress(Snappy, data)
require.NoError(t, err)
assert.NotEqual(t, data, compressed, "Snappy should compress data")
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
}
// TestCompress_Lz4 tests lz4 compression
func TestCompress_Lz4(t *testing.T) {
data := []byte("Hello, World! This is a test message for lz4 compression.")
compressed, err := Compress(Lz4, data)
require.NoError(t, err)
assert.NotEqual(t, data, compressed, "Lz4 should compress data")
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
}
// TestCompress_Zstd tests zstd compression
func TestCompress_Zstd(t *testing.T) {
data := []byte("Hello, World! This is a test message for zstd compression.")
compressed, err := Compress(Zstd, data)
require.NoError(t, err)
assert.NotEqual(t, data, compressed, "Zstd should compress data")
assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
}
// TestCompress_InvalidCodec tests compression with invalid codec
func TestCompress_InvalidCodec(t *testing.T) {
data := []byte("Hello, World!")
_, err := Compress(CompressionCodec(99), data)
assert.Error(t, err)
assert.Contains(t, err.Error(), "unsupported compression codec")
}
// TestDecompress_None tests decompression with None codec
func TestDecompress_None(t *testing.T) {
data := []byte("Hello, World!")
decompressed, err := Decompress(None, data)
require.NoError(t, err)
assert.Equal(t, data, decompressed, "None codec should return original data")
}
// TestRoundTrip tests compression and decompression round trip for all codecs
func TestRoundTrip(t *testing.T) {
testData := [][]byte{
[]byte("Hello, World!"),
[]byte(""),
[]byte("A"),
[]byte(string(bytes.Repeat([]byte("Test data for compression round trip. "), 100))),
[]byte("Special characters: àáâãäåæçèéêëìíîïðñòóôõö÷øùúûüýþÿ"),
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0xFF}, 256), // Binary data
}
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
for _, codec := range codecs {
t.Run(codec.String(), func(t *testing.T) {
for i, data := range testData {
t.Run(fmt.Sprintf("data_%d", i), func(t *testing.T) {
// Compress
compressed, err := Compress(codec, data)
require.NoError(t, err, "Compression should succeed")
// Decompress
decompressed, err := Decompress(codec, compressed)
require.NoError(t, err, "Decompression should succeed")
// Verify round trip
assert.Equal(t, data, decompressed, "Round trip should preserve data")
})
}
})
}
}
// TestDecompress_InvalidCodec tests decompression with invalid codec
func TestDecompress_InvalidCodec(t *testing.T) {
data := []byte("Hello, World!")
_, err := Decompress(CompressionCodec(99), data)
assert.Error(t, err)
assert.Contains(t, err.Error(), "unsupported compression codec")
}
// TestDecompress_CorruptedData tests decompression with corrupted data
func TestDecompress_CorruptedData(t *testing.T) {
corruptedData := []byte("This is not compressed data")
codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
for _, codec := range codecs {
t.Run(codec.String(), func(t *testing.T) {
_, err := Decompress(codec, corruptedData)
assert.Error(t, err, "Decompression of corrupted data should fail")
})
}
}
// TestCompressRecordBatch tests record batch compression
func TestCompressRecordBatch(t *testing.T) {
recordsData := []byte("Record batch data for compression testing")
t.Run("None codec", func(t *testing.T) {
compressed, attributes, err := CompressRecordBatch(None, recordsData)
require.NoError(t, err)
assert.Equal(t, recordsData, compressed)
assert.Equal(t, int16(0), attributes)
})
t.Run("Gzip codec", func(t *testing.T) {
compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
require.NoError(t, err)
assert.NotEqual(t, recordsData, compressed)
assert.Equal(t, int16(1), attributes)
})
t.Run("Snappy codec", func(t *testing.T) {
compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
require.NoError(t, err)
assert.NotEqual(t, recordsData, compressed)
assert.Equal(t, int16(2), attributes)
})
}
// TestDecompressRecordBatch tests record batch decompression
func TestDecompressRecordBatch(t *testing.T) {
recordsData := []byte("Record batch data for decompression testing")
t.Run("None codec", func(t *testing.T) {
attributes := int16(0) // No compression
decompressed, err := DecompressRecordBatch(attributes, recordsData)
require.NoError(t, err)
assert.Equal(t, recordsData, decompressed)
})
t.Run("Round trip with Gzip", func(t *testing.T) {
// Compress
compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
require.NoError(t, err)
// Decompress
decompressed, err := DecompressRecordBatch(attributes, compressed)
require.NoError(t, err)
assert.Equal(t, recordsData, decompressed)
})
t.Run("Round trip with Snappy", func(t *testing.T) {
// Compress
compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
require.NoError(t, err)
// Decompress
decompressed, err := DecompressRecordBatch(attributes, compressed)
require.NoError(t, err)
assert.Equal(t, recordsData, decompressed)
})
}
// TestCompressionEfficiency tests compression efficiency for different codecs
func TestCompressionEfficiency(t *testing.T) {
// Create highly compressible data
data := bytes.Repeat([]byte("This is a repeated string for compression testing. "), 100)
codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
for _, codec := range codecs {
t.Run(codec.String(), func(t *testing.T) {
compressed, err := Compress(codec, data)
require.NoError(t, err)
compressionRatio := float64(len(compressed)) / float64(len(data))
t.Logf("Codec: %s, Original: %d bytes, Compressed: %d bytes, Ratio: %.2f",
codec.String(), len(data), len(compressed), compressionRatio)
// All codecs should achieve some compression on this highly repetitive data
assert.Less(t, len(compressed), len(data), "Compression should reduce data size")
})
}
}
// BenchmarkCompression benchmarks compression performance for different codecs
func BenchmarkCompression(b *testing.B) {
data := bytes.Repeat([]byte("Benchmark data for compression testing. "), 1000)
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
for _, codec := range codecs {
b.Run(fmt.Sprintf("Compress_%s", codec.String()), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := Compress(codec, data)
if err != nil {
b.Fatal(err)
}
}
})
}
}
// BenchmarkDecompression benchmarks decompression performance for different codecs
func BenchmarkDecompression(b *testing.B) {
data := bytes.Repeat([]byte("Benchmark data for decompression testing. "), 1000)
codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
for _, codec := range codecs {
// Pre-compress the data
compressed, err := Compress(codec, data)
if err != nil {
b.Fatal(err)
}
b.Run(fmt.Sprintf("Decompress_%s", codec.String()), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := Decompress(codec, compressed)
if err != nil {
b.Fatal(err)
}
}
})
}
}