You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
198 lines
6.0 KiB
198 lines
6.0 KiB
package schema
|
|
|
|
import (
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEncodeDecodeVarint(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
value uint64
|
|
}{
|
|
{"zero", 0},
|
|
{"small", 1},
|
|
{"medium", 127},
|
|
{"large", 128},
|
|
{"very_large", 16384},
|
|
{"max_uint32", 4294967295},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Encode the value
|
|
encoded := encodeVarint(tc.value)
|
|
require.NotEmpty(t, encoded)
|
|
|
|
// Decode it back
|
|
decoded, bytesRead := readVarint(encoded)
|
|
require.Equal(t, len(encoded), bytesRead, "Should consume all encoded bytes")
|
|
assert.Equal(t, tc.value, decoded, "Decoded value should match original")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCreateConfluentEnvelopeWithProtobufIndexes(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
format Format
|
|
schemaID uint32
|
|
indexes []int
|
|
payload []byte
|
|
}{
|
|
{
|
|
name: "avro_no_indexes",
|
|
format: FormatAvro,
|
|
schemaID: 123,
|
|
indexes: nil,
|
|
payload: []byte("avro payload"),
|
|
},
|
|
{
|
|
name: "protobuf_no_indexes",
|
|
format: FormatProtobuf,
|
|
schemaID: 456,
|
|
indexes: nil,
|
|
payload: []byte("protobuf payload"),
|
|
},
|
|
{
|
|
name: "protobuf_single_index",
|
|
format: FormatProtobuf,
|
|
schemaID: 789,
|
|
indexes: []int{1},
|
|
payload: []byte("protobuf with index"),
|
|
},
|
|
{
|
|
name: "protobuf_multiple_indexes",
|
|
format: FormatProtobuf,
|
|
schemaID: 101112,
|
|
indexes: []int{0, 1, 2, 3},
|
|
payload: []byte("protobuf with multiple indexes"),
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// Create the envelope
|
|
envelope := CreateConfluentEnvelope(tc.format, tc.schemaID, tc.indexes, tc.payload)
|
|
|
|
// Verify basic structure
|
|
require.True(t, len(envelope) >= 5, "Envelope should be at least 5 bytes")
|
|
assert.Equal(t, byte(0x00), envelope[0], "Magic byte should be 0x00")
|
|
|
|
// Extract and verify schema ID
|
|
extractedSchemaID, ok := ExtractSchemaID(envelope)
|
|
require.True(t, ok, "Should be able to extract schema ID")
|
|
assert.Equal(t, tc.schemaID, extractedSchemaID, "Schema ID should match")
|
|
|
|
// Parse the envelope based on format
|
|
if tc.format == FormatProtobuf && len(tc.indexes) > 0 {
|
|
// Use Protobuf-specific parser with known index count
|
|
parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(tc.indexes))
|
|
require.True(t, ok, "Should be able to parse Protobuf envelope")
|
|
assert.Equal(t, tc.format, parsed.Format)
|
|
assert.Equal(t, tc.schemaID, parsed.SchemaID)
|
|
assert.Equal(t, tc.indexes, parsed.Indexes, "Indexes should match")
|
|
assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
|
|
} else {
|
|
// Use generic parser
|
|
parsed, ok := ParseConfluentEnvelope(envelope)
|
|
require.True(t, ok, "Should be able to parse envelope")
|
|
assert.Equal(t, tc.schemaID, parsed.SchemaID)
|
|
|
|
if tc.format == FormatProtobuf && len(tc.indexes) == 0 {
|
|
// For Protobuf without indexes, payload should match
|
|
assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
|
|
} else if tc.format == FormatAvro {
|
|
// For Avro, payload should match (no indexes)
|
|
assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestProtobufEnvelopeRoundTrip(t *testing.T) {
|
|
// Use more realistic index values (typically small numbers for message types)
|
|
originalIndexes := []int{0, 1, 2, 3}
|
|
originalPayload := []byte("test protobuf message data")
|
|
schemaID := uint32(12345)
|
|
|
|
// Create envelope
|
|
envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, originalIndexes, originalPayload)
|
|
|
|
// Parse it back with known index count
|
|
parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(originalIndexes))
|
|
require.True(t, ok, "Should be able to parse created envelope")
|
|
|
|
// Verify all fields
|
|
assert.Equal(t, FormatProtobuf, parsed.Format)
|
|
assert.Equal(t, schemaID, parsed.SchemaID)
|
|
assert.Equal(t, originalIndexes, parsed.Indexes)
|
|
assert.Equal(t, originalPayload, parsed.Payload)
|
|
assert.Equal(t, envelope, parsed.OriginalBytes)
|
|
}
|
|
|
|
func TestVarintEdgeCases(t *testing.T) {
|
|
t.Run("empty_data", func(t *testing.T) {
|
|
value, bytesRead := readVarint([]byte{})
|
|
assert.Equal(t, uint64(0), value)
|
|
assert.Equal(t, 0, bytesRead)
|
|
})
|
|
|
|
t.Run("incomplete_varint", func(t *testing.T) {
|
|
// Create an incomplete varint (continuation bit set but no more bytes)
|
|
incompleteVarint := []byte{0x80} // Continuation bit set, but no more bytes
|
|
value, bytesRead := readVarint(incompleteVarint)
|
|
assert.Equal(t, uint64(0), value)
|
|
assert.Equal(t, 0, bytesRead)
|
|
})
|
|
|
|
t.Run("max_varint_length", func(t *testing.T) {
|
|
// Create a varint that's too long (more than 10 bytes)
|
|
tooLongVarint := make([]byte, 11)
|
|
for i := 0; i < 10; i++ {
|
|
tooLongVarint[i] = 0x80 // All continuation bits
|
|
}
|
|
tooLongVarint[10] = 0x01 // Final byte
|
|
|
|
value, bytesRead := readVarint(tooLongVarint)
|
|
assert.Equal(t, uint64(0), value)
|
|
assert.Equal(t, 0, bytesRead)
|
|
})
|
|
}
|
|
|
|
func TestProtobufEnvelopeValidation(t *testing.T) {
|
|
t.Run("valid_envelope", func(t *testing.T) {
|
|
indexes := []int{1, 2}
|
|
envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte("payload"))
|
|
parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
|
|
require.True(t, ok)
|
|
|
|
err := parsed.Validate()
|
|
assert.NoError(t, err)
|
|
})
|
|
|
|
t.Run("zero_schema_id", func(t *testing.T) {
|
|
indexes := []int{1}
|
|
envelope := CreateConfluentEnvelope(FormatProtobuf, 0, indexes, []byte("payload"))
|
|
parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
|
|
require.True(t, ok)
|
|
|
|
err := parsed.Validate()
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "invalid schema ID: 0")
|
|
})
|
|
|
|
t.Run("empty_payload", func(t *testing.T) {
|
|
indexes := []int{1}
|
|
envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte{})
|
|
parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
|
|
require.True(t, ok)
|
|
|
|
err := parsed.Validate()
|
|
assert.Error(t, err)
|
|
assert.Contains(t, err.Error(), "empty payload")
|
|
})
|
|
}
|