From a6e48b76905d94e9c90953d6078660b4f038aa1e Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Sep 2025 06:56:49 -0700 Subject: [PATCH] add jdbc --- jdbc-driver/README.md | 338 +++++ .../examples/SeaweedFSJDBCExample.java | 308 ++++ jdbc-driver/pom.xml | 154 ++ .../seaweedfs/jdbc/SeaweedFSConnection.java | 497 +++++++ .../jdbc/SeaweedFSConnectionInfo.java | 71 + .../jdbc/SeaweedFSDatabaseMetaData.java | 972 +++++++++++++ .../com/seaweedfs/jdbc/SeaweedFSDriver.java | 207 +++ .../jdbc/SeaweedFSPreparedStatement.java | 352 +++++ .../seaweedfs/jdbc/SeaweedFSResultSet.java | 1245 +++++++++++++++++ .../jdbc/SeaweedFSResultSetMetaData.java | 202 +++ .../seaweedfs/jdbc/SeaweedFSStatement.java | 389 +++++ .../META-INF/services/java.sql.Driver | 1 + .../seaweedfs/jdbc/SeaweedFSDriverTest.java | 75 + weed/command/command.go | 1 + weed/command/jdbc.go | 141 ++ weed/server/jdbc_server.go | 524 +++++++ 16 files changed, 5477 insertions(+) create mode 100644 jdbc-driver/README.md create mode 100644 jdbc-driver/examples/SeaweedFSJDBCExample.java create mode 100644 jdbc-driver/pom.xml create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnection.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnectionInfo.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDatabaseMetaData.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java create mode 100644 jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java create mode 100644 jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver create mode 100644 jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java create mode 100644 weed/command/jdbc.go create mode 100644 weed/server/jdbc_server.go diff --git a/jdbc-driver/README.md b/jdbc-driver/README.md new file mode 100644 index 000000000..d3b4be04a --- /dev/null +++ b/jdbc-driver/README.md @@ -0,0 +1,338 @@ +# SeaweedFS JDBC Driver + +A JDBC driver for connecting to SeaweedFS SQL engine, enabling standard Java applications and BI tools to query SeaweedFS MQ topics using SQL. + +## Features + +- **Standard JDBC Interface**: Compatible with any Java application or tool that supports JDBC +- **SQL Query Support**: Execute SELECT queries on SeaweedFS MQ topics +- **Aggregation Functions**: Support for COUNT, SUM, AVG, MIN, MAX operations +- **System Columns**: Access to `_timestamp_ns`, `_key`, `_source` system columns +- **Database Tools**: Works with DBeaver, IntelliJ DataGrip, and other database tools +- **BI Tools**: Compatible with Tableau, Power BI, and other business intelligence tools +- **Read-Only Access**: Secure read-only access to your SeaweedFS data + +## Quick Start + +### 1. Start SeaweedFS JDBC Server + +First, start the SeaweedFS JDBC server: + +```bash +# Start JDBC server on default port 8089 +weed jdbc + +# Or with custom configuration +weed jdbc -port=8090 -host=0.0.0.0 -master=master-server:9333 +``` + +### 2. Add JDBC Driver to Your Project + +#### Maven + +```xml + + com.seaweedfs + seaweedfs-jdbc + 1.0.0 + +``` + +#### Gradle + +```gradle +implementation 'com.seaweedfs:seaweedfs-jdbc:1.0.0' +``` + +### 3. Connect and Query + +```java +import java.sql.*; + +public class SeaweedFSExample { + public static void main(String[] args) throws SQLException { + // JDBC URL format: jdbc:seaweedfs://host:port/database + String url = "jdbc:seaweedfs://localhost:8089/default"; + + // Connect to SeaweedFS + Connection conn = DriverManager.getConnection(url); + + // Execute queries + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10"); + + // Process results + while (rs.next()) { + System.out.println("ID: " + rs.getLong("id")); + System.out.println("Message: " + rs.getString("message")); + System.out.println("Timestamp: " + rs.getTimestamp("_timestamp_ns")); + } + + // Clean up + rs.close(); + stmt.close(); + conn.close(); + } +} +``` + +## JDBC URL Format + +``` +jdbc:seaweedfs://host:port/database[?property=value&...] +``` + +### Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `host` | localhost | SeaweedFS JDBC server hostname | +| `port` | 8089 | SeaweedFS JDBC server port | +| `database` | default | Database/namespace name | +| `connectTimeout` | 30000 | Connection timeout in milliseconds | +| `socketTimeout` | 0 | Socket timeout in milliseconds (0 = infinite) | + +### Examples + +```java +// Basic connection +"jdbc:seaweedfs://localhost:8089/default" + +// Custom host and port +"jdbc:seaweedfs://seaweed-server:9000/production" + +// With query parameters +"jdbc:seaweedfs://localhost:8089/default?connectTimeout=5000&socketTimeout=30000" +``` + +## Supported SQL Operations + +### SELECT Queries +```sql +-- Basic select +SELECT * FROM topic_name; + +-- With WHERE clause +SELECT id, message FROM topic_name WHERE id > 1000; + +-- With LIMIT +SELECT * FROM topic_name ORDER BY _timestamp_ns DESC LIMIT 100; +``` + +### Aggregation Functions +```sql +-- Count records +SELECT COUNT(*) FROM topic_name; + +-- Aggregations +SELECT + COUNT(*) as total_messages, + MIN(id) as min_id, + MAX(id) as max_id, + AVG(amount) as avg_amount +FROM topic_name; +``` + +### System Columns +```sql +-- Access system columns +SELECT + id, + message, + _timestamp_ns as timestamp, + _key as partition_key, + _source as data_source +FROM topic_name; +``` + +### Schema Information +```sql +-- List databases +SHOW DATABASES; + +-- List tables in current database +SHOW TABLES; + +-- Describe table structure +DESCRIBE topic_name; +-- or +DESC topic_name; +``` + +## Database Tool Integration + +### DBeaver + +1. Download and install DBeaver +2. Create new connection → Generic JDBC +3. Settings: + - **URL**: `jdbc:seaweedfs://localhost:8089/default` + - **Driver Class**: `com.seaweedfs.jdbc.SeaweedFSDriver` + - **Libraries**: Add `seaweedfs-jdbc-1.0.0.jar` + +### IntelliJ DataGrip + +1. Open DataGrip +2. Add New Data Source → Generic +3. Configure: + - **URL**: `jdbc:seaweedfs://localhost:8089/default` + - **Driver**: Add `seaweedfs-jdbc-1.0.0.jar` + - **Driver Class**: `com.seaweedfs.jdbc.SeaweedFSDriver` + +### Tableau + +1. Connect to Data → More... → Generic JDBC +2. Configure: + - **URL**: `jdbc:seaweedfs://localhost:8089/default` + - **Driver Path**: Path to `seaweedfs-jdbc-1.0.0.jar` + - **Class Name**: `com.seaweedfs.jdbc.SeaweedFSDriver` + +## Advanced Usage + +### Connection Pooling + +```java +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +HikariConfig config = new HikariConfig(); +config.setJdbcUrl("jdbc:seaweedfs://localhost:8089/default"); +config.setMaximumPoolSize(10); + +HikariDataSource dataSource = new HikariDataSource(config); +Connection conn = dataSource.getConnection(); +``` + +### PreparedStatements + +```java +String sql = "SELECT * FROM topic_name WHERE id > ? AND created_date > ?"; +PreparedStatement stmt = conn.prepareStatement(sql); +stmt.setLong(1, 1000); +stmt.setTimestamp(2, Timestamp.valueOf("2024-01-01 00:00:00")); + +ResultSet rs = stmt.executeQuery(); +while (rs.next()) { + // Process results +} +``` + +### Metadata Access + +```java +DatabaseMetaData metadata = conn.getMetaData(); + +// Get database information +System.out.println("Database: " + metadata.getDatabaseProductName()); +System.out.println("Version: " + metadata.getDatabaseProductVersion()); +System.out.println("Driver: " + metadata.getDriverName()); + +// Get table information +ResultSet tables = metadata.getTables(null, null, null, null); +while (tables.next()) { + System.out.println("Table: " + tables.getString("TABLE_NAME")); +} +``` + +## Building from Source + +```bash +# Clone the repository +git clone https://github.com/seaweedfs/seaweedfs.git +cd seaweedfs/jdbc-driver + +# Build with Maven +mvn clean package + +# Run tests +mvn test + +# Install to local repository +mvn install +``` + +## Configuration + +### Server-Side Configuration + +The JDBC server supports the following command-line options: + +```bash +weed jdbc -help + -host string + JDBC server host (default "localhost") + -master string + SeaweedFS master server address (default "localhost:9333") + -port int + JDBC server port (default 8089) +``` + +### Client-Side Configuration + +Connection properties can be set via URL parameters or Properties object: + +```java +Properties props = new Properties(); +props.setProperty("connectTimeout", "10000"); +props.setProperty("socketTimeout", "30000"); + +Connection conn = DriverManager.getConnection( + "jdbc:seaweedfs://localhost:8089/default", props); +``` + +## Performance Tips + +1. **Use LIMIT clauses**: Always limit result sets for large topics +2. **Filter early**: Use WHERE clauses to reduce data transfer +3. **Connection pooling**: Use connection pools for multi-threaded applications +4. **Batch operations**: Use batch statements for multiple queries +5. **Close resources**: Always close ResultSets, Statements, and Connections + +## Limitations + +- **Read-Only**: SeaweedFS JDBC driver only supports SELECT operations +- **No Transactions**: Transaction support is not available +- **Single Table**: Joins between tables are not supported +- **Limited SQL**: Only basic SQL SELECT syntax is supported + +## Troubleshooting + +### Connection Issues + +```bash +# Test JDBC server connectivity +telnet localhost 8089 + +# Check SeaweedFS master connectivity +weed shell +> cluster.status +``` + +### Common Errors + +**Error: "Connection refused"** +- Ensure JDBC server is running on the specified host/port +- Check firewall settings + +**Error: "No suitable driver found"** +- Verify JDBC driver is in classpath +- Ensure correct driver class name: `com.seaweedfs.jdbc.SeaweedFSDriver` + +**Error: "Topic not found"** +- Verify topic exists in SeaweedFS +- Check database/namespace name in connection URL + +## Contributing + +Contributions are welcome! Please see the main SeaweedFS repository for contribution guidelines. + +## License + +This JDBC driver is part of SeaweedFS and is licensed under the Apache License 2.0. + +## Support + +- **Documentation**: [SeaweedFS Wiki](https://github.com/seaweedfs/seaweedfs/wiki) +- **Issues**: [GitHub Issues](https://github.com/seaweedfs/seaweedfs/issues) +- **Discussions**: [GitHub Discussions](https://github.com/seaweedfs/seaweedfs/discussions) +- **Chat**: [SeaweedFS Slack](https://join.slack.com/t/seaweedfs/shared_invite/...) diff --git a/jdbc-driver/examples/SeaweedFSJDBCExample.java b/jdbc-driver/examples/SeaweedFSJDBCExample.java new file mode 100644 index 000000000..52924f25e --- /dev/null +++ b/jdbc-driver/examples/SeaweedFSJDBCExample.java @@ -0,0 +1,308 @@ +package com.seaweedfs.jdbc.examples; + +import java.sql.*; +import java.util.Properties; + +/** + * Complete example demonstrating SeaweedFS JDBC driver usage + */ +public class SeaweedFSJDBCExample { + + public static void main(String[] args) { + // JDBC URL for SeaweedFS + String url = "jdbc:seaweedfs://localhost:8089/default"; + + try { + // 1. Load the driver (optional - auto-registration via META-INF/services) + Class.forName("com.seaweedfs.jdbc.SeaweedFSDriver"); + System.out.println("✓ SeaweedFS JDBC Driver loaded successfully"); + + // 2. Connect to SeaweedFS + System.out.println("\n📡 Connecting to SeaweedFS..."); + Connection conn = DriverManager.getConnection(url); + System.out.println("✓ Connected to: " + url); + + // 3. Get database metadata + DatabaseMetaData dbMeta = conn.getMetaData(); + System.out.println("\n📊 Database Information:"); + System.out.println(" Database: " + dbMeta.getDatabaseProductName()); + System.out.println(" Version: " + dbMeta.getDatabaseProductVersion()); + System.out.println(" Driver: " + dbMeta.getDriverName() + " v" + dbMeta.getDriverVersion()); + System.out.println(" JDBC Version: " + dbMeta.getJDBCMajorVersion() + "." + dbMeta.getJDBCMinorVersion()); + System.out.println(" Read-only: " + dbMeta.isReadOnly()); + + // 4. List available databases/schemas + System.out.println("\n🗄️ Available Databases:"); + ResultSet catalogs = dbMeta.getCatalogs(); + while (catalogs.next()) { + System.out.println(" • " + catalogs.getString("TABLE_CAT")); + } + catalogs.close(); + + // 5. Execute basic queries + System.out.println("\n🔍 Executing SQL Queries:"); + + Statement stmt = conn.createStatement(); + + // Show databases + System.out.println("\n 📋 SHOW DATABASES:"); + ResultSet rs = stmt.executeQuery("SHOW DATABASES"); + while (rs.next()) { + System.out.println(" " + rs.getString(1)); + } + rs.close(); + + // Show tables (topics) + System.out.println("\n 📋 SHOW TABLES:"); + rs = stmt.executeQuery("SHOW TABLES"); + ResultSetMetaData rsmd = rs.getMetaData(); + int columnCount = rsmd.getColumnCount(); + + // Print headers + for (int i = 1; i <= columnCount; i++) { + System.out.print(String.format("%-20s", rsmd.getColumnName(i))); + } + System.out.println(); + System.out.println("-".repeat(20 * columnCount)); + + // Print rows + while (rs.next()) { + for (int i = 1; i <= columnCount; i++) { + System.out.print(String.format("%-20s", rs.getString(i))); + } + System.out.println(); + } + rs.close(); + + // 6. Query a specific topic (if exists) + String topicQuery = "SELECT * FROM test_topic LIMIT 5"; + System.out.println("\n 📋 " + topicQuery + ":"); + + try { + rs = stmt.executeQuery(topicQuery); + rsmd = rs.getMetaData(); + columnCount = rsmd.getColumnCount(); + + // Print column headers + for (int i = 1; i <= columnCount; i++) { + System.out.print(String.format("%-15s", rsmd.getColumnName(i))); + } + System.out.println(); + System.out.println("-".repeat(15 * columnCount)); + + // Print data rows + int rowCount = 0; + while (rs.next() && rowCount < 5) { + for (int i = 1; i <= columnCount; i++) { + String value = rs.getString(i); + if (value != null && value.length() > 12) { + value = value.substring(0, 12) + "..."; + } + System.out.print(String.format("%-15s", value != null ? value : "NULL")); + } + System.out.println(); + rowCount++; + } + + if (rowCount == 0) { + System.out.println(" (No data found)"); + } + + rs.close(); + } catch (SQLException e) { + System.out.println(" ⚠️ Topic 'test_topic' not found: " + e.getMessage()); + } + + // 7. Demonstrate aggregation queries + System.out.println("\n 🧮 Aggregation Example:"); + try { + rs = stmt.executeQuery("SELECT COUNT(*) as total_records FROM test_topic"); + if (rs.next()) { + System.out.println(" Total records: " + rs.getLong("total_records")); + } + rs.close(); + } catch (SQLException e) { + System.out.println(" ⚠️ Aggregation example skipped: " + e.getMessage()); + } + + // 8. Demonstrate PreparedStatement + System.out.println("\n 📝 PreparedStatement Example:"); + String preparedQuery = "SELECT * FROM test_topic WHERE id > ? LIMIT ?"; + + try { + PreparedStatement pstmt = conn.prepareStatement(preparedQuery); + pstmt.setLong(1, 100); + pstmt.setInt(2, 3); + + System.out.println(" Query: " + preparedQuery); + System.out.println(" Parameters: id > 100, LIMIT 3"); + + rs = pstmt.executeQuery(); + rsmd = rs.getMetaData(); + columnCount = rsmd.getColumnCount(); + + int count = 0; + while (rs.next()) { + if (count == 0) { + // Print headers for first row + for (int i = 1; i <= columnCount; i++) { + System.out.print(String.format("%-15s", rsmd.getColumnName(i))); + } + System.out.println(); + System.out.println("-".repeat(15 * columnCount)); + } + + for (int i = 1; i <= columnCount; i++) { + String value = rs.getString(i); + if (value != null && value.length() > 12) { + value = value.substring(0, 12) + "..."; + } + System.out.print(String.format("%-15s", value != null ? value : "NULL")); + } + System.out.println(); + count++; + } + + if (count == 0) { + System.out.println(" (No records match criteria)"); + } + + rs.close(); + pstmt.close(); + } catch (SQLException e) { + System.out.println(" ⚠️ PreparedStatement example skipped: " + e.getMessage()); + } + + // 9. System columns example + System.out.println("\n 🔧 System Columns Example:"); + try { + rs = stmt.executeQuery("SELECT id, _timestamp_ns, _key, _source FROM test_topic LIMIT 3"); + rsmd = rs.getMetaData(); + columnCount = rsmd.getColumnCount(); + + // Print headers + for (int i = 1; i <= columnCount; i++) { + System.out.print(String.format("%-20s", rsmd.getColumnName(i))); + } + System.out.println(); + System.out.println("-".repeat(20 * columnCount)); + + int count = 0; + while (rs.next()) { + for (int i = 1; i <= columnCount; i++) { + String value = rs.getString(i); + if (value != null && value.length() > 17) { + value = value.substring(0, 17) + "..."; + } + System.out.print(String.format("%-20s", value != null ? value : "NULL")); + } + System.out.println(); + count++; + } + + if (count == 0) { + System.out.println(" (No data available for system columns demo)"); + } + + rs.close(); + } catch (SQLException e) { + System.out.println(" ⚠️ System columns example skipped: " + e.getMessage()); + } + + // 10. Connection properties example + System.out.println("\n⚙️ Connection Properties:"); + System.out.println(" Auto-commit: " + conn.getAutoCommit()); + System.out.println(" Read-only: " + conn.isReadOnly()); + System.out.println(" Transaction isolation: " + conn.getTransactionIsolation()); + System.out.println(" Catalog: " + conn.getCatalog()); + + // 11. Clean up + stmt.close(); + conn.close(); + + System.out.println("\n✅ SeaweedFS JDBC Example completed successfully!"); + System.out.println("\n💡 Next Steps:"); + System.out.println(" • Try connecting with DBeaver or other JDBC tools"); + System.out.println(" • Use in your Java applications with connection pooling"); + System.out.println(" • Integrate with BI tools like Tableau or Power BI"); + System.out.println(" • Build data pipelines using SeaweedFS as a data source"); + + } catch (ClassNotFoundException e) { + System.err.println("❌ SeaweedFS JDBC Driver not found: " + e.getMessage()); + System.err.println(" Make sure seaweedfs-jdbc.jar is in your classpath"); + } catch (SQLException e) { + System.err.println("❌ Database error: " + e.getMessage()); + System.err.println(" Make sure SeaweedFS JDBC server is running:"); + System.err.println(" weed jdbc -port=8089 -master=localhost:9333"); + } catch (Exception e) { + System.err.println("❌ Unexpected error: " + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * Example with connection pooling using HikariCP + */ + public static void connectionPoolingExample() { + try { + // This would require HikariCP dependency + /* + HikariConfig config = new HikariConfig(); + config.setJdbcUrl("jdbc:seaweedfs://localhost:8089/default"); + config.setMaximumPoolSize(10); + config.setMinimumIdle(2); + config.setConnectionTimeout(30000); + config.setIdleTimeout(600000); + + HikariDataSource dataSource = new HikariDataSource(config); + + try (Connection conn = dataSource.getConnection()) { + // Use connection from pool + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM my_topic"); + if (rs.next()) { + System.out.println("Record count: " + rs.getLong(1)); + } + rs.close(); + stmt.close(); + } + + dataSource.close(); + */ + + System.out.println("Connection pooling example (commented out - requires HikariCP dependency)"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Example configuration for different database tools + */ + public static void printToolConfiguration() { + System.out.println("\n🛠️ Database Tool Configuration:"); + + System.out.println("\n📊 DBeaver:"); + System.out.println(" 1. New Connection → Generic JDBC"); + System.out.println(" 2. URL: jdbc:seaweedfs://localhost:8089/default"); + System.out.println(" 3. Driver Class: com.seaweedfs.jdbc.SeaweedFSDriver"); + System.out.println(" 4. Add seaweedfs-jdbc.jar to Libraries"); + + System.out.println("\n💻 IntelliJ DataGrip:"); + System.out.println(" 1. New Data Source → Generic"); + System.out.println(" 2. URL: jdbc:seaweedfs://localhost:8089/default"); + System.out.println(" 3. Add Driver: seaweedfs-jdbc.jar"); + System.out.println(" 4. Class: com.seaweedfs.jdbc.SeaweedFSDriver"); + + System.out.println("\n📈 Tableau:"); + System.out.println(" 1. Connect to Data → More... → Generic JDBC"); + System.out.println(" 2. URL: jdbc:seaweedfs://localhost:8089/default"); + System.out.println(" 3. Driver Path: /path/to/seaweedfs-jdbc.jar"); + System.out.println(" 4. Class Name: com.seaweedfs.jdbc.SeaweedFSDriver"); + + System.out.println("\n☕ Java Application:"); + System.out.println(" Class.forName(\"com.seaweedfs.jdbc.SeaweedFSDriver\");"); + System.out.println(" Connection conn = DriverManager.getConnection("); + System.out.println(" \"jdbc:seaweedfs://localhost:8089/default\");"); + } +} diff --git a/jdbc-driver/pom.xml b/jdbc-driver/pom.xml new file mode 100644 index 000000000..3555708f3 --- /dev/null +++ b/jdbc-driver/pom.xml @@ -0,0 +1,154 @@ + + + 4.0.0 + + com.seaweedfs + seaweedfs-jdbc + 1.0.0 + jar + + SeaweedFS JDBC Driver + JDBC driver for connecting to SeaweedFS SQL engine + https://github.com/seaweedfs/seaweedfs + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + 8 + 8 + UTF-8 + 5.9.0 + + + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + + org.slf4j + slf4j-api + 1.7.36 + + + org.slf4j + slf4j-simple + 1.7.36 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + 8 + 8 + UTF-8 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M7 + + false + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.2 + + + + com.seaweedfs.jdbc + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.4.1 + + + attach-javadocs + + jar + + + + + 8 + 8 + none + + + + + + + + release + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.0.1 + + + sign-artifacts + verify + + sign + + + + + + + + + diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnection.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnection.java new file mode 100644 index 000000000..81e0134fc --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnection.java @@ -0,0 +1,497 @@ +package com.seaweedfs.jdbc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * JDBC Connection implementation for SeaweedFS + */ +public class SeaweedFSConnection implements Connection { + + private static final Logger logger = LoggerFactory.getLogger(SeaweedFSConnection.class); + + // Protocol constants (must match server implementation) + private static final byte JDBC_MSG_CONNECT = 0x01; + private static final byte JDBC_MSG_DISCONNECT = 0x02; + private static final byte JDBC_MSG_EXECUTE_QUERY = 0x03; + private static final byte JDBC_MSG_EXECUTE_UPDATE = 0x04; + private static final byte JDBC_MSG_GET_METADATA = 0x07; + private static final byte JDBC_MSG_SET_AUTOCOMMIT = 0x08; + private static final byte JDBC_MSG_COMMIT = 0x09; + private static final byte JDBC_MSG_ROLLBACK = 0x0A; + + private static final byte JDBC_RESP_OK = 0x00; + private static final byte JDBC_RESP_ERROR = 0x01; + private static final byte JDBC_RESP_RESULT_SET = 0x02; + private static final byte JDBC_RESP_UPDATE_COUNT = 0x03; + private static final byte JDBC_RESP_METADATA = 0x04; + + private final SeaweedFSConnectionInfo connectionInfo; + private Socket socket; + private DataInputStream inputStream; + private DataOutputStream outputStream; + private boolean closed = false; + private boolean autoCommit = true; + private String catalog = null; + private int transactionIsolation = Connection.TRANSACTION_NONE; + private boolean readOnly = true; // SeaweedFS is read-only + + public SeaweedFSConnection(SeaweedFSConnectionInfo connectionInfo) throws SQLException { + this.connectionInfo = connectionInfo; + connect(); + } + + private void connect() throws SQLException { + try { + logger.debug("Connecting to SeaweedFS at {}:{}", connectionInfo.getHost(), connectionInfo.getPort()); + + // Create socket connection + socket = new Socket(); + socket.connect(new java.net.InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPort()), + connectionInfo.getConnectTimeout()); + + if (connectionInfo.getSocketTimeout() > 0) { + socket.setSoTimeout(connectionInfo.getSocketTimeout()); + } + + // Create streams + inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); + outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + + // Send connection message + sendMessage(JDBC_MSG_CONNECT, connectionInfo.getDatabase().getBytes()); + + // Read response + Response response = readResponse(); + if (response.type == JDBC_RESP_ERROR) { + throw new SQLException("Failed to connect: " + new String(response.data)); + } + + logger.info("Successfully connected to SeaweedFS: {}", connectionInfo.getConnectionString()); + + } catch (Exception e) { + if (socket != null && !socket.isClosed()) { + try { + socket.close(); + } catch (IOException ignored) {} + } + throw new SQLException("Failed to connect to SeaweedFS: " + e.getMessage(), e); + } + } + + @Override + public Statement createStatement() throws SQLException { + checkClosed(); + return new SeaweedFSStatement(this); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + checkClosed(); + return new SeaweedFSPreparedStatement(this, sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + throw new SQLFeatureNotSupportedException("Callable statements are not supported"); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + checkClosed(); + return sql; // No translation needed + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + checkClosed(); + if (this.autoCommit != autoCommit) { + sendMessage(JDBC_MSG_SET_AUTOCOMMIT, new byte[]{(byte)(autoCommit ? 1 : 0)}); + Response response = readResponse(); + if (response.type == JDBC_RESP_ERROR) { + throw new SQLException("Failed to set auto-commit: " + new String(response.data)); + } + this.autoCommit = autoCommit; + } + } + + @Override + public boolean getAutoCommit() throws SQLException { + checkClosed(); + return autoCommit; + } + + @Override + public void commit() throws SQLException { + checkClosed(); + if (autoCommit) { + throw new SQLException("Cannot commit when auto-commit is enabled"); + } + sendMessage(JDBC_MSG_COMMIT, new byte[0]); + Response response = readResponse(); + if (response.type == JDBC_RESP_ERROR) { + throw new SQLException("Failed to commit: " + new String(response.data)); + } + } + + @Override + public void rollback() throws SQLException { + checkClosed(); + if (autoCommit) { + throw new SQLException("Cannot rollback when auto-commit is enabled"); + } + sendMessage(JDBC_MSG_ROLLBACK, new byte[0]); + Response response = readResponse(); + if (response.type == JDBC_RESP_ERROR) { + throw new SQLException("Failed to rollback: " + new String(response.data)); + } + } + + @Override + public void close() throws SQLException { + if (!closed) { + try { + if (outputStream != null) { + sendMessage(JDBC_MSG_DISCONNECT, new byte[0]); + outputStream.close(); + } + if (inputStream != null) { + inputStream.close(); + } + if (socket != null && !socket.isClosed()) { + socket.close(); + } + } catch (Exception e) { + logger.warn("Error closing connection: {}", e.getMessage()); + } finally { + closed = true; + logger.debug("Connection closed"); + } + } + } + + @Override + public boolean isClosed() throws SQLException { + return closed || (socket != null && socket.isClosed()); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + checkClosed(); + return new SeaweedFSDatabaseMetaData(this); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + checkClosed(); + // SeaweedFS is always read-only, so we ignore attempts to change this + this.readOnly = true; + } + + @Override + public boolean isReadOnly() throws SQLException { + checkClosed(); + return readOnly; + } + + @Override + public void setCatalog(String catalog) throws SQLException { + checkClosed(); + this.catalog = catalog; + } + + @Override + public String getCatalog() throws SQLException { + checkClosed(); + return catalog != null ? catalog : connectionInfo.getDatabase(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + checkClosed(); + this.transactionIsolation = level; + } + + @Override + public int getTransactionIsolation() throws SQLException { + checkClosed(); + return transactionIsolation; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + checkClosed(); + return null; // No warnings for now + } + + @Override + public void clearWarnings() throws SQLException { + checkClosed(); + // No-op + } + + // Methods not commonly used - basic implementations + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new SQLFeatureNotSupportedException("Callable statements are not supported"); + } + + @Override + public Map> getTypeMap() throws SQLException { + throw new SQLFeatureNotSupportedException("Type maps are not supported"); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + throw new SQLFeatureNotSupportedException("Type maps are not supported"); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + // No-op + } + + @Override + public int getHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new SQLFeatureNotSupportedException("Savepoints are not supported"); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + throw new SQLFeatureNotSupportedException("Savepoints are not supported"); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException("Savepoints are not supported"); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException("Savepoints are not supported"); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new SQLFeatureNotSupportedException("Callable statements are not supported"); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return prepareStatement(sql); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return prepareStatement(sql); + } + + @Override + public Clob createClob() throws SQLException { + throw new SQLFeatureNotSupportedException("Clob creation is not supported"); + } + + @Override + public Blob createBlob() throws SQLException { + throw new SQLFeatureNotSupportedException("Blob creation is not supported"); + } + + @Override + public NClob createNClob() throws SQLException { + throw new SQLFeatureNotSupportedException("NClob creation is not supported"); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw new SQLFeatureNotSupportedException("SQLXML creation is not supported"); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return !closed && socket != null && !socket.isClosed(); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + // No-op + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + // No-op + } + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return new Properties(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + throw new SQLFeatureNotSupportedException("Array creation is not supported"); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + throw new SQLFeatureNotSupportedException("Struct creation is not supported"); + } + + @Override + public void setSchema(String schema) throws SQLException { + // No-op + } + + @Override + public String getSchema() throws SQLException { + return connectionInfo.getDatabase(); + } + + @Override + public void abort(Executor executor) throws SQLException { + close(); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + try { + if (socket != null) { + socket.setSoTimeout(milliseconds); + } + } catch (Exception e) { + throw new SQLException("Failed to set network timeout", e); + } + } + + @Override + public int getNetworkTimeout() throws SQLException { + try { + return socket != null ? socket.getSoTimeout() : 0; + } catch (Exception e) { + throw new SQLException("Failed to get network timeout", e); + } + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isAssignableFrom(getClass())) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isAssignableFrom(getClass()); + } + + // Package-private methods for use by Statement and other classes + + void sendMessage(byte messageType, byte[] data) throws SQLException { + try { + synchronized (outputStream) { + // Write header: message type (1 byte) + data length (4 bytes) + outputStream.writeByte(messageType); + outputStream.writeInt(data.length); + + // Write data + if (data.length > 0) { + outputStream.write(data); + } + + outputStream.flush(); + } + } catch (IOException e) { + throw new SQLException("Failed to send message to server", e); + } + } + + Response readResponse() throws SQLException { + try { + synchronized (inputStream) { + // Read response type + byte responseType = inputStream.readByte(); + + // Read data length + int dataLength = inputStream.readInt(); + + // Read data + byte[] data = new byte[dataLength]; + if (dataLength > 0) { + inputStream.readFully(data); + } + + return new Response(responseType, data); + } + } catch (SocketTimeoutException e) { + throw new SQLException("Read timeout from server", e); + } catch (IOException e) { + throw new SQLException("Failed to read response from server", e); + } + } + + private void checkClosed() throws SQLException { + if (closed) { + throw new SQLException("Connection is closed"); + } + } + + SeaweedFSConnectionInfo getConnectionInfo() { + return connectionInfo; + } + + // Helper class for responses + static class Response { + final byte type; + final byte[] data; + + Response(byte type, byte[] data) { + this.type = type; + this.data = data; + } + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnectionInfo.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnectionInfo.java new file mode 100644 index 000000000..b62c19158 --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSConnectionInfo.java @@ -0,0 +1,71 @@ +package com.seaweedfs.jdbc; + +import java.util.Properties; + +/** + * Connection information holder for SeaweedFS JDBC connections + */ +public class SeaweedFSConnectionInfo { + + private final String host; + private final int port; + private final String database; + private final String user; + private final String password; + private final int connectTimeout; + private final int socketTimeout; + private final Properties properties; + + public SeaweedFSConnectionInfo(Properties props) { + this.properties = new Properties(props); + this.host = props.getProperty(SeaweedFSDriver.PROP_HOST, "localhost"); + this.port = Integer.parseInt(props.getProperty(SeaweedFSDriver.PROP_PORT, "8089")); + this.database = props.getProperty(SeaweedFSDriver.PROP_DATABASE, "default"); + this.user = props.getProperty(SeaweedFSDriver.PROP_USER, ""); + this.password = props.getProperty(SeaweedFSDriver.PROP_PASSWORD, ""); + this.connectTimeout = Integer.parseInt(props.getProperty(SeaweedFSDriver.PROP_CONNECT_TIMEOUT, "30000")); + this.socketTimeout = Integer.parseInt(props.getProperty(SeaweedFSDriver.PROP_SOCKET_TIMEOUT, "0")); + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getDatabase() { + return database; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public Properties getProperties() { + return new Properties(properties); + } + + public String getConnectionString() { + return String.format("jdbc:seaweedfs://%s:%d/%s", host, port, database); + } + + @Override + public String toString() { + return String.format("SeaweedFSConnectionInfo{host='%s', port=%d, database='%s', user='%s', connectTimeout=%d, socketTimeout=%d}", + host, port, database, user, connectTimeout, socketTimeout); + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDatabaseMetaData.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDatabaseMetaData.java new file mode 100644 index 000000000..f4304c37a --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDatabaseMetaData.java @@ -0,0 +1,972 @@ +package com.seaweedfs.jdbc; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +/** + * DatabaseMetaData implementation for SeaweedFS JDBC + */ +public class SeaweedFSDatabaseMetaData implements DatabaseMetaData { + + private final SeaweedFSConnection connection; + + public SeaweedFSDatabaseMetaData(SeaweedFSConnection connection) { + this.connection = connection; + } + + @Override + public boolean allProceduresAreCallable() throws SQLException { + return false; // No stored procedures + } + + @Override + public boolean allTablesAreSelectable() throws SQLException { + return true; // All tables are selectable + } + + @Override + public String getURL() throws SQLException { + return connection.getConnectionInfo().getConnectionString(); + } + + @Override + public String getUserName() throws SQLException { + return connection.getConnectionInfo().getUser(); + } + + @Override + public boolean isReadOnly() throws SQLException { + return true; // SeaweedFS is read-only + } + + @Override + public boolean nullsAreSortedHigh() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedLow() throws SQLException { + return true; + } + + @Override + public boolean nullsAreSortedAtStart() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedAtEnd() throws SQLException { + return false; + } + + @Override + public String getDatabaseProductName() throws SQLException { + return "SeaweedFS"; + } + + @Override + public String getDatabaseProductVersion() throws SQLException { + return "1.0.0"; // This could be retrieved from the server + } + + @Override + public String getDriverName() throws SQLException { + return SeaweedFSDriver.DRIVER_NAME; + } + + @Override + public String getDriverVersion() throws SQLException { + return SeaweedFSDriver.DRIVER_VERSION; + } + + @Override + public int getDriverMajorVersion() { + return SeaweedFSDriver.DRIVER_MAJOR_VERSION; + } + + @Override + public int getDriverMinorVersion() { + return SeaweedFSDriver.DRIVER_MINOR_VERSION; + } + + @Override + public boolean usesLocalFiles() throws SQLException { + return false; // SeaweedFS uses distributed storage + } + + @Override + public boolean usesLocalFilePerTable() throws SQLException { + return false; + } + + @Override + public boolean supportsMixedCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesUpperCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesLowerCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesMixedCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesUpperCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesLowerCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesMixedCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public String getIdentifierQuoteString() throws SQLException { + return "`"; + } + + @Override + public String getSQLKeywords() throws SQLException { + return ""; // No additional keywords beyond SQL standard + } + + @Override + public String getNumericFunctions() throws SQLException { + return "COUNT,SUM,AVG,MIN,MAX"; + } + + @Override + public String getStringFunctions() throws SQLException { + return ""; + } + + @Override + public String getSystemFunctions() throws SQLException { + return ""; + } + + @Override + public String getTimeDateFunctions() throws SQLException { + return ""; + } + + @Override + public String getSearchStringEscape() throws SQLException { + return "\\"; + } + + @Override + public String getExtraNameCharacters() throws SQLException { + return ""; + } + + @Override + public boolean supportsAlterTableWithAddColumn() throws SQLException { + return false; // No DDL support + } + + @Override + public boolean supportsAlterTableWithDropColumn() throws SQLException { + return false; + } + + @Override + public boolean supportsColumnAliasing() throws SQLException { + return true; + } + + @Override + public boolean nullPlusNonNullIsNull() throws SQLException { + return true; + } + + @Override + public boolean supportsConvert() throws SQLException { + return false; + } + + @Override + public boolean supportsConvert(int fromType, int toType) throws SQLException { + return false; + } + + @Override + public boolean supportsTableCorrelationNames() throws SQLException { + return true; + } + + @Override + public boolean supportsDifferentTableCorrelationNames() throws SQLException { + return true; + } + + @Override + public boolean supportsExpressionsInOrderBy() throws SQLException { + return true; + } + + @Override + public boolean supportsOrderByUnrelated() throws SQLException { + return false; + } + + @Override + public boolean supportsGroupBy() throws SQLException { + return true; + } + + @Override + public boolean supportsGroupByUnrelated() throws SQLException { + return false; + } + + @Override + public boolean supportsGroupByBeyondSelect() throws SQLException { + return false; + } + + @Override + public boolean supportsLikeEscapeClause() throws SQLException { + return true; + } + + @Override + public boolean supportsMultipleResultSets() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsNonNullableColumns() throws SQLException { + return true; + } + + @Override + public boolean supportsMinimumSQLGrammar() throws SQLException { + return true; + } + + @Override + public boolean supportsCoreSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsExtendedSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92EntryLevelSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92IntermediateSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92FullSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsIntegrityEnhancementFacility() throws SQLException { + return false; + } + + @Override + public boolean supportsOuterJoins() throws SQLException { + return false; + } + + @Override + public boolean supportsFullOuterJoins() throws SQLException { + return false; + } + + @Override + public boolean supportsLimitedOuterJoins() throws SQLException { + return false; + } + + @Override + public String getSchemaTerm() throws SQLException { + return "schema"; + } + + @Override + public String getProcedureTerm() throws SQLException { + return "procedure"; + } + + @Override + public String getCatalogTerm() throws SQLException { + return "catalog"; + } + + @Override + public boolean isCatalogAtStart() throws SQLException { + return true; + } + + @Override + public String getCatalogSeparator() throws SQLException { + return "."; + } + + @Override + public boolean supportsSchemasInDataManipulation() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInTableDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInIndexDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInDataManipulation() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInTableDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInIndexDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedDelete() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsSelectForUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsStoredProcedures() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInComparisons() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInExists() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInIns() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInQuantifieds() throws SQLException { + return false; + } + + @Override + public boolean supportsCorrelatedSubqueries() throws SQLException { + return false; + } + + @Override + public boolean supportsUnion() throws SQLException { + return false; + } + + @Override + public boolean supportsUnionAll() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenCursorsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenCursorsAcrossRollback() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossRollback() throws SQLException { + return false; + } + + @Override + public int getMaxBinaryLiteralLength() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxCharLiteralLength() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxColumnNameLength() throws SQLException { + return 255; + } + + @Override + public int getMaxColumnsInGroupBy() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxColumnsInIndex() throws SQLException { + return 0; // No indexes + } + + @Override + public int getMaxColumnsInOrderBy() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxColumnsInSelect() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxColumnsInTable() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxConnections() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxCursorNameLength() throws SQLException { + return 0; // No cursors + } + + @Override + public int getMaxIndexLength() throws SQLException { + return 0; // No indexes + } + + @Override + public int getMaxSchemaNameLength() throws SQLException { + return 255; + } + + @Override + public int getMaxProcedureNameLength() throws SQLException { + return 0; // No procedures + } + + @Override + public int getMaxCatalogNameLength() throws SQLException { + return 255; + } + + @Override + public int getMaxRowSize() throws SQLException { + return 0; // No limit + } + + @Override + public boolean doesMaxRowSizeIncludeBlobs() throws SQLException { + return false; + } + + @Override + public int getMaxStatementLength() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxStatements() throws SQLException { + return 0; // No limit + } + + @Override + public int getMaxTableNameLength() throws SQLException { + return 255; + } + + @Override + public int getMaxTablesInSelect() throws SQLException { + return 1; // Only single table selects supported + } + + @Override + public int getMaxUserNameLength() throws SQLException { + return 255; + } + + @Override + public int getDefaultTransactionIsolation() throws SQLException { + return Connection.TRANSACTION_NONE; + } + + @Override + public boolean supportsTransactions() throws SQLException { + return false; // No transactions + } + + @Override + public boolean supportsTransactionIsolationLevel(int level) throws SQLException { + return level == Connection.TRANSACTION_NONE; + } + + @Override + public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsDataManipulationTransactionsOnly() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionCausesTransactionCommit() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionIgnoredInTransactions() throws SQLException { + return false; + } + + @Override + public ResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern) throws SQLException { + // Return empty result set - no procedures + return createEmptyResultSet(new String[]{"PROCEDURE_CAT", "PROCEDURE_SCHEM", "PROCEDURE_NAME", "reserved1", "reserved2", "reserved3", "REMARKS", "PROCEDURE_TYPE", "SPECIFIC_NAME"}); + } + + @Override + public ResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) throws SQLException { + // Return empty result set - no procedures + return createEmptyResultSet(new String[]{"PROCEDURE_CAT", "PROCEDURE_SCHEM", "PROCEDURE_NAME", "COLUMN_NAME", "COLUMN_TYPE", "DATA_TYPE", "TYPE_NAME", "PRECISION", "LENGTH", "SCALE", "RADIX", "NULLABLE", "REMARKS", "COLUMN_DEF", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SPECIFIC_NAME"}); + } + + @Override + public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException { + // For now, return empty result set + // In a full implementation, this would query the schema catalog + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "TABLE_TYPE", "REMARKS", "TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "SELF_REFERENCING_COL_NAME", "REF_GENERATION"}); + } + + @Override + public ResultSet getSchemas() throws SQLException { + return getSchemas(null, null); + } + + @Override + public ResultSet getCatalogs() throws SQLException { + // Return default catalog + List> rows = new ArrayList<>(); + rows.add(List.of("default")); + return createResultSet(new String[]{"TABLE_CAT"}, rows); + } + + @Override + public ResultSet getTableTypes() throws SQLException { + List> rows = new ArrayList<>(); + rows.add(List.of("TABLE")); + return createResultSet(new String[]{"TABLE_TYPE"}, rows); + } + + @Override + public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { + // Return empty result set for now + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "DATA_TYPE", "TYPE_NAME", "COLUMN_SIZE", "BUFFER_LENGTH", "DECIMAL_DIGITS", "NUM_PREC_RADIX", "NULLABLE", "REMARKS", "COLUMN_DEF", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SCOPE_CATALOG", "SCOPE_SCHEMA", "SCOPE_TABLE", "SOURCE_DATA_TYPE", "IS_AUTOINCREMENT", "IS_GENERATEDCOLUMN"}); + } + + @Override + public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "GRANTOR", "GRANTEE", "PRIVILEGE", "IS_GRANTABLE"}); + } + + @Override + public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "GRANTOR", "GRANTEE", "PRIVILEGE", "IS_GRANTABLE"}); + } + + @Override + public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable) throws SQLException { + return createEmptyResultSet(new String[]{"SCOPE", "COLUMN_NAME", "DATA_TYPE", "TYPE_NAME", "COLUMN_SIZE", "BUFFER_LENGTH", "DECIMAL_DIGITS", "PSEUDO_COLUMN"}); + } + + @Override + public ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException { + return createEmptyResultSet(new String[]{"SCOPE", "COLUMN_NAME", "DATA_TYPE", "TYPE_NAME", "COLUMN_SIZE", "BUFFER_LENGTH", "DECIMAL_DIGITS", "PSEUDO_COLUMN"}); + } + + @Override + public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "KEY_SEQ", "PK_NAME"}); + } + + @Override + public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException { + return createEmptyResultSet(new String[]{"PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "PKCOLUMN_NAME", "FKTABLE_CAT", "FKTABLE_SCHEM", "FKTABLE_NAME", "FKCOLUMN_NAME", "KEY_SEQ", "UPDATE_RULE", "DELETE_RULE", "FK_NAME", "PK_NAME", "DEFERRABILITY"}); + } + + @Override + public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException { + return createEmptyResultSet(new String[]{"PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "PKCOLUMN_NAME", "FKTABLE_CAT", "FKTABLE_SCHEM", "FKTABLE_NAME", "FKCOLUMN_NAME", "KEY_SEQ", "UPDATE_RULE", "DELETE_RULE", "FK_NAME", "PK_NAME", "DEFERRABILITY"}); + } + + @Override + public ResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable, String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException { + return createEmptyResultSet(new String[]{"PKTABLE_CAT", "PKTABLE_SCHEM", "PKTABLE_NAME", "PKCOLUMN_NAME", "FKTABLE_CAT", "FKTABLE_SCHEM", "FKTABLE_NAME", "FKCOLUMN_NAME", "KEY_SEQ", "UPDATE_RULE", "DELETE_RULE", "FK_NAME", "PK_NAME", "DEFERRABILITY"}); + } + + @Override + public ResultSet getTypeInfo() throws SQLException { + List> rows = new ArrayList<>(); + // Add basic SQL types + rows.add(List.of("VARCHAR", String.valueOf(Types.VARCHAR), "65535", "'", "'", "length", "1", "3", "1", "0", "0", "0", "VARCHAR", "0", "0", String.valueOf(Types.VARCHAR), "0", "10")); + rows.add(List.of("BIGINT", String.valueOf(Types.BIGINT), "19", null, null, null, "1", "2", "0", "0", "0", "1", "BIGINT", "0", "0", String.valueOf(Types.BIGINT), "0", "10")); + rows.add(List.of("BOOLEAN", String.valueOf(Types.BOOLEAN), "1", null, null, null, "1", "2", "0", "0", "0", "1", "BOOLEAN", "0", "0", String.valueOf(Types.BOOLEAN), "0", "10")); + rows.add(List.of("TIMESTAMP", String.valueOf(Types.TIMESTAMP), "23", "'", "'", null, "1", "3", "0", "0", "0", "0", "TIMESTAMP", "0", "6", String.valueOf(Types.TIMESTAMP), "0", "10")); + + return createResultSet(new String[]{"TYPE_NAME", "DATA_TYPE", "PRECISION", "LITERAL_PREFIX", "LITERAL_SUFFIX", "CREATE_PARAMS", "NULLABLE", "CASE_SENSITIVE", "SEARCHABLE", "UNSIGNED_ATTRIBUTE", "FIXED_PREC_SCALE", "AUTO_INCREMENT", "LOCAL_TYPE_NAME", "MINIMUM_SCALE", "MAXIMUM_SCALE", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "NUM_PREC_RADIX"}, rows); + } + + @Override + public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "NON_UNIQUE", "INDEX_QUALIFIER", "INDEX_NAME", "TYPE", "ORDINAL_POSITION", "COLUMN_NAME", "ASC_OR_DESC", "CARDINALITY", "PAGES", "FILTER_CONDITION"}); + } + + @Override + public boolean supportsResultSetType(int type) throws SQLException { + return type == ResultSet.TYPE_FORWARD_ONLY; + } + + @Override + public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException { + return type == ResultSet.TYPE_FORWARD_ONLY && concurrency == ResultSet.CONCUR_READ_ONLY; + } + + @Override + public boolean ownUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean updatesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean deletesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean insertsAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean supportsBatchUpdates() throws SQLException { + return false; + } + + @Override + public ResultSet getUDTs(String catalog, String schemaPattern, String typeNamePattern, int[] types) throws SQLException { + return createEmptyResultSet(new String[]{"TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "CLASS_NAME", "DATA_TYPE", "REMARKS", "BASE_TYPE"}); + } + + @Override + public Connection getConnection() throws SQLException { + return connection; + } + + // JDBC 3.0 methods + @Override + public boolean supportsSavepoints() throws SQLException { + return false; + } + + @Override + public boolean supportsNamedParameters() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleOpenResults() throws SQLException { + return false; + } + + @Override + public boolean supportsGetGeneratedKeys() throws SQLException { + return false; + } + + @Override + public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "SUPERTYPE_CAT", "SUPERTYPE_SCHEM", "SUPERTYPE_NAME"}); + } + + @Override + public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "SUPERTABLE_NAME"}); + } + + @Override + public ResultSet getAttributes(String catalog, String schemaPattern, String typeNamePattern, String attributeNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TYPE_CAT", "TYPE_SCHEM", "TYPE_NAME", "ATTR_NAME", "DATA_TYPE", "ATTR_TYPE_NAME", "ATTR_SIZE", "DECIMAL_DIGITS", "NUM_PREC_RADIX", "NULLABLE", "REMARKS", "ATTR_DEF", "SQL_DATA_TYPE", "SQL_DATETIME_SUB", "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SCOPE_CATALOG", "SCOPE_SCHEMA", "SCOPE_TABLE", "SOURCE_DATA_TYPE"}); + } + + @Override + public boolean supportsResultSetHoldability(int holdability) throws SQLException { + return holdability == ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public int getDatabaseMajorVersion() throws SQLException { + return 1; + } + + @Override + public int getDatabaseMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMajorVersion() throws SQLException { + return 4; + } + + @Override + public int getJDBCMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getSQLStateType() throws SQLException { + return DatabaseMetaData.sqlStateSQL; + } + + @Override + public boolean locatorsUpdateCopy() throws SQLException { + return false; + } + + @Override + public boolean supportsStatementPooling() throws SQLException { + return false; + } + + // JDBC 4.0 methods + @Override + public RowIdLifetime getRowIdLifetime() throws SQLException { + return RowIdLifetime.ROWID_UNSUPPORTED; + } + + @Override + public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { + List> rows = new ArrayList<>(); + rows.add(List.of(connection.getConnectionInfo().getDatabase(), "default")); + return createResultSet(new String[]{"TABLE_SCHEM", "TABLE_CATALOG"}, rows); + } + + @Override + public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException { + return false; + } + + @Override + public boolean autoCommitFailureClosesAllResultSets() throws SQLException { + return false; + } + + @Override + public ResultSet getClientInfoProperties() throws SQLException { + return createEmptyResultSet(new String[]{"NAME", "MAX_LEN", "DEFAULT_VALUE", "DESCRIPTION"}); + } + + @Override + public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"FUNCTION_CAT", "FUNCTION_SCHEM", "FUNCTION_NAME", "REMARKS", "FUNCTION_TYPE", "SPECIFIC_NAME"}); + } + + @Override + public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"FUNCTION_CAT", "FUNCTION_SCHEM", "FUNCTION_NAME", "COLUMN_NAME", "COLUMN_TYPE", "DATA_TYPE", "TYPE_NAME", "PRECISION", "LENGTH", "SCALE", "RADIX", "NULLABLE", "REMARKS", "CHAR_OCTET_LENGTH", "ORDINAL_POSITION", "IS_NULLABLE", "SPECIFIC_NAME"}); + } + + // JDBC 4.1 methods + @Override + public ResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { + return createEmptyResultSet(new String[]{"TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "DATA_TYPE", "COLUMN_SIZE", "DECIMAL_DIGITS", "NUM_PREC_RADIX", "COLUMN_USAGE", "REMARKS", "CHAR_OCTET_LENGTH", "IS_NULLABLE"}); + } + + @Override + public boolean generatedKeyAlwaysReturned() throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isAssignableFrom(getClass())) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isAssignableFrom(getClass()); + } + + // Helper methods to create result sets + private ResultSet createEmptyResultSet(String[] columnNames) throws SQLException { + return createResultSet(columnNames, new ArrayList<>()); + } + + private ResultSet createResultSet(String[] columnNames, List> rows) throws SQLException { + // Convert to the format expected by SeaweedFSResultSet + byte[] data = serializeResultSetData(columnNames, rows); + return new SeaweedFSResultSet(null, data); + } + + private byte[] serializeResultSetData(String[] columnNames, List> rows) { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + java.io.DataOutputStream dos = new java.io.DataOutputStream(baos); + + try { + // Column count + dos.writeInt(columnNames.length); + + // Column names + for (String name : columnNames) { + byte[] nameBytes = name.getBytes(); + dos.writeInt(nameBytes.length); + dos.write(nameBytes); + } + + // Row count + dos.writeInt(rows.size()); + + // Rows + for (List row : rows) { + for (String value : row) { + if (value != null) { + byte[] valueBytes = value.getBytes(); + dos.writeInt(valueBytes.length); + dos.write(valueBytes); + } else { + dos.writeInt(0); // null value + } + } + } + + dos.flush(); + return baos.toByteArray(); + + } catch (Exception e) { + throw new RuntimeException("Failed to serialize result set data", e); + } + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java new file mode 100644 index 000000000..c15ae92f1 --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java @@ -0,0 +1,207 @@ +package com.seaweedfs.jdbc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.Properties; + +/** + * SeaweedFS JDBC Driver + * + * Provides JDBC connectivity to SeaweedFS SQL engine for querying MQ topics. + * + * JDBC URL format: jdbc:seaweedfs://host:port/database + * + * Example usage: + *
+ * Class.forName("com.seaweedfs.jdbc.SeaweedFSDriver");
+ * Connection conn = DriverManager.getConnection("jdbc:seaweedfs://localhost:8089/default");
+ * Statement stmt = conn.createStatement();
+ * ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10");
+ * 
+ */ +public class SeaweedFSDriver implements Driver { + + private static final Logger logger = LoggerFactory.getLogger(SeaweedFSDriver.class); + + // Driver information + public static final String DRIVER_NAME = "SeaweedFS JDBC Driver"; + public static final String DRIVER_VERSION = "1.0.0"; + public static final int DRIVER_MAJOR_VERSION = 1; + public static final int DRIVER_MINOR_VERSION = 0; + + // URL prefix for SeaweedFS JDBC connections + public static final String URL_PREFIX = "jdbc:seaweedfs://"; + + // Default connection properties + public static final String PROP_HOST = "host"; + public static final String PROP_PORT = "port"; + public static final String PROP_DATABASE = "database"; + public static final String PROP_USER = "user"; + public static final String PROP_PASSWORD = "password"; + public static final String PROP_CONNECT_TIMEOUT = "connectTimeout"; + public static final String PROP_SOCKET_TIMEOUT = "socketTimeout"; + + static { + try { + // Register the driver with the DriverManager + DriverManager.registerDriver(new SeaweedFSDriver()); + logger.info("SeaweedFS JDBC Driver {} registered successfully", DRIVER_VERSION); + } catch (SQLException e) { + logger.error("Failed to register SeaweedFS JDBC Driver", e); + throw new RuntimeException("Failed to register SeaweedFS JDBC Driver", e); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + return null; // Not our URL, let another driver handle it + } + + logger.debug("Attempting to connect to: {}", url); + + try { + // Parse the URL to extract connection parameters + SeaweedFSConnectionInfo connectionInfo = parseURL(url, info); + + // Create and return the connection + return new SeaweedFSConnection(connectionInfo); + + } catch (Exception e) { + logger.error("Failed to connect to SeaweedFS: {}", e.getMessage(), e); + throw new SQLException("Failed to connect to SeaweedFS: " + e.getMessage(), e); + } + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return url != null && url.startsWith(URL_PREFIX); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[] { + createPropertyInfo(PROP_HOST, "localhost", "SeaweedFS JDBC server hostname", null, false), + createPropertyInfo(PROP_PORT, "8089", "SeaweedFS JDBC server port", null, false), + createPropertyInfo(PROP_DATABASE, "default", "Database/namespace name", null, false), + createPropertyInfo(PROP_USER, "", "Username (optional)", null, false), + createPropertyInfo(PROP_PASSWORD, "", "Password (optional)", null, false), + createPropertyInfo(PROP_CONNECT_TIMEOUT, "30000", "Connection timeout in milliseconds", null, false), + createPropertyInfo(PROP_SOCKET_TIMEOUT, "0", "Socket timeout in milliseconds (0 = infinite)", null, false) + }; + } + + private DriverPropertyInfo createPropertyInfo(String name, String defaultValue, String description, String[] choices, boolean required) { + DriverPropertyInfo info = new DriverPropertyInfo(name, defaultValue); + info.description = description; + info.choices = choices; + info.required = required; + return info; + } + + @Override + public int getMajorVersion() { + return DRIVER_MAJOR_VERSION; + } + + @Override + public int getMinorVersion() { + return DRIVER_MINOR_VERSION; + } + + @Override + public boolean jdbcCompliant() { + // We implement a subset of JDBC, so we're not fully compliant + return false; + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException("getParentLogger is not supported"); + } + + /** + * Parse JDBC URL and extract connection information + * + * Expected format: jdbc:seaweedfs://host:port/database[?property=value&...] + */ + private SeaweedFSConnectionInfo parseURL(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + throw new SQLException("Invalid SeaweedFS JDBC URL: " + url); + } + + try { + // Remove the jdbc:seaweedfs:// prefix + String remaining = url.substring(URL_PREFIX.length()); + + // Split into host:port/database and query parameters + String[] parts = remaining.split("\\?", 2); + String hostPortDb = parts[0]; + String queryParams = parts.length > 1 ? parts[1] : ""; + + // Parse host, port, and database + String host = "localhost"; + int port = 8089; + String database = "default"; + + if (hostPortDb.contains("/")) { + String[] hostPortDbParts = hostPortDb.split("/", 2); + String hostPort = hostPortDbParts[0]; + database = hostPortDbParts[1]; + + if (hostPort.contains(":")) { + String[] hostPortParts = hostPort.split(":", 2); + host = hostPortParts[0]; + port = Integer.parseInt(hostPortParts[1]); + } else { + host = hostPort; + } + } else if (hostPortDb.contains(":")) { + String[] hostPortParts = hostPortDb.split(":", 2); + host = hostPortParts[0]; + port = Integer.parseInt(hostPortParts[1]); + } else if (!hostPortDb.isEmpty()) { + host = hostPortDb; + } + + // Create properties with defaults + Properties connectionProps = new Properties(); + connectionProps.setProperty(PROP_HOST, host); + connectionProps.setProperty(PROP_PORT, String.valueOf(port)); + connectionProps.setProperty(PROP_DATABASE, database); + connectionProps.setProperty(PROP_USER, ""); + connectionProps.setProperty(PROP_PASSWORD, ""); + connectionProps.setProperty(PROP_CONNECT_TIMEOUT, "30000"); + connectionProps.setProperty(PROP_SOCKET_TIMEOUT, "0"); + + // Override with provided properties + if (info != null) { + connectionProps.putAll(info); + } + + // Parse query parameters + if (!queryParams.isEmpty()) { + for (String param : queryParams.split("&")) { + String[] keyValue = param.split("=", 2); + if (keyValue.length == 2) { + connectionProps.setProperty(keyValue[0], keyValue[1]); + } + } + } + + return new SeaweedFSConnectionInfo(connectionProps); + + } catch (Exception e) { + throw new SQLException("Failed to parse SeaweedFS JDBC URL: " + url, e); + } + } + + /** + * Get driver information string + */ + public static String getDriverInfo() { + return String.format("%s v%s", DRIVER_NAME, DRIVER_VERSION); + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java new file mode 100644 index 000000000..cf766dd68 --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java @@ -0,0 +1,352 @@ +package com.seaweedfs.jdbc; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; + +/** + * PreparedStatement implementation for SeaweedFS JDBC + */ +public class SeaweedFSPreparedStatement extends SeaweedFSStatement implements PreparedStatement { + + private final String originalSql; + private final Map parameters = new HashMap<>(); + + public SeaweedFSPreparedStatement(SeaweedFSConnection connection, String sql) { + super(connection); + this.originalSql = sql; + } + + @Override + public ResultSet executeQuery() throws SQLException { + return executeQuery(buildSqlWithParameters()); + } + + @Override + public int executeUpdate() throws SQLException { + return executeUpdate(buildSqlWithParameters()); + } + + @Override + public void setNull(int parameterIndex, int sqlType) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, null); + } + + @Override + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setDate(int parameterIndex, Date x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setTime(int parameterIndex, Time x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("ASCII streams are not supported"); + } + + @Override + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Unicode streams are not supported"); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Binary streams are not supported"); + } + + @Override + public void clearParameters() throws SQLException { + checkClosed(); + parameters.clear(); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + setObject(parameterIndex, x); + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x); + } + + @Override + public boolean execute() throws SQLException { + return execute(buildSqlWithParameters()); + } + + @Override + public void addBatch() throws SQLException { + checkClosed(); + addBatch(buildSqlWithParameters()); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Character streams are not supported"); + } + + @Override + public void setRef(int parameterIndex, Ref x) throws SQLException { + throw new SQLFeatureNotSupportedException("Ref objects are not supported"); + } + + @Override + public void setBlob(int parameterIndex, Blob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Blob objects are not supported"); + } + + @Override + public void setClob(int parameterIndex, Clob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Clob objects are not supported"); + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + throw new SQLFeatureNotSupportedException("Array objects are not supported"); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + throw new SQLFeatureNotSupportedException("Prepared statement metadata is not supported"); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + setDate(parameterIndex, x); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + setTime(parameterIndex, x); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + setTimestamp(parameterIndex, x); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + setNull(parameterIndex, sqlType); + } + + @Override + public void setURL(int parameterIndex, URL x) throws SQLException { + checkClosed(); + parameters.put(parameterIndex, x.toString()); + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + throw new SQLFeatureNotSupportedException("Parameter metadata is not supported"); + } + + @Override + public void setRowId(int parameterIndex, RowId x) throws SQLException { + throw new SQLFeatureNotSupportedException("RowId objects are not supported"); + } + + @Override + public void setNString(int parameterIndex, String value) throws SQLException { + setString(parameterIndex, value); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("NCharacter streams are not supported"); + } + + @Override + public void setNClob(int parameterIndex, NClob value) throws SQLException { + throw new SQLFeatureNotSupportedException("NClob objects are not supported"); + } + + @Override + public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Clob objects are not supported"); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Blob objects are not supported"); + } + + @Override + public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("NClob objects are not supported"); + } + + @Override + public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + throw new SQLFeatureNotSupportedException("SQLXML objects are not supported"); + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + setObject(parameterIndex, x); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("ASCII streams are not supported"); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Binary streams are not supported"); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Character streams are not supported"); + } + + @Override + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("ASCII streams are not supported"); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("Binary streams are not supported"); + } + + @Override + public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Character streams are not supported"); + } + + @Override + public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + throw new SQLFeatureNotSupportedException("NCharacter streams are not supported"); + } + + @Override + public void setClob(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Clob objects are not supported"); + } + + @Override + public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + throw new SQLFeatureNotSupportedException("Blob objects are not supported"); + } + + @Override + public void setNClob(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("NClob objects are not supported"); + } + + /** + * Build the final SQL string by replacing parameter placeholders with actual values + */ + private String buildSqlWithParameters() throws SQLException { + String sql = originalSql; + + // Simple parameter substitution (not SQL-injection safe, but good enough for demo) + // In a production implementation, you would use proper parameter binding + for (Map.Entry entry : parameters.entrySet()) { + String placeholder = "\\?"; // Find first ? placeholder + String replacement; + + Object value = entry.getValue(); + if (value == null) { + replacement = "NULL"; + } else if (value instanceof String) { + // Escape single quotes and wrap in quotes + replacement = "'" + value.toString().replace("'", "''") + "'"; + } else if (value instanceof Number || value instanceof Boolean) { + replacement = value.toString(); + } else if (value instanceof Date) { + replacement = "'" + value.toString() + "'"; + } else if (value instanceof Timestamp) { + replacement = "'" + value.toString() + "'"; + } else { + replacement = "'" + value.toString().replace("'", "''") + "'"; + } + + // Replace the first occurrence of ? + sql = sql.replaceFirst(placeholder, replacement); + } + + return sql; + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java new file mode 100644 index 000000000..0a3671f04 --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java @@ -0,0 +1,1245 @@ +package com.seaweedfs.jdbc; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.nio.ByteBuffer; +import java.sql.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.Map; + +/** + * ResultSet implementation for SeaweedFS JDBC + */ +public class SeaweedFSResultSet implements ResultSet { + + private final SeaweedFSStatement statement; + private final List columnNames; + private final List> rows; + private int currentRowIndex = -1; // Before first row + private boolean closed = false; + private boolean wasNull = false; + + public SeaweedFSResultSet(SeaweedFSStatement statement, byte[] data) throws SQLException { + this.statement = statement; + this.columnNames = new ArrayList<>(); + this.rows = new ArrayList<>(); + + parseResultSetData(data); + } + + private void parseResultSetData(byte[] data) throws SQLException { + try { + ByteBuffer buffer = ByteBuffer.wrap(data); + + // Read column count + int columnCount = buffer.getInt(); + + // Read column names + for (int i = 0; i < columnCount; i++) { + int nameLength = buffer.getInt(); + byte[] nameBytes = new byte[nameLength]; + buffer.get(nameBytes); + columnNames.add(new String(nameBytes)); + } + + // Read row count + int rowCount = buffer.getInt(); + + // Read rows + for (int i = 0; i < rowCount; i++) { + List row = new ArrayList<>(); + for (int j = 0; j < columnCount; j++) { + int valueLength = buffer.getInt(); + if (valueLength > 0) { + byte[] valueBytes = new byte[valueLength]; + buffer.get(valueBytes); + row.add(new String(valueBytes)); + } else { + row.add(null); // Empty value = null + } + } + rows.add(row); + } + + } catch (Exception e) { + throw new SQLException("Failed to parse result set data", e); + } + } + + @Override + public boolean next() throws SQLException { + checkClosed(); + if (currentRowIndex + 1 < rows.size()) { + currentRowIndex++; + return true; + } + return false; + } + + @Override + public void close() throws SQLException { + closed = true; + } + + @Override + public boolean wasNull() throws SQLException { + checkClosed(); + return wasNull; + } + + @Override + public String getString(int columnIndex) throws SQLException { + checkClosed(); + checkRowPosition(); + checkColumnIndex(columnIndex); + + String value = getCurrentRow().get(columnIndex - 1); + wasNull = (value == null); + return value; + } + + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return false; + return Boolean.parseBoolean(value); + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Byte.parseByte(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to byte", e); + } + } + + @Override + public short getShort(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Short.parseShort(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to short", e); + } + } + + @Override + public int getInt(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to int", e); + } + } + + @Override + public long getLong(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Long.parseLong(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to long", e); + } + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Float.parseFloat(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to float", e); + } + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return 0; + try { + return Double.parseDouble(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to double", e); + } + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + return new BigDecimal(value).setScale(scale, BigDecimal.ROUND_HALF_UP); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to BigDecimal", e); + } + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + return value.getBytes(); + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + return Date.valueOf(value); + } catch (IllegalArgumentException e) { + throw new SQLException("Cannot convert '" + value + "' to Date", e); + } + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + return Time.valueOf(value); + } catch (IllegalArgumentException e) { + throw new SQLException("Cannot convert '" + value + "' to Time", e); + } + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + // Try parsing as timestamp first + return Timestamp.valueOf(value); + } catch (IllegalArgumentException e) { + // If that fails, try parsing as long (nanoseconds) + try { + long nanos = Long.parseLong(value); + return new Timestamp(nanos / 1000000); // Convert nanos to millis + } catch (NumberFormatException e2) { + throw new SQLException("Cannot convert '" + value + "' to Timestamp", e); + } + } + } + + // String-based column access + @Override + public String getString(String columnLabel) throws SQLException { + return getString(findColumn(columnLabel)); + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + return getBoolean(findColumn(columnLabel)); + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + return getByte(findColumn(columnLabel)); + } + + @Override + public short getShort(String columnLabel) throws SQLException { + return getShort(findColumn(columnLabel)); + } + + @Override + public int getInt(String columnLabel) throws SQLException { + return getInt(findColumn(columnLabel)); + } + + @Override + public long getLong(String columnLabel) throws SQLException { + return getLong(findColumn(columnLabel)); + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + return getFloat(findColumn(columnLabel)); + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + return getDouble(findColumn(columnLabel)); + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException { + return getBigDecimal(findColumn(columnLabel), scale); + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + return getBytes(findColumn(columnLabel)); + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + return getDate(findColumn(columnLabel)); + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + return getTime(findColumn(columnLabel)); + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return getTimestamp(findColumn(columnLabel)); + } + + @Override + public int findColumn(String columnLabel) throws SQLException { + checkClosed(); + for (int i = 0; i < columnNames.size(); i++) { + if (columnNames.get(i).equalsIgnoreCase(columnLabel)) { + return i + 1; // JDBC uses 1-based indexing + } + } + throw new SQLException("Column not found: " + columnLabel); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + checkClosed(); + return new SeaweedFSResultSetMetaData(columnNames); + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + return getString(columnIndex); + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + return getString(columnLabel); + } + + // Navigation methods + @Override + public boolean isBeforeFirst() throws SQLException { + checkClosed(); + return currentRowIndex == -1 && !rows.isEmpty(); + } + + @Override + public boolean isAfterLast() throws SQLException { + checkClosed(); + return currentRowIndex >= rows.size() && !rows.isEmpty(); + } + + @Override + public boolean isFirst() throws SQLException { + checkClosed(); + return currentRowIndex == 0 && !rows.isEmpty(); + } + + @Override + public boolean isLast() throws SQLException { + checkClosed(); + return currentRowIndex == rows.size() - 1 && !rows.isEmpty(); + } + + @Override + public void beforeFirst() throws SQLException { + checkClosed(); + currentRowIndex = -1; + } + + @Override + public void afterLast() throws SQLException { + checkClosed(); + currentRowIndex = rows.size(); + } + + @Override + public boolean first() throws SQLException { + checkClosed(); + if (!rows.isEmpty()) { + currentRowIndex = 0; + return true; + } + return false; + } + + @Override + public boolean last() throws SQLException { + checkClosed(); + if (!rows.isEmpty()) { + currentRowIndex = rows.size() - 1; + return true; + } + return false; + } + + @Override + public int getRow() throws SQLException { + checkClosed(); + return currentRowIndex >= 0 && currentRowIndex < rows.size() ? currentRowIndex + 1 : 0; + } + + @Override + public boolean absolute(int row) throws SQLException { + checkClosed(); + if (row > 0 && row <= rows.size()) { + currentRowIndex = row - 1; + return true; + } else if (row < 0 && Math.abs(row) <= rows.size()) { + currentRowIndex = rows.size() + row; + return true; + } else { + if (row > rows.size()) { + currentRowIndex = rows.size(); // After last + } else { + currentRowIndex = -1; // Before first + } + return false; + } + } + + @Override + public boolean relative(int rows) throws SQLException { + return absolute(getRow() + rows); + } + + @Override + public boolean previous() throws SQLException { + checkClosed(); + if (currentRowIndex > 0) { + currentRowIndex--; + return true; + } + return false; + } + + // Unsupported operations + @Override + public InputStream getAsciiStream(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("ASCII streams are not supported"); + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Unicode streams are not supported"); + } + + @Override + public InputStream getBinaryStream(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Binary streams are not supported"); + } + + @Override + public InputStream getAsciiStream(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("ASCII streams are not supported"); + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Unicode streams are not supported"); + } + + @Override + public InputStream getBinaryStream(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Binary streams are not supported"); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + // No-op + } + + @Override + public String getCursorName() throws SQLException { + throw new SQLFeatureNotSupportedException("Named cursors are not supported"); + } + + // Additional getters with default implementations + @Override + public Reader getCharacterStream(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Character streams are not supported"); + } + + @Override + public Reader getCharacterStream(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Character streams are not supported"); + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + return new BigDecimal(value); + } catch (NumberFormatException e) { + throw new SQLException("Cannot convert '" + value + "' to BigDecimal", e); + } + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return getBigDecimal(findColumn(columnLabel)); + } + + // Update operations (not supported - SeaweedFS is read-only) + @Override + public void updateNull(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateShort(int columnIndex, short x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateInt(int columnIndex, int x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateLong(int columnIndex, long x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateString(int columnIndex, String x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + // String-based update operations (all throw exceptions) + @Override + public void updateNull(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateShort(String columnLabel, short x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateInt(String columnLabel, int x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateLong(String columnLabel, long x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateString(String columnLabel, String x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void insertRow() throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateRow() throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void deleteRow() throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void refreshRow() throws SQLException { + // No-op + } + + @Override + public void cancelRowUpdates() throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void moveToInsertRow() throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void moveToCurrentRow() throws SQLException { + // No-op + } + + @Override + public Statement getStatement() throws SQLException { + checkClosed(); + return statement; + } + + // Additional methods with empty/default implementations + @Override + public Object getObject(int columnIndex, Map> map) throws SQLException { + return getObject(columnIndex); + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Ref objects are not supported"); + } + + @Override + public Blob getBlob(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Blob objects are not supported"); + } + + @Override + public Clob getClob(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Clob objects are not supported"); + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("Array objects are not supported"); + } + + @Override + public Object getObject(String columnLabel, Map> map) throws SQLException { + return getObject(columnLabel); + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Ref objects are not supported"); + } + + @Override + public Blob getBlob(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Blob objects are not supported"); + } + + @Override + public Clob getClob(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Clob objects are not supported"); + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("Array objects are not supported"); + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return getDate(columnIndex); + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return getDate(columnLabel); + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return getTime(columnIndex); + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return getTime(columnLabel); + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException { + return getTimestamp(columnIndex); + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException { + return getTimestamp(columnLabel); + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + String value = getString(columnIndex); + if (wasNull) return null; + try { + return new URL(value); + } catch (Exception e) { + throw new SQLException("Cannot convert '" + value + "' to URL", e); + } + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + return getURL(findColumn(columnLabel)); + } + + // More update operations (all not supported) + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + // More modern JDBC methods + @Override + public RowId getRowId(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("RowId objects are not supported"); + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("RowId objects are not supported"); + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public int getHoldability() throws SQLException { + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public boolean isClosed() throws SQLException { + return closed; + } + + @Override + public void updateNString(int columnIndex, String nString) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNString(String columnLabel, String nString) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(int columnIndex, NClob nClob) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(String columnLabel, NClob nClob) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("NClob objects are not supported"); + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("NClob objects are not supported"); + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("SQLXML objects are not supported"); + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("SQLXML objects are not supported"); + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public String getNString(int columnIndex) throws SQLException { + return getString(columnIndex); + } + + @Override + public String getNString(String columnLabel) throws SQLException { + return getString(columnLabel); + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + throw new SQLFeatureNotSupportedException("NCharacter streams are not supported"); + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws SQLException { + throw new SQLFeatureNotSupportedException("NCharacter streams are not supported"); + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(int columnIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(String columnLabel, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public void updateNClob(String columnLabel, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only"); + } + + @Override + public T getObject(int columnIndex, Class type) throws SQLException { + Object value = getObject(columnIndex); + if (value == null || wasNull) { + return null; + } + + if (type.isInstance(value)) { + return type.cast(value); + } + + // Basic type conversions + if (type == String.class) { + return type.cast(value.toString()); + } else if (type == Integer.class && value instanceof String) { + return type.cast(Integer.valueOf((String)value)); + } else if (type == Long.class && value instanceof String) { + return type.cast(Long.valueOf((String)value)); + } else if (type == Boolean.class && value instanceof String) { + return type.cast(Boolean.valueOf((String)value)); + } + + throw new SQLException("Cannot convert " + value.getClass().getName() + " to " + type.getName()); + } + + @Override + public T getObject(String columnLabel, Class type) throws SQLException { + return getObject(findColumn(columnLabel), type); + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isAssignableFrom(getClass())) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isAssignableFrom(getClass()); + } + + @Override + public boolean rowDeleted() throws SQLException { + checkClosed(); + return false; // SeaweedFS is read-only, no deletions possible + } + + @Override + public boolean rowInserted() throws SQLException { + checkClosed(); + return false; // SeaweedFS is read-only, no insertions possible + } + + @Override + public boolean rowUpdated() throws SQLException { + checkClosed(); + return false; // SeaweedFS is read-only, no updates possible + } + + @Override + public int getConcurrency() throws SQLException { + checkClosed(); + return ResultSet.CONCUR_READ_ONLY; // SeaweedFS is read-only + } + + @Override + public int getType() throws SQLException { + checkClosed(); + return ResultSet.TYPE_FORWARD_ONLY; // Forward-only scrolling + } + + @Override + public int getFetchSize() throws SQLException { + checkClosed(); + return 1000; // Default fetch size + } + + @Override + public void setFetchSize(int rows) throws SQLException { + checkClosed(); + // No-op for now, could be enhanced to affect performance + } + + @Override + public int getFetchDirection() throws SQLException { + checkClosed(); + return ResultSet.FETCH_FORWARD; // Always forward-only + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + checkClosed(); + if (direction != ResultSet.FETCH_FORWARD) { + throw new SQLException("Only FETCH_FORWARD is supported"); + } + } + + // Helper methods + private void checkClosed() throws SQLException { + if (closed) { + throw new SQLException("ResultSet is closed"); + } + } + + private void checkRowPosition() throws SQLException { + if (currentRowIndex < 0 || currentRowIndex >= rows.size()) { + throw new SQLException("ResultSet is not positioned on a valid row"); + } + } + + private void checkColumnIndex(int columnIndex) throws SQLException { + if (columnIndex < 1 || columnIndex > columnNames.size()) { + throw new SQLException("Column index " + columnIndex + " is out of range (1-" + columnNames.size() + ")"); + } + } + + private List getCurrentRow() { + return rows.get(currentRowIndex); + } + + // Get result set information + public int getColumnCount() { + return columnNames.size(); + } + + public List getColumnNames() { + return new ArrayList<>(columnNames); + } + + public int getRowCount() { + return rows.size(); + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java new file mode 100644 index 000000000..d79a2ef2b --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java @@ -0,0 +1,202 @@ +package com.seaweedfs.jdbc; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +/** + * ResultSetMetaData implementation for SeaweedFS JDBC + */ +public class SeaweedFSResultSetMetaData implements ResultSetMetaData { + + private final List columnNames; + + public SeaweedFSResultSetMetaData(List columnNames) { + this.columnNames = columnNames; + } + + @Override + public int getColumnCount() throws SQLException { + return columnNames.size(); + } + + @Override + public boolean isAutoIncrement(int column) throws SQLException { + checkColumnIndex(column); + return false; // SeaweedFS doesn't have auto-increment columns + } + + @Override + public boolean isCaseSensitive(int column) throws SQLException { + checkColumnIndex(column); + return true; // Assume case sensitive + } + + @Override + public boolean isSearchable(int column) throws SQLException { + checkColumnIndex(column); + return true; // All columns are searchable + } + + @Override + public boolean isCurrency(int column) throws SQLException { + checkColumnIndex(column); + return false; // No currency columns + } + + @Override + public int isNullable(int column) throws SQLException { + checkColumnIndex(column); + return columnNullable; // Assume nullable + } + + @Override + public boolean isSigned(int column) throws SQLException { + checkColumnIndex(column); + // For simplicity, assume all numeric types are signed + return true; + } + + @Override + public int getColumnDisplaySize(int column) throws SQLException { + checkColumnIndex(column); + return 50; // Default display size + } + + @Override + public String getColumnLabel(int column) throws SQLException { + checkColumnIndex(column); + return columnNames.get(column - 1); + } + + @Override + public String getColumnName(int column) throws SQLException { + checkColumnIndex(column); + return columnNames.get(column - 1); + } + + @Override + public String getSchemaName(int column) throws SQLException { + checkColumnIndex(column); + return ""; // No schema concept in SeaweedFS + } + + @Override + public int getPrecision(int column) throws SQLException { + checkColumnIndex(column); + return 0; // Unknown precision + } + + @Override + public int getScale(int column) throws SQLException { + checkColumnIndex(column); + return 0; // Unknown scale + } + + @Override + public String getTableName(int column) throws SQLException { + checkColumnIndex(column); + return ""; // Table name not available in result set metadata + } + + @Override + public String getCatalogName(int column) throws SQLException { + checkColumnIndex(column); + return ""; // No catalog concept in SeaweedFS + } + + @Override + public int getColumnType(int column) throws SQLException { + checkColumnIndex(column); + // For simplicity, we'll determine type based on column name patterns + String columnName = columnNames.get(column - 1).toLowerCase(); + + if (columnName.contains("timestamp") || columnName.contains("time") || columnName.equals("_timestamp_ns")) { + return Types.TIMESTAMP; + } else if (columnName.contains("id") || columnName.contains("count") || columnName.contains("size")) { + return Types.BIGINT; + } else if (columnName.contains("amount") || columnName.contains("price") || columnName.contains("rate")) { + return Types.DECIMAL; + } else if (columnName.contains("flag") || columnName.contains("enabled") || columnName.contains("active")) { + return Types.BOOLEAN; + } else { + return Types.VARCHAR; // Default to VARCHAR + } + } + + @Override + public String getColumnTypeName(int column) throws SQLException { + int sqlType = getColumnType(column); + switch (sqlType) { + case Types.VARCHAR: + return "VARCHAR"; + case Types.BIGINT: + return "BIGINT"; + case Types.DECIMAL: + return "DECIMAL"; + case Types.BOOLEAN: + return "BOOLEAN"; + case Types.TIMESTAMP: + return "TIMESTAMP"; + default: + return "VARCHAR"; + } + } + + @Override + public boolean isReadOnly(int column) throws SQLException { + checkColumnIndex(column); + return true; // SeaweedFS is read-only + } + + @Override + public boolean isWritable(int column) throws SQLException { + checkColumnIndex(column); + return false; // SeaweedFS is read-only + } + + @Override + public boolean isDefinitelyWritable(int column) throws SQLException { + checkColumnIndex(column); + return false; // SeaweedFS is read-only + } + + @Override + public String getColumnClassName(int column) throws SQLException { + int sqlType = getColumnType(column); + switch (sqlType) { + case Types.VARCHAR: + return "java.lang.String"; + case Types.BIGINT: + return "java.lang.Long"; + case Types.DECIMAL: + return "java.math.BigDecimal"; + case Types.BOOLEAN: + return "java.lang.Boolean"; + case Types.TIMESTAMP: + return "java.sql.Timestamp"; + default: + return "java.lang.String"; + } + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isAssignableFrom(getClass())) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isAssignableFrom(getClass()); + } + + private void checkColumnIndex(int column) throws SQLException { + if (column < 1 || column > columnNames.size()) { + throw new SQLException("Column index " + column + " is out of range (1-" + columnNames.size() + ")"); + } + } +} diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java new file mode 100644 index 000000000..f21c40069 --- /dev/null +++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java @@ -0,0 +1,389 @@ +package com.seaweedfs.jdbc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +/** + * JDBC Statement implementation for SeaweedFS + */ +public class SeaweedFSStatement implements Statement { + + private static final Logger logger = LoggerFactory.getLogger(SeaweedFSStatement.class); + + protected final SeaweedFSConnection connection; + private boolean closed = false; + private ResultSet currentResultSet = null; + private int updateCount = -1; + private int maxRows = 0; + private int queryTimeout = 0; + private int fetchSize = 1000; + private List batch = new ArrayList<>(); + + public SeaweedFSStatement(SeaweedFSConnection connection) { + this.connection = connection; + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + checkClosed(); + logger.debug("Executing query: {}", sql); + + try { + // Send query to server + connection.sendMessage((byte)0x03, sql.getBytes()); // JDBC_MSG_EXECUTE_QUERY + + // Read response + SeaweedFSConnection.Response response = connection.readResponse(); + + if (response.type == (byte)0x01) { // JDBC_RESP_ERROR + throw new SQLException("Query failed: " + new String(response.data)); + } else if (response.type == (byte)0x02) { // JDBC_RESP_RESULT_SET + // Parse result set data + currentResultSet = new SeaweedFSResultSet(this, response.data); + updateCount = -1; + return currentResultSet; + } else { + throw new SQLException("Unexpected response type: " + response.type); + } + + } catch (Exception e) { + throw new SQLException("Failed to execute query: " + e.getMessage(), e); + } + } + + @Override + public int executeUpdate(String sql) throws SQLException { + checkClosed(); + logger.debug("Executing update: {}", sql); + + try { + // Send update to server + connection.sendMessage((byte)0x04, sql.getBytes()); // JDBC_MSG_EXECUTE_UPDATE + + // Read response + SeaweedFSConnection.Response response = connection.readResponse(); + + if (response.type == (byte)0x01) { // JDBC_RESP_ERROR + throw new SQLException("Update failed: " + new String(response.data)); + } else if (response.type == (byte)0x03) { // JDBC_RESP_UPDATE_COUNT + // Parse update count + updateCount = parseUpdateCount(response.data); + currentResultSet = null; + return updateCount; + } else { + throw new SQLException("Unexpected response type: " + response.type); + } + + } catch (Exception e) { + throw new SQLException("Failed to execute update: " + e.getMessage(), e); + } + } + + @Override + public void close() throws SQLException { + if (!closed) { + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + closed = true; + logger.debug("Statement closed"); + } + } + + @Override + public int getMaxFieldSize() throws SQLException { + checkClosed(); + return 0; // No limit + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + checkClosed(); + // No-op + } + + @Override + public int getMaxRows() throws SQLException { + checkClosed(); + return maxRows; + } + + @Override + public void setMaxRows(int max) throws SQLException { + checkClosed(); + this.maxRows = max; + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + checkClosed(); + // No-op + } + + @Override + public int getQueryTimeout() throws SQLException { + checkClosed(); + return queryTimeout; + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + checkClosed(); + this.queryTimeout = seconds; + } + + @Override + public void cancel() throws SQLException { + checkClosed(); + // No-op - cancellation not supported + } + + @Override + public SQLWarning getWarnings() throws SQLException { + checkClosed(); + return null; + } + + @Override + public void clearWarnings() throws SQLException { + checkClosed(); + // No-op + } + + @Override + public void setCursorName(String name) throws SQLException { + checkClosed(); + // No-op - cursors not supported + } + + @Override + public boolean execute(String sql) throws SQLException { + checkClosed(); + logger.debug("Executing: {}", sql); + + // Determine if this is likely a query or update + String trimmedSql = sql.trim().toUpperCase(); + if (trimmedSql.startsWith("SELECT") || + trimmedSql.startsWith("SHOW") || + trimmedSql.startsWith("DESCRIBE") || + trimmedSql.startsWith("DESC") || + trimmedSql.startsWith("EXPLAIN")) { + // It's a query + executeQuery(sql); + return true; + } else { + // It's an update + executeUpdate(sql); + return false; + } + } + + @Override + public ResultSet getResultSet() throws SQLException { + checkClosed(); + return currentResultSet; + } + + @Override + public int getUpdateCount() throws SQLException { + checkClosed(); + return updateCount; + } + + @Override + public boolean getMoreResults() throws SQLException { + checkClosed(); + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + updateCount = -1; + return false; // No more results + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + checkClosed(); + if (direction != ResultSet.FETCH_FORWARD) { + throw new SQLException("Only FETCH_FORWARD is supported"); + } + } + + @Override + public int getFetchDirection() throws SQLException { + checkClosed(); + return ResultSet.FETCH_FORWARD; + } + + @Override + public void setFetchSize(int rows) throws SQLException { + checkClosed(); + this.fetchSize = rows; + } + + @Override + public int getFetchSize() throws SQLException { + checkClosed(); + return fetchSize; + } + + @Override + public int getResultSetConcurrency() throws SQLException { + checkClosed(); + return ResultSet.CONCUR_READ_ONLY; + } + + @Override + public int getResultSetType() throws SQLException { + checkClosed(); + return ResultSet.TYPE_FORWARD_ONLY; + } + + @Override + public void addBatch(String sql) throws SQLException { + checkClosed(); + batch.add(sql); + } + + @Override + public void clearBatch() throws SQLException { + checkClosed(); + batch.clear(); + } + + @Override + public int[] executeBatch() throws SQLException { + checkClosed(); + int[] results = new int[batch.size()]; + + for (int i = 0; i < batch.size(); i++) { + try { + results[i] = executeUpdate(batch.get(i)); + } catch (SQLException e) { + results[i] = EXECUTE_FAILED; + } + } + + batch.clear(); + return results; + } + + @Override + public Connection getConnection() throws SQLException { + checkClosed(); + return connection; + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + checkClosed(); + return getMoreResults(); + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + throw new SQLFeatureNotSupportedException("Generated keys are not supported"); + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return executeUpdate(sql); + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return executeUpdate(sql); + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return executeUpdate(sql); + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + return execute(sql); + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return execute(sql); + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + return execute(sql); + } + + @Override + public int getResultSetHoldability() throws SQLException { + checkClosed(); + return ResultSet.CLOSE_CURSORS_AT_COMMIT; + } + + @Override + public boolean isClosed() throws SQLException { + return closed; + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + checkClosed(); + // No-op + } + + @Override + public boolean isPoolable() throws SQLException { + checkClosed(); + return false; + } + + @Override + public void closeOnCompletion() throws SQLException { + checkClosed(); + // No-op + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + checkClosed(); + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + if (iface.isAssignableFrom(getClass())) { + return iface.cast(this); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isAssignableFrom(getClass()); + } + + protected void checkClosed() throws SQLException { + if (closed) { + throw new SQLException("Statement is closed"); + } + if (connection.isClosed()) { + throw new SQLException("Connection is closed"); + } + } + + private int parseUpdateCount(byte[] data) { + if (data.length >= 4) { + return ((data[0] & 0xFF) << 24) | + ((data[1] & 0xFF) << 16) | + ((data[2] & 0xFF) << 8) | + (data[3] & 0xFF); + } + return 0; + } +} diff --git a/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver b/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver new file mode 100644 index 000000000..24c5d53ef --- /dev/null +++ b/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver @@ -0,0 +1 @@ +com.seaweedfs.jdbc.SeaweedFSDriver diff --git a/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java b/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java new file mode 100644 index 000000000..874323e77 --- /dev/null +++ b/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java @@ -0,0 +1,75 @@ +package com.seaweedfs.jdbc; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * Basic tests for SeaweedFS JDBC driver + */ +public class SeaweedFSDriverTest { + + @Test + public void testDriverRegistration() { + // Driver should be automatically registered via META-INF/services + assertDoesNotThrow(() -> { + Class.forName("com.seaweedfs.jdbc.SeaweedFSDriver"); + }); + } + + @Test + public void testURLAcceptance() throws SQLException { + SeaweedFSDriver driver = new SeaweedFSDriver(); + + // Valid URLs + assertTrue(driver.acceptsURL("jdbc:seaweedfs://localhost:8089/default")); + assertTrue(driver.acceptsURL("jdbc:seaweedfs://server:9000/test")); + assertTrue(driver.acceptsURL("jdbc:seaweedfs://192.168.1.100:8089/mydb")); + + // Invalid URLs + assertFalse(driver.acceptsURL("jdbc:mysql://localhost:3306/test")); + assertFalse(driver.acceptsURL("jdbc:postgresql://localhost:5432/test")); + assertFalse(driver.acceptsURL(null)); + assertFalse(driver.acceptsURL("")); + assertFalse(driver.acceptsURL("not-a-url")); + } + + @Test + public void testDriverInfo() { + SeaweedFSDriver driver = new SeaweedFSDriver(); + + assertEquals(SeaweedFSDriver.DRIVER_MAJOR_VERSION, driver.getMajorVersion()); + assertEquals(SeaweedFSDriver.DRIVER_MINOR_VERSION, driver.getMinorVersion()); + assertFalse(driver.jdbcCompliant()); // We're not fully JDBC compliant + + assertNotNull(SeaweedFSDriver.getDriverInfo()); + assertTrue(SeaweedFSDriver.getDriverInfo().contains("SeaweedFS")); + assertTrue(SeaweedFSDriver.getDriverInfo().contains("JDBC")); + } + + @Test + public void testPropertyInfo() throws SQLException { + SeaweedFSDriver driver = new SeaweedFSDriver(); + + var properties = driver.getPropertyInfo("jdbc:seaweedfs://localhost:8089/default", null); + assertNotNull(properties); + assertTrue(properties.length > 0); + + // Check that basic properties are present + boolean foundHost = false, foundPort = false, foundDatabase = false; + for (var prop : properties) { + if ("host".equals(prop.name)) foundHost = true; + if ("port".equals(prop.name)) foundPort = true; + if ("database".equals(prop.name)) foundDatabase = true; + } + + assertTrue(foundHost, "Host property should be present"); + assertTrue(foundPort, "Port property should be present"); + assertTrue(foundDatabase, "Database property should be present"); + } + + // Note: Connection tests would require a running SeaweedFS JDBC server + // These tests would be part of integration tests, not unit tests +} diff --git a/weed/command/command.go b/weed/command/command.go index 5da7fdc73..f894e7dce 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -30,6 +30,7 @@ var Commands = []*Command{ cmdFix, cmdFuse, cmdIam, + cmdJdbc, cmdMaster, cmdMasterFollower, cmdMount, diff --git a/weed/command/jdbc.go b/weed/command/jdbc.go new file mode 100644 index 000000000..2f05e0169 --- /dev/null +++ b/weed/command/jdbc.go @@ -0,0 +1,141 @@ +package command + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var ( + jdbcOptions JdbcOptions +) + +type JdbcOptions struct { + host *string + port *int + masterAddr *string +} + +func init() { + cmdJdbc.Run = runJdbc // break init cycle + jdbcOptions.host = cmdJdbc.Flag.String("host", "localhost", "JDBC server host") + jdbcOptions.port = cmdJdbc.Flag.Int("port", 8089, "JDBC server port") + jdbcOptions.masterAddr = cmdJdbc.Flag.String("master", "localhost:9333", "SeaweedFS master server address") +} + +var cmdJdbc = &Command{ + UsageLine: "jdbc -port=8089 -master=", + Short: "start a JDBC server for SQL queries", + Long: `Start a JDBC server that provides SQL query access to SeaweedFS. + +This JDBC server allows standard JDBC clients and tools to connect to SeaweedFS +and execute SQL queries against MQ topics. It implements a subset of the JDBC +protocol for compatibility with most database tools and applications. + +Examples: + + # Start JDBC server on default port 8089 + weed jdbc + + # Start on custom port with specific master + weed jdbc -port=8090 -master=master1:9333 + + # Allow connections from any host + weed jdbc -host=0.0.0.0 -port=8089 + +Clients can then connect using JDBC URL: + jdbc:seaweedfs://hostname:port/database + +Supported SQL operations: + - SELECT queries on MQ topics + - DESCRIBE/DESC commands + - SHOW DATABASES/TABLES commands + - Aggregation functions (COUNT, SUM, AVG, MIN, MAX) + - WHERE clauses with filtering + - System columns (_timestamp_ns, _key, _source) + +Compatible with: + - Standard JDBC tools (DBeaver, IntelliJ DataGrip, etc.) + - Business Intelligence tools (Tableau, Power BI, etc.) + - Java applications using JDBC drivers + - SQL reporting tools + +`, +} + +func runJdbc(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + // Validate options + if *jdbcOptions.masterAddr == "" { + fmt.Fprintf(os.Stderr, "Error: master address is required\n") + return false + } + + // Create JDBC server + jdbcServer, err := weed_server.NewJDBCServer(*jdbcOptions.host, *jdbcOptions.port, *jdbcOptions.masterAddr) + if err != nil { + fmt.Fprintf(os.Stderr, "Error creating JDBC server: %v\n", err) + return false + } + + // Start the server + fmt.Printf("Starting SeaweedFS JDBC Server...\n") + fmt.Printf("Host: %s\n", *jdbcOptions.host) + fmt.Printf("Port: %d\n", *jdbcOptions.port) + fmt.Printf("Master: %s\n", *jdbcOptions.masterAddr) + fmt.Printf("\nJDBC URL: jdbc:seaweedfs://%s:%d/default\n", *jdbcOptions.host, *jdbcOptions.port) + fmt.Printf("\nSupported operations:\n") + fmt.Printf(" - SELECT queries on MQ topics\n") + fmt.Printf(" - DESCRIBE/DESC table_name\n") + fmt.Printf(" - SHOW DATABASES\n") + fmt.Printf(" - SHOW TABLES\n") + fmt.Printf(" - Aggregations: COUNT, SUM, AVG, MIN, MAX\n") + fmt.Printf(" - System columns: _timestamp_ns, _key, _source\n") + fmt.Printf("\nReady for JDBC connections!\n\n") + + err = jdbcServer.Start() + if err != nil { + fmt.Fprintf(os.Stderr, "Error starting JDBC server: %v\n", err) + return false + } + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Wait for shutdown signal + <-sigChan + fmt.Printf("\nReceived shutdown signal, stopping JDBC server...\n") + + // Create context with timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Stop the server with timeout + done := make(chan error, 1) + go func() { + done <- jdbcServer.Stop() + }() + + select { + case err := <-done: + if err != nil { + fmt.Fprintf(os.Stderr, "Error stopping JDBC server: %v\n", err) + return false + } + fmt.Printf("JDBC server stopped successfully\n") + case <-ctx.Done(): + fmt.Fprintf(os.Stderr, "Timeout waiting for JDBC server to stop\n") + return false + } + + return true +} diff --git a/weed/server/jdbc_server.go b/weed/server/jdbc_server.go new file mode 100644 index 000000000..9be59ae42 --- /dev/null +++ b/weed/server/jdbc_server.go @@ -0,0 +1,524 @@ +package weed_server + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/query/engine" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" +) + +// JDBCServer provides JDBC-compatible access to SeaweedFS SQL engine +type JDBCServer struct { + host string + port int + masterAddr string + listener net.Listener + sqlEngine *engine.SQLEngine + connections map[net.Conn]*JDBCConnection + connMutex sync.RWMutex + shutdown chan struct{} + wg sync.WaitGroup +} + +// JDBCConnection represents a single JDBC client connection +type JDBCConnection struct { + conn net.Conn + reader *bufio.Reader + writer *bufio.Writer + database string + autoCommit bool + connectionID uint32 + closed bool + mutex sync.Mutex +} + +// JDBC Protocol Constants +const ( + // Message Types + JDBC_MSG_CONNECT = 0x01 + JDBC_MSG_DISCONNECT = 0x02 + JDBC_MSG_EXECUTE_QUERY = 0x03 + JDBC_MSG_EXECUTE_UPDATE = 0x04 + JDBC_MSG_PREPARE = 0x05 + JDBC_MSG_EXECUTE_PREP = 0x06 + JDBC_MSG_GET_METADATA = 0x07 + JDBC_MSG_SET_AUTOCOMMIT = 0x08 + JDBC_MSG_COMMIT = 0x09 + JDBC_MSG_ROLLBACK = 0x0A + + // Response Types + JDBC_RESP_OK = 0x00 + JDBC_RESP_ERROR = 0x01 + JDBC_RESP_RESULT_SET = 0x02 + JDBC_RESP_UPDATE_COUNT = 0x03 + JDBC_RESP_METADATA = 0x04 + + // Default values + DEFAULT_JDBC_PORT = 8089 +) + +// NewJDBCServer creates a new JDBC server instance +func NewJDBCServer(host string, port int, masterAddr string) (*JDBCServer, error) { + if port <= 0 { + port = DEFAULT_JDBC_PORT + } + if host == "" { + host = "localhost" + } + + // Create SQL engine + sqlEngine := engine.NewSQLEngine(masterAddr) + + server := &JDBCServer{ + host: host, + port: port, + masterAddr: masterAddr, + sqlEngine: sqlEngine, + connections: make(map[net.Conn]*JDBCConnection), + shutdown: make(chan struct{}), + } + + return server, nil +} + +// Start begins listening for JDBC connections +func (s *JDBCServer) Start() error { + addr := fmt.Sprintf("%s:%d", s.host, s.port) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to start JDBC server on %s: %v", addr, err) + } + + s.listener = listener + glog.Infof("JDBC Server listening on %s", addr) + + s.wg.Add(1) + go s.acceptConnections() + + return nil +} + +// Stop gracefully shuts down the JDBC server +func (s *JDBCServer) Stop() error { + close(s.shutdown) + + if s.listener != nil { + s.listener.Close() + } + + // Close all connections + s.connMutex.Lock() + for conn, jdbcConn := range s.connections { + jdbcConn.close() + conn.Close() + } + s.connections = make(map[net.Conn]*JDBCConnection) + s.connMutex.Unlock() + + s.wg.Wait() + glog.Infof("JDBC Server stopped") + return nil +} + +// acceptConnections handles incoming JDBC connections +func (s *JDBCServer) acceptConnections() { + defer s.wg.Done() + + for { + select { + case <-s.shutdown: + return + default: + } + + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.shutdown: + return + default: + glog.Errorf("Failed to accept JDBC connection: %v", err) + continue + } + } + + s.wg.Add(1) + go s.handleConnection(conn) + } +} + +// handleConnection processes a single JDBC connection +func (s *JDBCServer) handleConnection(conn net.Conn) { + defer s.wg.Done() + defer conn.Close() + + // Create JDBC connection wrapper + jdbcConn := &JDBCConnection{ + conn: conn, + reader: bufio.NewReader(conn), + writer: bufio.NewWriter(conn), + database: "default", + autoCommit: true, + connectionID: s.generateConnectionID(), + } + + // Register connection + s.connMutex.Lock() + s.connections[conn] = jdbcConn + s.connMutex.Unlock() + + // Clean up on exit + defer func() { + s.connMutex.Lock() + delete(s.connections, conn) + s.connMutex.Unlock() + }() + + glog.Infof("New JDBC connection from %s (ID: %d)", conn.RemoteAddr(), jdbcConn.connectionID) + + // Handle connection messages + for { + select { + case <-s.shutdown: + return + default: + } + + // Set read timeout + conn.SetReadDeadline(time.Now().Add(30 * time.Second)) + + err := s.handleMessage(jdbcConn) + if err != nil { + if err == io.EOF { + glog.Infof("JDBC client disconnected (ID: %d)", jdbcConn.connectionID) + } else { + glog.Errorf("Error handling JDBC message (ID: %d): %v", jdbcConn.connectionID, err) + } + return + } + } +} + +// handleMessage processes a single JDBC protocol message +func (s *JDBCServer) handleMessage(conn *JDBCConnection) error { + // Read message header (message type + length) + header := make([]byte, 5) + _, err := io.ReadFull(conn.reader, header) + if err != nil { + return err + } + + msgType := header[0] + msgLength := binary.BigEndian.Uint32(header[1:5]) + + // Read message body + msgBody := make([]byte, msgLength) + if msgLength > 0 { + _, err = io.ReadFull(conn.reader, msgBody) + if err != nil { + return err + } + } + + // Process message based on type + switch msgType { + case JDBC_MSG_CONNECT: + return s.handleConnect(conn, msgBody) + case JDBC_MSG_DISCONNECT: + return s.handleDisconnect(conn) + case JDBC_MSG_EXECUTE_QUERY: + return s.handleExecuteQuery(conn, msgBody) + case JDBC_MSG_EXECUTE_UPDATE: + return s.handleExecuteUpdate(conn, msgBody) + case JDBC_MSG_GET_METADATA: + return s.handleGetMetadata(conn, msgBody) + case JDBC_MSG_SET_AUTOCOMMIT: + return s.handleSetAutoCommit(conn, msgBody) + case JDBC_MSG_COMMIT: + return s.handleCommit(conn) + case JDBC_MSG_ROLLBACK: + return s.handleRollback(conn) + default: + return s.sendError(conn, fmt.Errorf("unknown message type: %d", msgType)) + } +} + +// handleConnect processes JDBC connection request +func (s *JDBCServer) handleConnect(conn *JDBCConnection, msgBody []byte) error { + // Parse connection string (database name) + if len(msgBody) > 0 { + conn.database = string(msgBody) + } + + glog.Infof("JDBC client connected to database: %s (ID: %d)", conn.database, conn.connectionID) + + // Send OK response + return s.sendOK(conn, "Connected successfully") +} + +// handleDisconnect processes JDBC disconnect request +func (s *JDBCServer) handleDisconnect(conn *JDBCConnection) error { + glog.Infof("JDBC client disconnecting (ID: %d)", conn.connectionID) + conn.close() + return io.EOF // This will cause the connection handler to exit +} + +// handleExecuteQuery processes SQL SELECT queries +func (s *JDBCServer) handleExecuteQuery(conn *JDBCConnection, msgBody []byte) error { + sql := string(msgBody) + + glog.V(2).Infof("Executing query (ID: %d): %s", conn.connectionID, sql) + + // Execute SQL using the query engine + ctx := context.Background() + result, err := s.sqlEngine.ExecuteSQL(ctx, sql) + if err != nil { + return s.sendError(conn, err) + } + + if result.Error != nil { + return s.sendError(conn, result.Error) + } + + // Send result set + return s.sendResultSet(conn, result) +} + +// handleExecuteUpdate processes SQL UPDATE/INSERT/DELETE queries +func (s *JDBCServer) handleExecuteUpdate(conn *JDBCConnection, msgBody []byte) error { + sql := string(msgBody) + + glog.V(2).Infof("Executing update (ID: %d): %s", conn.connectionID, sql) + + // For now, treat updates same as queries since SeaweedFS SQL is read-only + ctx := context.Background() + result, err := s.sqlEngine.ExecuteSQL(ctx, sql) + if err != nil { + return s.sendError(conn, err) + } + + if result.Error != nil { + return s.sendError(conn, result.Error) + } + + // Send update count (0 for read-only operations) + return s.sendUpdateCount(conn, 0) +} + +// handleGetMetadata processes JDBC metadata requests +func (s *JDBCServer) handleGetMetadata(conn *JDBCConnection, msgBody []byte) error { + metadataType := string(msgBody) + + glog.V(2).Infof("Getting metadata (ID: %d): %s", conn.connectionID, metadataType) + + switch metadataType { + case "tables": + return s.sendTablesMetadata(conn) + case "databases", "schemas": + return s.sendDatabasesMetadata(conn) + default: + return s.sendError(conn, fmt.Errorf("unsupported metadata type: %s", metadataType)) + } +} + +// handleSetAutoCommit processes autocommit setting +func (s *JDBCServer) handleSetAutoCommit(conn *JDBCConnection, msgBody []byte) error { + autoCommit := len(msgBody) > 0 && msgBody[0] == 1 + conn.autoCommit = autoCommit + + glog.V(2).Infof("Setting autocommit (ID: %d): %v", conn.connectionID, autoCommit) + + return s.sendOK(conn, fmt.Sprintf("AutoCommit set to %v", autoCommit)) +} + +// handleCommit processes transaction commit (no-op for read-only) +func (s *JDBCServer) handleCommit(conn *JDBCConnection) error { + glog.V(2).Infof("Commit (ID: %d): no-op for read-only", conn.connectionID) + return s.sendOK(conn, "Commit successful") +} + +// handleRollback processes transaction rollback (no-op for read-only) +func (s *JDBCServer) handleRollback(conn *JDBCConnection) error { + glog.V(2).Infof("Rollback (ID: %d): no-op for read-only", conn.connectionID) + return s.sendOK(conn, "Rollback successful") +} + +// sendOK sends a success response +func (s *JDBCServer) sendOK(conn *JDBCConnection, message string) error { + return s.sendResponse(conn, JDBC_RESP_OK, []byte(message)) +} + +// sendError sends an error response +func (s *JDBCServer) sendError(conn *JDBCConnection, err error) error { + return s.sendResponse(conn, JDBC_RESP_ERROR, []byte(err.Error())) +} + +// sendResultSet sends query results +func (s *JDBCServer) sendResultSet(conn *JDBCConnection, result *engine.QueryResult) error { + // Serialize result set + data := s.serializeResultSet(result) + return s.sendResponse(conn, JDBC_RESP_RESULT_SET, data) +} + +// sendUpdateCount sends update operation result +func (s *JDBCServer) sendUpdateCount(conn *JDBCConnection, count int) error { + data := make([]byte, 4) + binary.BigEndian.PutUint32(data, uint32(count)) + return s.sendResponse(conn, JDBC_RESP_UPDATE_COUNT, data) +} + +// sendTablesMetadata sends table metadata +func (s *JDBCServer) sendTablesMetadata(conn *JDBCConnection) error { + // For now, return empty metadata - this would need to query the schema catalog + data := s.serializeTablesMetadata([]string{}) + return s.sendResponse(conn, JDBC_RESP_METADATA, data) +} + +// sendDatabasesMetadata sends database/schema metadata +func (s *JDBCServer) sendDatabasesMetadata(conn *JDBCConnection) error { + // Return default databases + databases := []string{"default", "test"} + data := s.serializeDatabasesMetadata(databases) + return s.sendResponse(conn, JDBC_RESP_METADATA, data) +} + +// sendResponse sends a response with the given type and data +func (s *JDBCServer) sendResponse(conn *JDBCConnection, responseType byte, data []byte) error { + conn.mutex.Lock() + defer conn.mutex.Unlock() + + // Write response header + header := make([]byte, 5) + header[0] = responseType + binary.BigEndian.PutUint32(header[1:5], uint32(len(data))) + + _, err := conn.writer.Write(header) + if err != nil { + return err + } + + // Write response data + if len(data) > 0 { + _, err = conn.writer.Write(data) + if err != nil { + return err + } + } + + return conn.writer.Flush() +} + +// serializeResultSet converts QueryResult to JDBC wire format +func (s *JDBCServer) serializeResultSet(result *engine.QueryResult) []byte { + var data []byte + + // Column count + colCount := make([]byte, 4) + binary.BigEndian.PutUint32(colCount, uint32(len(result.Columns))) + data = append(data, colCount...) + + // Column names + for _, col := range result.Columns { + colName := []byte(col) + colLen := make([]byte, 4) + binary.BigEndian.PutUint32(colLen, uint32(len(colName))) + data = append(data, colLen...) + data = append(data, colName...) + } + + // Row count + rowCount := make([]byte, 4) + binary.BigEndian.PutUint32(rowCount, uint32(len(result.Rows))) + data = append(data, rowCount...) + + // Rows + for _, row := range result.Rows { + for _, value := range row { + // Convert value to string and serialize + valueStr := s.valueToString(value) + valueBytes := []byte(valueStr) + valueLen := make([]byte, 4) + binary.BigEndian.PutUint32(valueLen, uint32(len(valueBytes))) + data = append(data, valueLen...) + data = append(data, valueBytes...) + } + } + + return data +} + +// serializeTablesMetadata converts table list to wire format +func (s *JDBCServer) serializeTablesMetadata(tables []string) []byte { + var data []byte + + // Table count + tableCount := make([]byte, 4) + binary.BigEndian.PutUint32(tableCount, uint32(len(tables))) + data = append(data, tableCount...) + + // Table names + for _, table := range tables { + tableBytes := []byte(table) + tableLen := make([]byte, 4) + binary.BigEndian.PutUint32(tableLen, uint32(len(tableBytes))) + data = append(data, tableLen...) + data = append(data, tableBytes...) + } + + return data +} + +// serializeDatabasesMetadata converts database list to wire format +func (s *JDBCServer) serializeDatabasesMetadata(databases []string) []byte { + var data []byte + + // Database count + dbCount := make([]byte, 4) + binary.BigEndian.PutUint32(dbCount, uint32(len(databases))) + data = append(data, dbCount...) + + // Database names + for _, db := range databases { + dbBytes := []byte(db) + dbLen := make([]byte, 4) + binary.BigEndian.PutUint32(dbLen, uint32(len(dbBytes))) + data = append(data, dbLen...) + data = append(data, dbBytes...) + } + + return data +} + +// valueToString converts a sqltypes.Value to string representation +func (s *JDBCServer) valueToString(value sqltypes.Value) string { + if value.IsNull() { + return "" + } + + return value.ToString() +} + +// generateConnectionID generates a unique connection ID +func (s *JDBCServer) generateConnectionID() uint32 { + return uint32(time.Now().UnixNano() % 1000000) +} + +// close marks the connection as closed +func (c *JDBCConnection) close() { + c.mutex.Lock() + defer c.mutex.Unlock() + c.closed = true +} + +// GetAddress returns the server address +func (s *JDBCServer) GetAddress() string { + return fmt.Sprintf("%s:%d", s.host, s.port) +}