|
@ -5,7 +5,6 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
@ -30,9 +29,6 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. |
|
|
|
|
|
|
|
|
// validate the schema
|
|
|
// validate the schema
|
|
|
if request.RecordType != nil { |
|
|
if request.RecordType != nil { |
|
|
if _, err = schema.NewSchema(request.Topic.Namespace, request.Topic.Name, request.RecordType); err != nil { |
|
|
|
|
|
return nil, status.Errorf(codes.InvalidArgument, "invalid record type %+v: %v", request.RecordType, err) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
t := topic.FromPbTopic(request.Topic) |
|
|
t := topic.FromPbTopic(request.Topic) |
|
|