diff --git a/test/postgres/Dockerfile.client b/test/postgres/Dockerfile.client index f2a2b1f9b..2b85bc76e 100644 --- a/test/postgres/Dockerfile.client +++ b/test/postgres/Dockerfile.client @@ -7,11 +7,6 @@ WORKDIR /app COPY go.mod go.sum ./ RUN go mod download -# Install additional dependencies for PostgreSQL driver -RUN go mod edit -require github.com/lib/pq@v1.10.9 -RUN go mod tidy -RUN go mod download - # Copy source code COPY . . diff --git a/weed/command/db.go b/weed/command/db.go index 84027f167..3fa99103d 100644 --- a/weed/command/db.go +++ b/weed/command/db.go @@ -316,7 +316,7 @@ func parseAuthMethod(method string) (postgres.AuthMethod, error) { // parseUsers parses the user credentials string // Format: username:password;username2:password2 -// Semicolons are used as separators to avoid conflicts with commas that may appear in passwords +// Semicolons are used as separators to support passwords with special characters including commas func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]string, error) { users := make(map[string]string) @@ -328,23 +328,8 @@ func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]str return users, nil } - // Parse user:password pairs separated by semicolons (safer than commas for passwords) - // Also support legacy comma format for backward compatibility - var pairs []string - if strings.Contains(usersStr, ";") { - pairs = strings.Split(usersStr, ";") - } else { - // Legacy comma format - warn about potential issues - pairs = strings.Split(usersStr, ",") - if len(pairs) > 1 { - // Only warn if there are actually multiple pairs - for _, pair := range pairs { - if strings.Count(pair, ":") > 1 { - return nil, fmt.Errorf("detected multiple colons in user specification '%s'. This may indicate a password containing commas. Please use semicolons (;) to separate user:password pairs instead of commas", pair) - } - } - } - } + // Parse user:password pairs separated by semicolons + pairs := strings.Split(usersStr, ";") for _, pair := range pairs { pair = strings.TrimSpace(pair) diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index 6f0dd10ff..b0f3ac43d 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -134,48 +134,70 @@ func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir strin var flushedRanges []BufferRange // List all files in the partition directory using filer client accessor + // Use pagination to handle directories with more than 1000 files err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory { - return nil - } + var lastFileName string + var hasMore = true + + for hasMore { + var currentBatchProcessed int + err := filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + currentBatchProcessed++ + hasMore = !isLast // If this is the last entry of a full batch, there might be more + lastFileName = entry.Name + + if entry.IsDirectory { + return nil + } - // Skip Parquet files - they don't represent buffer ranges - if strings.HasSuffix(entry.Name, ".parquet") { - return nil - } + // Skip Parquet files - they don't represent buffer ranges + if strings.HasSuffix(entry.Name, ".parquet") { + return nil + } - // Skip offset files - if strings.HasSuffix(entry.Name, ".offset") { - return nil - } + // Skip offset files + if strings.HasSuffix(entry.Name, ".offset") { + return nil + } - // Get buffer start for this file - bufferStart, err := b.getLogBufferStartFromFile(entry) - if err != nil { - glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err) - return nil // Continue with other files - } + // Get buffer start for this file + bufferStart, err := b.getLogBufferStartFromFile(entry) + if err != nil { + glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err) + return nil // Continue with other files + } + + if bufferStart == nil { + // File has no buffer metadata - skip deduplication for this file + glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name) + return nil + } + + // Calculate the buffer range covered by this file + chunkCount := int64(len(entry.GetChunks())) + if chunkCount > 0 { + fileRange := BufferRange{ + start: bufferStart.StartIndex, + end: bufferStart.StartIndex + chunkCount - 1, + } + flushedRanges = append(flushedRanges, fileRange) + glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end) + } - if bufferStart == nil { - // File has no buffer metadata - skip deduplication for this file - glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name) return nil + }, lastFileName, false, 1000) // Start from last processed file name for next batch + + if err != nil { + return err } - // Calculate the buffer range covered by this file - chunkCount := int64(len(entry.GetChunks())) - if chunkCount > 0 { - fileRange := BufferRange{ - start: bufferStart.StartIndex, - end: bufferStart.StartIndex + chunkCount - 1, - } - flushedRanges = append(flushedRanges, fileRange) - glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end) + // If we processed fewer than 1000 entries, we've reached the end + if currentBatchProcessed < 1000 { + hasMore = false } + } - return nil - }, "", true, 1000) + return nil }) if err != nil { diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 280582771..f2e3d1a23 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -516,14 +516,8 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex // STEP 3: Sort results chronologically (unflushed + flushed data) // This ensures proper time ordering across all data sources if len(results) > 1 { - // Simple sort by timestamp - in a full implementation, consider more efficient merging - for i := 0; i < len(results)-1; i++ { - for j := i + 1; j < len(results); j++ { - if results[i].Timestamp > results[j].Timestamp { - results[i], results[j] = results[j], results[i] - } - } - } + // Use efficient merge sort for better performance with large datasets + hms.mergeSort(results, 0, len(results)-1) } // Apply final limit after merging and sorting @@ -1272,3 +1266,56 @@ func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int { } return 0 } + +// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm +func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) { + if left < right { + mid := left + (right-left)/2 + + // Recursively sort both halves + hms.mergeSort(results, left, mid) + hms.mergeSort(results, mid+1, right) + + // Merge the sorted halves + hms.merge(results, left, mid, right) + } +} + +// merge combines two sorted subarrays into a single sorted array +func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) { + // Create temporary arrays for the two subarrays + leftArray := make([]HybridScanResult, mid-left+1) + rightArray := make([]HybridScanResult, right-mid) + + // Copy data to temporary arrays + copy(leftArray, results[left:mid+1]) + copy(rightArray, results[mid+1:right+1]) + + // Merge the temporary arrays back into results[left..right] + i, j, k := 0, 0, left + + for i < len(leftArray) && j < len(rightArray) { + if leftArray[i].Timestamp <= rightArray[j].Timestamp { + results[k] = leftArray[i] + i++ + } else { + results[k] = rightArray[j] + j++ + } + k++ + } + + // Copy remaining elements of leftArray, if any + for i < len(leftArray) { + results[k] = leftArray[i] + i++ + k++ + } + + // Copy remaining elements of rightArray, if any + for j < len(rightArray) { + results[k] = rightArray[j] + j++ + k++ + } +}