diff --git a/weed/command/sql.go b/weed/command/sql.go
index 5b7f3a0d6..264e0515c 100644
--- a/weed/command/sql.go
+++ b/weed/command/sql.go
@@ -288,15 +288,13 @@ func runInteractiveShell(ctx *SQLContext) bool {
 		}
 
 		// Handle database switching
-		upperQuery := strings.ToUpper(cleanQuery)
-		if strings.HasPrefix(upperQuery, "USE ") {
+		parts := strings.Fields(cleanQuery)
+		if len(parts) >= 2 && strings.ToUpper(parts[0]) == "USE" {
 			// Extract database name preserving original case
-			parts := strings.SplitN(cleanQuery, " ", 2)
-			if len(parts) >= 2 {
-				dbName := strings.TrimSpace(parts[1])
-				ctx.currentDatabase = dbName
-				// Also update the SQL engine's catalog current database
-				ctx.engine.GetCatalog().SetCurrentDatabase(dbName)
+			dbName := strings.TrimSpace(parts[1])
+			ctx.currentDatabase = dbName
+			// Also update the SQL engine's catalog current database
+			ctx.engine.GetCatalog().SetCurrentDatabase(dbName)
 				fmt.Printf("Database changed to: %s\n\n", dbName)
 				queryBuffer.Reset()
 				continue
@@ -349,8 +347,12 @@ func isSpecialCommand(query string) bool {
 		}
 	}
 
-	// Commands that start with specific prefixes
-	return strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") ||
+	// Commands that are exactly specific commands (not just prefixes)
+	parts := strings.Fields(strings.ToUpper(cleanQuery))
+	if len(parts) == 0 {
+		return false
+	}
+	return (parts[0] == "USE" && len(parts) >= 2) ||
 		strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")
 }
 
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go
index c2d693db1..6f0dd10ff 100644
--- a/weed/mq/broker/broker_grpc_query.go
+++ b/weed/mq/broker/broker_grpc_query.go
@@ -6,13 +6,10 @@ import (
 	"fmt"
 	"strings"
 
-	"google.golang.org/protobuf/proto"
-
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 )
 
@@ -65,55 +62,23 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 	messageCount := 0
 	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
 
-	// Create a custom LoopProcessLogData function that captures the batch index
-	// Since we can't modify the existing EachLogEntryFuncType signature,
-	// we'll implement our own iteration logic based on LoopProcessLogData
-	var lastReadPosition = startPosition
-	var isDone bool
-
-	for !isDone {
-		// Use ReadFromBuffer to get the next batch with its correct batch index
-		bytesBuf, batchIndex, err := localPartition.LogBuffer.ReadFromBuffer(lastReadPosition)
-		if err == log_buffer.ResumeFromDiskError {
-			break
-		}
-		if err != nil {
-			return err
-		}
-
-		// If no more data in memory, we're done
-		if bytesBuf == nil {
-			break
-		}
-
-		// Process all messages in this batch
-		buf := bytesBuf.Bytes()
-		for pos := 0; pos+4 < len(buf); {
-			size := util.BytesToUint32(buf[pos : pos+4])
-			if pos+4+int(size) > len(buf) {
-				break
-			}
-			entryData := buf[pos+4 : pos+4+int(size)]
-
-			logEntry := &filer_pb.LogEntry{}
-			if err = proto.Unmarshal(entryData, logEntry); err != nil {
-				pos += 4 + int(size)
-				continue
-			}
-
-			// Now we have the correct batchIndex for this message
+	// Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication
+	_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
+		"GetUnflushedMessages",
+		startPosition,
+		0,                            // stopTsNs = 0 means process all available data
+		func() bool { return false }, // waitForDataFn = false means don't wait for new data
+		func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
 			// Apply buffer index filtering if specified
 			if startBufferIndex > 0 && batchIndex < startBufferIndex {
 				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
-				pos += 4 + int(size)
-				continue
+				return false, nil
 			}
 
 			// Check if this message is from a buffer range that's already been flushed
 			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
 				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
-				pos += 4 + int(size)
-				continue
+				return false, nil
 			}
 
 			// Stream this message
@@ -129,18 +94,13 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 			if err != nil {
 				glog.Errorf("Failed to stream message: %v", err)
-				isDone = true
-				break
+				return true, err // isDone = true to stop processing
 			}
 
 			messageCount++
-			lastReadPosition = log_buffer.NewMessagePosition(logEntry.TsNs, batchIndex)
-			pos += 4 + int(size)
-		}
-
-		// Release the buffer back to the pool
-		localPartition.LogBuffer.ReleaseMemory(bytesBuf)
-	}
+			return false, nil // Continue processing
+		},
+	)
 
 	// Handle collection errors
 	if err != nil && err != log_buffer.ResumeFromDiskError {
diff --git a/weed/server/postgres/protocol.go b/weed/server/postgres/protocol.go
index a110e03af..ccb873a35 100644
--- a/weed/server/postgres/protocol.go
+++ b/weed/server/postgres/protocol.go
@@ -132,21 +132,18 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s
 	glog.V(2).Infof("PostgreSQL Query (ID: %d): %s", session.processID, query)
 
 	// Handle USE database commands for session context
-	queryUpper := strings.ToUpper(strings.TrimSpace(query))
-	if strings.HasPrefix(queryUpper, "USE ") {
-		parts := strings.Fields(query)
-		if len(parts) >= 2 {
-			newDatabase := strings.TrimSpace(parts[1])
-			session.database = newDatabase
-			s.sqlEngine.GetCatalog().SetCurrentDatabase(newDatabase)
-
-			// Send command complete for USE
-			err := s.sendCommandComplete(session, "USE")
-			if err != nil {
-				return err
-			}
-			return s.sendReadyForQuery(session)
+	parts := strings.Fields(strings.TrimSpace(query))
+	if len(parts) >= 2 && strings.ToUpper(parts[0]) == "USE" {
+		newDatabase := strings.TrimSpace(parts[1])
+		session.database = newDatabase
+		s.sqlEngine.GetCatalog().SetCurrentDatabase(newDatabase)
+
+		// Send command complete for USE
+		err := s.sendCommandComplete(session, "USE")
+		if err != nil {
+			return err
 		}
+		return s.sendReadyForQuery(session)
 	}
 
 	// Set database context in SQL engine if session database is different from current
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index f99f1a7dd..8c9e222af 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -24,6 +24,7 @@ type dataToFlush struct {
 }
 
 type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
+type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
 type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
 type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
 
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index cf83de1e5..0ebcc7cc9 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -130,3 +130,105 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
 	}
 
 }
+
+// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
+func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
+	waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+	// loop through all messages
+	var bytesBuf *bytes.Buffer
+	var batchIndex int64
+	lastReadPosition = startPosition
+	var entryCounter int64
+	defer func() {
+		if bytesBuf != nil {
+			logBuffer.ReleaseMemory(bytesBuf)
+		}
+		// println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
+	}()
+
+	for {
+
+		if bytesBuf != nil {
+			logBuffer.ReleaseMemory(bytesBuf)
+		}
+		bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
+		if err == ResumeFromDiskError {
+			time.Sleep(1127 * time.Millisecond)
+			return lastReadPosition, isDone, ResumeFromDiskError
+		}
+		readSize := 0
+		if bytesBuf != nil {
+			readSize = bytesBuf.Len()
+		}
+		glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+		if bytesBuf == nil {
+			if batchIndex >= 0 {
+				lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+			}
+			if stopTsNs != 0 {
+				isDone = true
+				return
+			}
+			lastTsNs := logBuffer.LastTsNs.Load()
+
+			for lastTsNs == logBuffer.LastTsNs.Load() {
+				if waitForDataFn() {
+					continue
+				} else {
+					isDone = true
+					return
+				}
+			}
+			if logBuffer.IsStopping() {
+				isDone = true
+				return
+			}
+			continue
+		}
+
+		buf := bytesBuf.Bytes()
+		// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
+
+		batchSize := 0
+
+		for pos := 0; pos+4 < len(buf); {
+
+			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				err = ResumeError
+				glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
+				return
+			}
+			entryData := buf[pos+4 : pos+4+int(size)]
+
+			logEntry := &filer_pb.LogEntry{}
+			if err = proto.Unmarshal(entryData, logEntry); err != nil {
+				glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
+				pos += 4 + int(size)
+				continue
+			}
+			if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+				isDone = true
+				// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+				return
+			}
+			lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
+
+			if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
+				glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
+				return
+			}
+			if isDone {
+				glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
+				return
+			}
+
+			pos += 4 + int(size)
+			batchSize++
+			entryCounter++
+
+		}
+
+	}
+
+}
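
For reviewers, a minimal usage sketch of the new LoopProcessLogDataWithBatchIndex API added above, mirroring how the broker change calls it. The package name, the drainUnflushed function, the minBatchIndex filter, and the Printf output are illustrative only and not part of this diff; the log_buffer and filer_pb identifiers are the ones introduced or touched here.

```go
package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// drainUnflushed (hypothetical helper) walks every entry currently held in the
// in-memory log buffer, starting at startTsNs/minBatchIndex, without blocking
// for new data, and reports how many entries were visited.
func drainUnflushed(lb *log_buffer.LogBuffer, startTsNs, minBatchIndex int64) (int, error) {
	count := 0
	_, _, err := lb.LoopProcessLogDataWithBatchIndex(
		"drainUnflushed",
		log_buffer.NewMessagePosition(startTsNs, minBatchIndex),
		0,                            // stopTsNs = 0: no upper time bound
		func() bool { return false }, // returning false: do not wait for new data
		func(entry *filer_pb.LogEntry, batchIndex int64) (bool, error) {
			if batchIndex < minBatchIndex {
				return false, nil // skip entries from older buffers, keep iterating
			}
			fmt.Printf("ts=%d batch=%d bytes=%d\n", entry.TsNs, batchIndex, len(entry.Data))
			count++
			return false, nil // isDone = false: continue processing
		},
	)
	if err == log_buffer.ResumeFromDiskError {
		// Older data already lives on disk; a caller that only wants in-memory
		// entries (like GetUnflushedMessages) can treat this as a clean stop.
		err = nil
	}
	return count, err
}
```

The non-blocking waitForDataFn is what lets a caller such as GetUnflushedMessages terminate once the in-memory buffer is exhausted instead of tailing the log indefinitely.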