Browse Source

kafka protocol: align advertised and validated API version ranges with implemented handlers (Fetch<=v7, ListOffsets<=v2, FindCoordinator<=v2, OffsetCommit/OffsetFetch<=v2); keep Metadata<=v7, JoinGroup<=v7, SyncGroup<=v5

pull/7231/head
chrislu 2 months ago
parent
commit
a5f330ad17
  1. 34
      weed/mq/kafka/protocol/handler.go

34
weed/mq/kafka/protocol/handler.go

@ -450,10 +450,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
// API Key 2 (ListOffsets): limit to v2 (implemented and tested)
response = append(response, 0, 2) // API key 2
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
response = append(response, 0, 2) // max version 2
// API Key 19 (CreateTopics): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 19) // API key 19
@ -471,10 +471,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
// API Key 1 (Fetch): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 1) // API key 1
response = append(response, 0, 0) // min version 0
response = append(response, 0, 11) // max version 11
// API Key 1 (Fetch): limit to v7 (current handler semantics)
response = append(response, 0, 1) // API key 1
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
// API Key 11 (JoinGroup): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 11) // API key 11
@ -486,20 +486,20 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
// API Key 8 (OffsetCommit): api_key(2) + min_version(2) + max_version(2)
// API Key 8 (OffsetCommit): limit to v2 for current implementation
response = append(response, 0, 8) // API key 8
response = append(response, 0, 0) // min version 0
response = append(response, 0, 8) // max version 8
response = append(response, 0, 2) // max version 2
// API Key 9 (OffsetFetch): api_key(2) + min_version(2) + max_version(2)
// API Key 9 (OffsetFetch): limit to v2 (implemented and tested)
response = append(response, 0, 9) // API key 9
response = append(response, 0, 0) // min version 0
response = append(response, 0, 8) // max version 8
response = append(response, 0, 2) // max version 2
// API Key 10 (FindCoordinator): api_key(2) + min_version(2) + max_version(2)
// API Key 10 (FindCoordinator): limit to v2 (implemented)
response = append(response, 0, 10) // API key 10
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
response = append(response, 0, 2) // max version 2
// API Key 12 (Heartbeat): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 12) // API key 12
@ -1691,15 +1691,15 @@ func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
18: {0, 3}, // ApiVersions: v0-v3
3: {0, 7}, // Metadata: v0-v7
0: {0, 7}, // Produce: v0-v7
1: {0, 11}, // Fetch: v0-v11
2: {0, 5}, // ListOffsets: v0-v5
1: {0, 7}, // Fetch: v0-v7
2: {0, 2}, // ListOffsets: v0-v2
19: {0, 4}, // CreateTopics: v0-v4
20: {0, 4}, // DeleteTopics: v0-v4
10: {0, 4}, // FindCoordinator: v0-v4
10: {0, 2}, // FindCoordinator: v0-v2
11: {0, 7}, // JoinGroup: v0-v7
14: {0, 5}, // SyncGroup: v0-v5
8: {0, 8}, // OffsetCommit: v0-v8
9: {0, 8}, // OffsetFetch: v0-v8
8: {0, 2}, // OffsetCommit: v0-v2
9: {0, 2}, // OffsetFetch: v0-v2
12: {0, 4}, // Heartbeat: v0-v4
13: {0, 4}, // LeaveGroup: v0-v4
}

Loading…
Cancel
Save