|
|
@ -7,7 +7,6 @@ import ( |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
@ -1452,13 +1451,11 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R |
|
|
|
h.inferredRecordTypesMu.RLock() |
|
|
|
if recordType, exists := h.inferredRecordTypes[avroSchema]; exists { |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
glog.Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema)) |
|
|
|
return recordType, nil |
|
|
|
} |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
|
|
|
|
// Cache miss - create decoder and infer type
|
|
|
|
glog.Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema)) |
|
|
|
decoder, err := schema.NewAvroDecoder(avroSchema) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to create Avro decoder: %w", err) |
|
|
@ -1473,7 +1470,6 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R |
|
|
|
h.inferredRecordTypesMu.Lock() |
|
|
|
h.inferredRecordTypes[avroSchema] = recordType |
|
|
|
h.inferredRecordTypesMu.Unlock() |
|
|
|
glog.Infof("Cached inferred RecordType for Avro schema") |
|
|
|
|
|
|
|
return recordType, nil |
|
|
|
} |
|
|
@ -1486,13 +1482,11 @@ func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*sch |
|
|
|
h.inferredRecordTypesMu.RLock() |
|
|
|
if recordType, exists := h.inferredRecordTypes[cacheKey]; exists { |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
glog.Infof("RecordType cache HIT for Protobuf schema") |
|
|
|
return recordType, nil |
|
|
|
} |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
|
|
|
|
// Cache miss - create decoder and infer type
|
|
|
|
glog.Infof("RecordType cache MISS for Protobuf schema, creating decoder") |
|
|
|
decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema)) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err) |
|
|
@ -1519,13 +1513,11 @@ func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.R |
|
|
|
h.inferredRecordTypesMu.RLock() |
|
|
|
if recordType, exists := h.inferredRecordTypes[cacheKey]; exists { |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
glog.Infof("RecordType cache HIT for JSON schema") |
|
|
|
return recordType, nil |
|
|
|
} |
|
|
|
h.inferredRecordTypesMu.RUnlock() |
|
|
|
|
|
|
|
// Cache miss - create decoder and infer type
|
|
|
|
glog.Infof("RecordType cache MISS for JSON schema, creating decoder") |
|
|
|
decoder, err := schema.NewJSONSchemaDecoder(jsonSchema) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err) |
|
|
|