row : rows) {
+ for (String value : row) {
+ if (value != null) {
+ byte[] valueBytes = value.getBytes();
+ dos.writeInt(valueBytes.length);
+ dos.write(valueBytes);
+ } else {
+ dos.writeInt(0); // null value
+ }
+ }
+ }
+
+ dos.flush();
+ return baos.toByteArray();
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to serialize result set data", e);
+ }
+ }
+}
diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java
new file mode 100644
index 000000000..c15ae92f1
--- /dev/null
+++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSDriver.java
@@ -0,0 +1,207 @@
+package com.seaweedfs.jdbc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.Properties;
+
+/**
+ * SeaweedFS JDBC Driver
+ *
+ * Provides JDBC connectivity to SeaweedFS SQL engine for querying MQ topics.
+ *
+ * JDBC URL format: jdbc:seaweedfs://host:port/database
+ *
+ * Example usage:
+ *
+ * Class.forName("com.seaweedfs.jdbc.SeaweedFSDriver");
+ * Connection conn = DriverManager.getConnection("jdbc:seaweedfs://localhost:8089/default");
+ * Statement stmt = conn.createStatement();
+ * ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10");
+ *
+ */
+public class SeaweedFSDriver implements Driver {
+
+ private static final Logger logger = LoggerFactory.getLogger(SeaweedFSDriver.class);
+
+ // Driver information
+ public static final String DRIVER_NAME = "SeaweedFS JDBC Driver";
+ public static final String DRIVER_VERSION = "1.0.0";
+ public static final int DRIVER_MAJOR_VERSION = 1;
+ public static final int DRIVER_MINOR_VERSION = 0;
+
+ // URL prefix for SeaweedFS JDBC connections
+ public static final String URL_PREFIX = "jdbc:seaweedfs://";
+
+ // Default connection properties
+ public static final String PROP_HOST = "host";
+ public static final String PROP_PORT = "port";
+ public static final String PROP_DATABASE = "database";
+ public static final String PROP_USER = "user";
+ public static final String PROP_PASSWORD = "password";
+ public static final String PROP_CONNECT_TIMEOUT = "connectTimeout";
+ public static final String PROP_SOCKET_TIMEOUT = "socketTimeout";
+
+ static {
+ try {
+ // Register the driver with the DriverManager
+ DriverManager.registerDriver(new SeaweedFSDriver());
+ logger.info("SeaweedFS JDBC Driver {} registered successfully", DRIVER_VERSION);
+ } catch (SQLException e) {
+ logger.error("Failed to register SeaweedFS JDBC Driver", e);
+ throw new RuntimeException("Failed to register SeaweedFS JDBC Driver", e);
+ }
+ }
+
+ @Override
+ public Connection connect(String url, Properties info) throws SQLException {
+ if (!acceptsURL(url)) {
+ return null; // Not our URL, let another driver handle it
+ }
+
+ logger.debug("Attempting to connect to: {}", url);
+
+ try {
+ // Parse the URL to extract connection parameters
+ SeaweedFSConnectionInfo connectionInfo = parseURL(url, info);
+
+ // Create and return the connection
+ return new SeaweedFSConnection(connectionInfo);
+
+ } catch (Exception e) {
+ logger.error("Failed to connect to SeaweedFS: {}", e.getMessage(), e);
+ throw new SQLException("Failed to connect to SeaweedFS: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean acceptsURL(String url) throws SQLException {
+ return url != null && url.startsWith(URL_PREFIX);
+ }
+
+ @Override
+ public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+ return new DriverPropertyInfo[] {
+ createPropertyInfo(PROP_HOST, "localhost", "SeaweedFS JDBC server hostname", null, false),
+ createPropertyInfo(PROP_PORT, "8089", "SeaweedFS JDBC server port", null, false),
+ createPropertyInfo(PROP_DATABASE, "default", "Database/namespace name", null, false),
+ createPropertyInfo(PROP_USER, "", "Username (optional)", null, false),
+ createPropertyInfo(PROP_PASSWORD, "", "Password (optional)", null, false),
+ createPropertyInfo(PROP_CONNECT_TIMEOUT, "30000", "Connection timeout in milliseconds", null, false),
+ createPropertyInfo(PROP_SOCKET_TIMEOUT, "0", "Socket timeout in milliseconds (0 = infinite)", null, false)
+ };
+ }
+
+ private DriverPropertyInfo createPropertyInfo(String name, String defaultValue, String description, String[] choices, boolean required) {
+ DriverPropertyInfo info = new DriverPropertyInfo(name, defaultValue);
+ info.description = description;
+ info.choices = choices;
+ info.required = required;
+ return info;
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return DRIVER_MAJOR_VERSION;
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return DRIVER_MINOR_VERSION;
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ // We implement a subset of JDBC, so we're not fully compliant
+ return false;
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ throw new SQLFeatureNotSupportedException("getParentLogger is not supported");
+ }
+
+ /**
+ * Parse JDBC URL and extract connection information
+ *
+ * Expected format: jdbc:seaweedfs://host:port/database[?property=value&...]
+ */
+ private SeaweedFSConnectionInfo parseURL(String url, Properties info) throws SQLException {
+ if (!acceptsURL(url)) {
+ throw new SQLException("Invalid SeaweedFS JDBC URL: " + url);
+ }
+
+ try {
+ // Remove the jdbc:seaweedfs:// prefix
+ String remaining = url.substring(URL_PREFIX.length());
+
+ // Split into host:port/database and query parameters
+ String[] parts = remaining.split("\\?", 2);
+ String hostPortDb = parts[0];
+ String queryParams = parts.length > 1 ? parts[1] : "";
+
+ // Parse host, port, and database
+ String host = "localhost";
+ int port = 8089;
+ String database = "default";
+
+ if (hostPortDb.contains("/")) {
+ String[] hostPortDbParts = hostPortDb.split("/", 2);
+ String hostPort = hostPortDbParts[0];
+ database = hostPortDbParts[1];
+
+ if (hostPort.contains(":")) {
+ String[] hostPortParts = hostPort.split(":", 2);
+ host = hostPortParts[0];
+ port = Integer.parseInt(hostPortParts[1]);
+ } else {
+ host = hostPort;
+ }
+ } else if (hostPortDb.contains(":")) {
+ String[] hostPortParts = hostPortDb.split(":", 2);
+ host = hostPortParts[0];
+ port = Integer.parseInt(hostPortParts[1]);
+ } else if (!hostPortDb.isEmpty()) {
+ host = hostPortDb;
+ }
+
+ // Create properties with defaults
+ Properties connectionProps = new Properties();
+ connectionProps.setProperty(PROP_HOST, host);
+ connectionProps.setProperty(PROP_PORT, String.valueOf(port));
+ connectionProps.setProperty(PROP_DATABASE, database);
+ connectionProps.setProperty(PROP_USER, "");
+ connectionProps.setProperty(PROP_PASSWORD, "");
+ connectionProps.setProperty(PROP_CONNECT_TIMEOUT, "30000");
+ connectionProps.setProperty(PROP_SOCKET_TIMEOUT, "0");
+
+ // Override with provided properties
+ if (info != null) {
+ connectionProps.putAll(info);
+ }
+
+ // Parse query parameters
+ if (!queryParams.isEmpty()) {
+ for (String param : queryParams.split("&")) {
+ String[] keyValue = param.split("=", 2);
+ if (keyValue.length == 2) {
+ connectionProps.setProperty(keyValue[0], keyValue[1]);
+ }
+ }
+ }
+
+ return new SeaweedFSConnectionInfo(connectionProps);
+
+ } catch (Exception e) {
+ throw new SQLException("Failed to parse SeaweedFS JDBC URL: " + url, e);
+ }
+ }
+
+ /**
+ * Get driver information string
+ */
+ public static String getDriverInfo() {
+ return String.format("%s v%s", DRIVER_NAME, DRIVER_VERSION);
+ }
+}
diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java
new file mode 100644
index 000000000..cf766dd68
--- /dev/null
+++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSPreparedStatement.java
@@ -0,0 +1,352 @@
+package com.seaweedfs.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.*;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * PreparedStatement implementation for SeaweedFS JDBC
+ */
+public class SeaweedFSPreparedStatement extends SeaweedFSStatement implements PreparedStatement {
+
+ private final String originalSql;
+ private final Map parameters = new HashMap<>();
+
+ public SeaweedFSPreparedStatement(SeaweedFSConnection connection, String sql) {
+ super(connection);
+ this.originalSql = sql;
+ }
+
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ return executeQuery(buildSqlWithParameters());
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ return executeUpdate(buildSqlWithParameters());
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, null);
+ }
+
+ @Override
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setShort(int parameterIndex, short x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setInt(int parameterIndex, int x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setLong(int parameterIndex, long x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setString(int parameterIndex, String x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("ASCII streams are not supported");
+ }
+
+ @Override
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Unicode streams are not supported");
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Binary streams are not supported");
+ }
+
+ @Override
+ public void clearParameters() throws SQLException {
+ checkClosed();
+ parameters.clear();
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+ setObject(parameterIndex, x);
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x);
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ return execute(buildSqlWithParameters());
+ }
+
+ @Override
+ public void addBatch() throws SQLException {
+ checkClosed();
+ addBatch(buildSqlWithParameters());
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Character streams are not supported");
+ }
+
+ @Override
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Ref objects are not supported");
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Blob objects are not supported");
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Clob objects are not supported");
+ }
+
+ @Override
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Array objects are not supported");
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Prepared statement metadata is not supported");
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+ setDate(parameterIndex, x);
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+ setTime(parameterIndex, x);
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+ setTimestamp(parameterIndex, x);
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+ setNull(parameterIndex, sqlType);
+ }
+
+ @Override
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+ checkClosed();
+ parameters.put(parameterIndex, x.toString());
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Parameter metadata is not supported");
+ }
+
+ @Override
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("RowId objects are not supported");
+ }
+
+ @Override
+ public void setNString(int parameterIndex, String value) throws SQLException {
+ setString(parameterIndex, value);
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NCharacter streams are not supported");
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NClob objects are not supported");
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Clob objects are not supported");
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Blob objects are not supported");
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NClob objects are not supported");
+ }
+
+ @Override
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+ throw new SQLFeatureNotSupportedException("SQLXML objects are not supported");
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+ setObject(parameterIndex, x);
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("ASCII streams are not supported");
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Binary streams are not supported");
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Character streams are not supported");
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("ASCII streams are not supported");
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Binary streams are not supported");
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Character streams are not supported");
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NCharacter streams are not supported");
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Clob objects are not supported");
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Blob objects are not supported");
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NClob objects are not supported");
+ }
+
+ /**
+ * Build the final SQL string by replacing parameter placeholders with actual values
+ */
+ private String buildSqlWithParameters() throws SQLException {
+ String sql = originalSql;
+
+ // Simple parameter substitution (not SQL-injection safe, but good enough for demo)
+ // In a production implementation, you would use proper parameter binding
+ for (Map.Entry entry : parameters.entrySet()) {
+ String placeholder = "\\?"; // Find first ? placeholder
+ String replacement;
+
+ Object value = entry.getValue();
+ if (value == null) {
+ replacement = "NULL";
+ } else if (value instanceof String) {
+ // Escape single quotes and wrap in quotes
+ replacement = "'" + value.toString().replace("'", "''") + "'";
+ } else if (value instanceof Number || value instanceof Boolean) {
+ replacement = value.toString();
+ } else if (value instanceof Date) {
+ replacement = "'" + value.toString() + "'";
+ } else if (value instanceof Timestamp) {
+ replacement = "'" + value.toString() + "'";
+ } else {
+ replacement = "'" + value.toString().replace("'", "''") + "'";
+ }
+
+ // Replace the first occurrence of ?
+ sql = sql.replaceFirst(placeholder, replacement);
+ }
+
+ return sql;
+ }
+}
diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java
new file mode 100644
index 000000000..0a3671f04
--- /dev/null
+++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSet.java
@@ -0,0 +1,1245 @@
+package com.seaweedfs.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ResultSet implementation for SeaweedFS JDBC
+ */
+public class SeaweedFSResultSet implements ResultSet {
+
+ private final SeaweedFSStatement statement;
+ private final List columnNames;
+ private final List> rows;
+ private int currentRowIndex = -1; // Before first row
+ private boolean closed = false;
+ private boolean wasNull = false;
+
+ public SeaweedFSResultSet(SeaweedFSStatement statement, byte[] data) throws SQLException {
+ this.statement = statement;
+ this.columnNames = new ArrayList<>();
+ this.rows = new ArrayList<>();
+
+ parseResultSetData(data);
+ }
+
+ private void parseResultSetData(byte[] data) throws SQLException {
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+
+ // Read column count
+ int columnCount = buffer.getInt();
+
+ // Read column names
+ for (int i = 0; i < columnCount; i++) {
+ int nameLength = buffer.getInt();
+ byte[] nameBytes = new byte[nameLength];
+ buffer.get(nameBytes);
+ columnNames.add(new String(nameBytes));
+ }
+
+ // Read row count
+ int rowCount = buffer.getInt();
+
+ // Read rows
+ for (int i = 0; i < rowCount; i++) {
+ List row = new ArrayList<>();
+ for (int j = 0; j < columnCount; j++) {
+ int valueLength = buffer.getInt();
+ if (valueLength > 0) {
+ byte[] valueBytes = new byte[valueLength];
+ buffer.get(valueBytes);
+ row.add(new String(valueBytes));
+ } else {
+ row.add(null); // Empty value = null
+ }
+ }
+ rows.add(row);
+ }
+
+ } catch (Exception e) {
+ throw new SQLException("Failed to parse result set data", e);
+ }
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ checkClosed();
+ if (currentRowIndex + 1 < rows.size()) {
+ currentRowIndex++;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ closed = true;
+ }
+
+ @Override
+ public boolean wasNull() throws SQLException {
+ checkClosed();
+ return wasNull;
+ }
+
+ @Override
+ public String getString(int columnIndex) throws SQLException {
+ checkClosed();
+ checkRowPosition();
+ checkColumnIndex(columnIndex);
+
+ String value = getCurrentRow().get(columnIndex - 1);
+ wasNull = (value == null);
+ return value;
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return false;
+ return Boolean.parseBoolean(value);
+ }
+
+ @Override
+ public byte getByte(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Byte.parseByte(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to byte", e);
+ }
+ }
+
+ @Override
+ public short getShort(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Short.parseShort(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to short", e);
+ }
+ }
+
+ @Override
+ public int getInt(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to int", e);
+ }
+ }
+
+ @Override
+ public long getLong(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to long", e);
+ }
+ }
+
+ @Override
+ public float getFloat(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Float.parseFloat(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to float", e);
+ }
+ }
+
+ @Override
+ public double getDouble(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return 0;
+ try {
+ return Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to double", e);
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ return new BigDecimal(value).setScale(scale, BigDecimal.ROUND_HALF_UP);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to BigDecimal", e);
+ }
+ }
+
+ @Override
+ public byte[] getBytes(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ return value.getBytes();
+ }
+
+ @Override
+ public Date getDate(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ return Date.valueOf(value);
+ } catch (IllegalArgumentException e) {
+ throw new SQLException("Cannot convert '" + value + "' to Date", e);
+ }
+ }
+
+ @Override
+ public Time getTime(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ return Time.valueOf(value);
+ } catch (IllegalArgumentException e) {
+ throw new SQLException("Cannot convert '" + value + "' to Time", e);
+ }
+ }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ // Try parsing as timestamp first
+ return Timestamp.valueOf(value);
+ } catch (IllegalArgumentException e) {
+ // If that fails, try parsing as long (nanoseconds)
+ try {
+ long nanos = Long.parseLong(value);
+ return new Timestamp(nanos / 1000000); // Convert nanos to millis
+ } catch (NumberFormatException e2) {
+ throw new SQLException("Cannot convert '" + value + "' to Timestamp", e);
+ }
+ }
+ }
+
+ // String-based column access
+ @Override
+ public String getString(String columnLabel) throws SQLException {
+ return getString(findColumn(columnLabel));
+ }
+
+ @Override
+ public boolean getBoolean(String columnLabel) throws SQLException {
+ return getBoolean(findColumn(columnLabel));
+ }
+
+ @Override
+ public byte getByte(String columnLabel) throws SQLException {
+ return getByte(findColumn(columnLabel));
+ }
+
+ @Override
+ public short getShort(String columnLabel) throws SQLException {
+ return getShort(findColumn(columnLabel));
+ }
+
+ @Override
+ public int getInt(String columnLabel) throws SQLException {
+ return getInt(findColumn(columnLabel));
+ }
+
+ @Override
+ public long getLong(String columnLabel) throws SQLException {
+ return getLong(findColumn(columnLabel));
+ }
+
+ @Override
+ public float getFloat(String columnLabel) throws SQLException {
+ return getFloat(findColumn(columnLabel));
+ }
+
+ @Override
+ public double getDouble(String columnLabel) throws SQLException {
+ return getDouble(findColumn(columnLabel));
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException {
+ return getBigDecimal(findColumn(columnLabel), scale);
+ }
+
+ @Override
+ public byte[] getBytes(String columnLabel) throws SQLException {
+ return getBytes(findColumn(columnLabel));
+ }
+
+ @Override
+ public Date getDate(String columnLabel) throws SQLException {
+ return getDate(findColumn(columnLabel));
+ }
+
+ @Override
+ public Time getTime(String columnLabel) throws SQLException {
+ return getTime(findColumn(columnLabel));
+ }
+
+ @Override
+ public Timestamp getTimestamp(String columnLabel) throws SQLException {
+ return getTimestamp(findColumn(columnLabel));
+ }
+
+ @Override
+ public int findColumn(String columnLabel) throws SQLException {
+ checkClosed();
+ for (int i = 0; i < columnNames.size(); i++) {
+ if (columnNames.get(i).equalsIgnoreCase(columnLabel)) {
+ return i + 1; // JDBC uses 1-based indexing
+ }
+ }
+ throw new SQLException("Column not found: " + columnLabel);
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ checkClosed();
+ return new SeaweedFSResultSetMetaData(columnNames);
+ }
+
+ @Override
+ public Object getObject(int columnIndex) throws SQLException {
+ return getString(columnIndex);
+ }
+
+ @Override
+ public Object getObject(String columnLabel) throws SQLException {
+ return getString(columnLabel);
+ }
+
+ // Navigation methods
+ @Override
+ public boolean isBeforeFirst() throws SQLException {
+ checkClosed();
+ return currentRowIndex == -1 && !rows.isEmpty();
+ }
+
+ @Override
+ public boolean isAfterLast() throws SQLException {
+ checkClosed();
+ return currentRowIndex >= rows.size() && !rows.isEmpty();
+ }
+
+ @Override
+ public boolean isFirst() throws SQLException {
+ checkClosed();
+ return currentRowIndex == 0 && !rows.isEmpty();
+ }
+
+ @Override
+ public boolean isLast() throws SQLException {
+ checkClosed();
+ return currentRowIndex == rows.size() - 1 && !rows.isEmpty();
+ }
+
+ @Override
+ public void beforeFirst() throws SQLException {
+ checkClosed();
+ currentRowIndex = -1;
+ }
+
+ @Override
+ public void afterLast() throws SQLException {
+ checkClosed();
+ currentRowIndex = rows.size();
+ }
+
+ @Override
+ public boolean first() throws SQLException {
+ checkClosed();
+ if (!rows.isEmpty()) {
+ currentRowIndex = 0;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean last() throws SQLException {
+ checkClosed();
+ if (!rows.isEmpty()) {
+ currentRowIndex = rows.size() - 1;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int getRow() throws SQLException {
+ checkClosed();
+ return currentRowIndex >= 0 && currentRowIndex < rows.size() ? currentRowIndex + 1 : 0;
+ }
+
+ @Override
+ public boolean absolute(int row) throws SQLException {
+ checkClosed();
+ if (row > 0 && row <= rows.size()) {
+ currentRowIndex = row - 1;
+ return true;
+ } else if (row < 0 && Math.abs(row) <= rows.size()) {
+ currentRowIndex = rows.size() + row;
+ return true;
+ } else {
+ if (row > rows.size()) {
+ currentRowIndex = rows.size(); // After last
+ } else {
+ currentRowIndex = -1; // Before first
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public boolean relative(int rows) throws SQLException {
+ return absolute(getRow() + rows);
+ }
+
+ @Override
+ public boolean previous() throws SQLException {
+ checkClosed();
+ if (currentRowIndex > 0) {
+ currentRowIndex--;
+ return true;
+ }
+ return false;
+ }
+
+ // Unsupported operations
+ @Override
+ public InputStream getAsciiStream(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("ASCII streams are not supported");
+ }
+
+ @Override
+ public InputStream getUnicodeStream(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Unicode streams are not supported");
+ }
+
+ @Override
+ public InputStream getBinaryStream(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Binary streams are not supported");
+ }
+
+ @Override
+ public InputStream getAsciiStream(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("ASCII streams are not supported");
+ }
+
+ @Override
+ public InputStream getUnicodeStream(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Unicode streams are not supported");
+ }
+
+ @Override
+ public InputStream getBinaryStream(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Binary streams are not supported");
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ // No-op
+ }
+
+ @Override
+ public String getCursorName() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Named cursors are not supported");
+ }
+
+ // Additional getters with default implementations
+ @Override
+ public Reader getCharacterStream(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Character streams are not supported");
+ }
+
+ @Override
+ public Reader getCharacterStream(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Character streams are not supported");
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ return new BigDecimal(value);
+ } catch (NumberFormatException e) {
+ throw new SQLException("Cannot convert '" + value + "' to BigDecimal", e);
+ }
+ }
+
+ @Override
+ public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+ return getBigDecimal(findColumn(columnLabel));
+ }
+
+ // Update operations (not supported - SeaweedFS is read-only)
+ @Override
+ public void updateNull(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBoolean(int columnIndex, boolean x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateByte(int columnIndex, byte x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateShort(int columnIndex, short x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateInt(int columnIndex, int x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateLong(int columnIndex, long x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateFloat(int columnIndex, float x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateDouble(int columnIndex, double x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateString(int columnIndex, String x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBytes(int columnIndex, byte[] x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateDate(int columnIndex, Date x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateTime(int columnIndex, Time x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateObject(int columnIndex, Object x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ // String-based update operations (all throw exceptions)
+ @Override
+ public void updateNull(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBoolean(String columnLabel, boolean x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateByte(String columnLabel, byte x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateShort(String columnLabel, short x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateInt(String columnLabel, int x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateLong(String columnLabel, long x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateFloat(String columnLabel, float x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateDouble(String columnLabel, double x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateString(String columnLabel, String x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBytes(String columnLabel, byte[] x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateDate(String columnLabel, Date x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateTime(String columnLabel, Time x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader, int length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateObject(String columnLabel, Object x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void insertRow() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateRow() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void deleteRow() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void refreshRow() throws SQLException {
+ // No-op
+ }
+
+ @Override
+ public void cancelRowUpdates() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void moveToInsertRow() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void moveToCurrentRow() throws SQLException {
+ // No-op
+ }
+
+ @Override
+ public Statement getStatement() throws SQLException {
+ checkClosed();
+ return statement;
+ }
+
+ // Additional methods with empty/default implementations
+ @Override
+ public Object getObject(int columnIndex, Map> map) throws SQLException {
+ return getObject(columnIndex);
+ }
+
+ @Override
+ public Ref getRef(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Ref objects are not supported");
+ }
+
+ @Override
+ public Blob getBlob(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Blob objects are not supported");
+ }
+
+ @Override
+ public Clob getClob(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Clob objects are not supported");
+ }
+
+ @Override
+ public Array getArray(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Array objects are not supported");
+ }
+
+ @Override
+ public Object getObject(String columnLabel, Map> map) throws SQLException {
+ return getObject(columnLabel);
+ }
+
+ @Override
+ public Ref getRef(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Ref objects are not supported");
+ }
+
+ @Override
+ public Blob getBlob(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Blob objects are not supported");
+ }
+
+ @Override
+ public Clob getClob(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Clob objects are not supported");
+ }
+
+ @Override
+ public Array getArray(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Array objects are not supported");
+ }
+
+ @Override
+ public Date getDate(int columnIndex, Calendar cal) throws SQLException {
+ return getDate(columnIndex);
+ }
+
+ @Override
+ public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+ return getDate(columnLabel);
+ }
+
+ @Override
+ public Time getTime(int columnIndex, Calendar cal) throws SQLException {
+ return getTime(columnIndex);
+ }
+
+ @Override
+ public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+ return getTime(columnLabel);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+ return getTimestamp(columnIndex);
+ }
+
+ @Override
+ public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException {
+ return getTimestamp(columnLabel);
+ }
+
+ @Override
+ public URL getURL(int columnIndex) throws SQLException {
+ String value = getString(columnIndex);
+ if (wasNull) return null;
+ try {
+ return new URL(value);
+ } catch (Exception e) {
+ throw new SQLException("Cannot convert '" + value + "' to URL", e);
+ }
+ }
+
+ @Override
+ public URL getURL(String columnLabel) throws SQLException {
+ return getURL(findColumn(columnLabel));
+ }
+
+ // More update operations (all not supported)
+ @Override
+ public void updateRef(int columnIndex, Ref x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateRef(String columnLabel, Ref x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(int columnIndex, Blob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(String columnLabel, Blob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(int columnIndex, Clob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(String columnLabel, Clob x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateArray(int columnIndex, Array x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateArray(String columnLabel, Array x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ // More modern JDBC methods
+ @Override
+ public RowId getRowId(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("RowId objects are not supported");
+ }
+
+ @Override
+ public RowId getRowId(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("RowId objects are not supported");
+ }
+
+ @Override
+ public void updateRowId(int columnIndex, RowId x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateRowId(String columnLabel, RowId x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return closed;
+ }
+
+ @Override
+ public void updateNString(int columnIndex, String nString) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNString(String columnLabel, String nString) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(int columnIndex, NClob nClob) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(String columnLabel, NClob nClob) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public NClob getNClob(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NClob objects are not supported");
+ }
+
+ @Override
+ public NClob getNClob(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NClob objects are not supported");
+ }
+
+ @Override
+ public SQLXML getSQLXML(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("SQLXML objects are not supported");
+ }
+
+ @Override
+ public SQLXML getSQLXML(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("SQLXML objects are not supported");
+ }
+
+ @Override
+ public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public String getNString(int columnIndex) throws SQLException {
+ return getString(columnIndex);
+ }
+
+ @Override
+ public String getNString(String columnLabel) throws SQLException {
+ return getString(columnLabel);
+ }
+
+ @Override
+ public Reader getNCharacterStream(int columnIndex) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NCharacter streams are not supported");
+ }
+
+ @Override
+ public Reader getNCharacterStream(String columnLabel) throws SQLException {
+ throw new SQLFeatureNotSupportedException("NCharacter streams are not supported");
+ }
+
+ @Override
+ public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNCharacterStream(String columnLabel, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(int columnIndex, InputStream inputStream, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(String columnLabel, InputStream inputStream, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(int columnIndex, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateClob(String columnLabel, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(int columnIndex, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public void updateNClob(String columnLabel, Reader reader) throws SQLException {
+ throw new SQLFeatureNotSupportedException("Updates are not supported - SeaweedFS is read-only");
+ }
+
+ @Override
+ public T getObject(int columnIndex, Class type) throws SQLException {
+ Object value = getObject(columnIndex);
+ if (value == null || wasNull) {
+ return null;
+ }
+
+ if (type.isInstance(value)) {
+ return type.cast(value);
+ }
+
+ // Basic type conversions
+ if (type == String.class) {
+ return type.cast(value.toString());
+ } else if (type == Integer.class && value instanceof String) {
+ return type.cast(Integer.valueOf((String)value));
+ } else if (type == Long.class && value instanceof String) {
+ return type.cast(Long.valueOf((String)value));
+ } else if (type == Boolean.class && value instanceof String) {
+ return type.cast(Boolean.valueOf((String)value));
+ }
+
+ throw new SQLException("Cannot convert " + value.getClass().getName() + " to " + type.getName());
+ }
+
+ @Override
+ public T getObject(String columnLabel, Class type) throws SQLException {
+ return getObject(findColumn(columnLabel), type);
+ }
+
+ @Override
+ public T unwrap(Class iface) throws SQLException {
+ if (iface.isAssignableFrom(getClass())) {
+ return iface.cast(this);
+ }
+ throw new SQLException("Cannot unwrap to " + iface.getName());
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> iface) throws SQLException {
+ return iface.isAssignableFrom(getClass());
+ }
+
+ @Override
+ public boolean rowDeleted() throws SQLException {
+ checkClosed();
+ return false; // SeaweedFS is read-only, no deletions possible
+ }
+
+ @Override
+ public boolean rowInserted() throws SQLException {
+ checkClosed();
+ return false; // SeaweedFS is read-only, no insertions possible
+ }
+
+ @Override
+ public boolean rowUpdated() throws SQLException {
+ checkClosed();
+ return false; // SeaweedFS is read-only, no updates possible
+ }
+
+ @Override
+ public int getConcurrency() throws SQLException {
+ checkClosed();
+ return ResultSet.CONCUR_READ_ONLY; // SeaweedFS is read-only
+ }
+
+ @Override
+ public int getType() throws SQLException {
+ checkClosed();
+ return ResultSet.TYPE_FORWARD_ONLY; // Forward-only scrolling
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ checkClosed();
+ return 1000; // Default fetch size
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ checkClosed();
+ // No-op for now, could be enhanced to affect performance
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ checkClosed();
+ return ResultSet.FETCH_FORWARD; // Always forward-only
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ checkClosed();
+ if (direction != ResultSet.FETCH_FORWARD) {
+ throw new SQLException("Only FETCH_FORWARD is supported");
+ }
+ }
+
+ // Helper methods
+ private void checkClosed() throws SQLException {
+ if (closed) {
+ throw new SQLException("ResultSet is closed");
+ }
+ }
+
+ private void checkRowPosition() throws SQLException {
+ if (currentRowIndex < 0 || currentRowIndex >= rows.size()) {
+ throw new SQLException("ResultSet is not positioned on a valid row");
+ }
+ }
+
+ private void checkColumnIndex(int columnIndex) throws SQLException {
+ if (columnIndex < 1 || columnIndex > columnNames.size()) {
+ throw new SQLException("Column index " + columnIndex + " is out of range (1-" + columnNames.size() + ")");
+ }
+ }
+
+ private List getCurrentRow() {
+ return rows.get(currentRowIndex);
+ }
+
+ // Get result set information
+ public int getColumnCount() {
+ return columnNames.size();
+ }
+
+ public List getColumnNames() {
+ return new ArrayList<>(columnNames);
+ }
+
+ public int getRowCount() {
+ return rows.size();
+ }
+}
diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java
new file mode 100644
index 000000000..d79a2ef2b
--- /dev/null
+++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSResultSetMetaData.java
@@ -0,0 +1,202 @@
+package com.seaweedfs.jdbc;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+/**
+ * ResultSetMetaData implementation for SeaweedFS JDBC
+ */
+public class SeaweedFSResultSetMetaData implements ResultSetMetaData {
+
+ private final List columnNames;
+
+ public SeaweedFSResultSetMetaData(List columnNames) {
+ this.columnNames = columnNames;
+ }
+
+ @Override
+ public int getColumnCount() throws SQLException {
+ return columnNames.size();
+ }
+
+ @Override
+ public boolean isAutoIncrement(int column) throws SQLException {
+ checkColumnIndex(column);
+ return false; // SeaweedFS doesn't have auto-increment columns
+ }
+
+ @Override
+ public boolean isCaseSensitive(int column) throws SQLException {
+ checkColumnIndex(column);
+ return true; // Assume case sensitive
+ }
+
+ @Override
+ public boolean isSearchable(int column) throws SQLException {
+ checkColumnIndex(column);
+ return true; // All columns are searchable
+ }
+
+ @Override
+ public boolean isCurrency(int column) throws SQLException {
+ checkColumnIndex(column);
+ return false; // No currency columns
+ }
+
+ @Override
+ public int isNullable(int column) throws SQLException {
+ checkColumnIndex(column);
+ return columnNullable; // Assume nullable
+ }
+
+ @Override
+ public boolean isSigned(int column) throws SQLException {
+ checkColumnIndex(column);
+ // For simplicity, assume all numeric types are signed
+ return true;
+ }
+
+ @Override
+ public int getColumnDisplaySize(int column) throws SQLException {
+ checkColumnIndex(column);
+ return 50; // Default display size
+ }
+
+ @Override
+ public String getColumnLabel(int column) throws SQLException {
+ checkColumnIndex(column);
+ return columnNames.get(column - 1);
+ }
+
+ @Override
+ public String getColumnName(int column) throws SQLException {
+ checkColumnIndex(column);
+ return columnNames.get(column - 1);
+ }
+
+ @Override
+ public String getSchemaName(int column) throws SQLException {
+ checkColumnIndex(column);
+ return ""; // No schema concept in SeaweedFS
+ }
+
+ @Override
+ public int getPrecision(int column) throws SQLException {
+ checkColumnIndex(column);
+ return 0; // Unknown precision
+ }
+
+ @Override
+ public int getScale(int column) throws SQLException {
+ checkColumnIndex(column);
+ return 0; // Unknown scale
+ }
+
+ @Override
+ public String getTableName(int column) throws SQLException {
+ checkColumnIndex(column);
+ return ""; // Table name not available in result set metadata
+ }
+
+ @Override
+ public String getCatalogName(int column) throws SQLException {
+ checkColumnIndex(column);
+ return ""; // No catalog concept in SeaweedFS
+ }
+
+ @Override
+ public int getColumnType(int column) throws SQLException {
+ checkColumnIndex(column);
+ // For simplicity, we'll determine type based on column name patterns
+ String columnName = columnNames.get(column - 1).toLowerCase();
+
+ if (columnName.contains("timestamp") || columnName.contains("time") || columnName.equals("_timestamp_ns")) {
+ return Types.TIMESTAMP;
+ } else if (columnName.contains("id") || columnName.contains("count") || columnName.contains("size")) {
+ return Types.BIGINT;
+ } else if (columnName.contains("amount") || columnName.contains("price") || columnName.contains("rate")) {
+ return Types.DECIMAL;
+ } else if (columnName.contains("flag") || columnName.contains("enabled") || columnName.contains("active")) {
+ return Types.BOOLEAN;
+ } else {
+ return Types.VARCHAR; // Default to VARCHAR
+ }
+ }
+
+ @Override
+ public String getColumnTypeName(int column) throws SQLException {
+ int sqlType = getColumnType(column);
+ switch (sqlType) {
+ case Types.VARCHAR:
+ return "VARCHAR";
+ case Types.BIGINT:
+ return "BIGINT";
+ case Types.DECIMAL:
+ return "DECIMAL";
+ case Types.BOOLEAN:
+ return "BOOLEAN";
+ case Types.TIMESTAMP:
+ return "TIMESTAMP";
+ default:
+ return "VARCHAR";
+ }
+ }
+
+ @Override
+ public boolean isReadOnly(int column) throws SQLException {
+ checkColumnIndex(column);
+ return true; // SeaweedFS is read-only
+ }
+
+ @Override
+ public boolean isWritable(int column) throws SQLException {
+ checkColumnIndex(column);
+ return false; // SeaweedFS is read-only
+ }
+
+ @Override
+ public boolean isDefinitelyWritable(int column) throws SQLException {
+ checkColumnIndex(column);
+ return false; // SeaweedFS is read-only
+ }
+
+ @Override
+ public String getColumnClassName(int column) throws SQLException {
+ int sqlType = getColumnType(column);
+ switch (sqlType) {
+ case Types.VARCHAR:
+ return "java.lang.String";
+ case Types.BIGINT:
+ return "java.lang.Long";
+ case Types.DECIMAL:
+ return "java.math.BigDecimal";
+ case Types.BOOLEAN:
+ return "java.lang.Boolean";
+ case Types.TIMESTAMP:
+ return "java.sql.Timestamp";
+ default:
+ return "java.lang.String";
+ }
+ }
+
+ @Override
+ public T unwrap(Class iface) throws SQLException {
+ if (iface.isAssignableFrom(getClass())) {
+ return iface.cast(this);
+ }
+ throw new SQLException("Cannot unwrap to " + iface.getName());
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> iface) throws SQLException {
+ return iface.isAssignableFrom(getClass());
+ }
+
+ private void checkColumnIndex(int column) throws SQLException {
+ if (column < 1 || column > columnNames.size()) {
+ throw new SQLException("Column index " + column + " is out of range (1-" + columnNames.size() + ")");
+ }
+ }
+}
diff --git a/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java
new file mode 100644
index 000000000..f21c40069
--- /dev/null
+++ b/jdbc-driver/src/main/java/com/seaweedfs/jdbc/SeaweedFSStatement.java
@@ -0,0 +1,389 @@
+package com.seaweedfs.jdbc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JDBC Statement implementation for SeaweedFS
+ */
+public class SeaweedFSStatement implements Statement {
+
+ private static final Logger logger = LoggerFactory.getLogger(SeaweedFSStatement.class);
+
+ protected final SeaweedFSConnection connection;
+ private boolean closed = false;
+ private ResultSet currentResultSet = null;
+ private int updateCount = -1;
+ private int maxRows = 0;
+ private int queryTimeout = 0;
+ private int fetchSize = 1000;
+ private List batch = new ArrayList<>();
+
+ public SeaweedFSStatement(SeaweedFSConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ checkClosed();
+ logger.debug("Executing query: {}", sql);
+
+ try {
+ // Send query to server
+ connection.sendMessage((byte)0x03, sql.getBytes()); // JDBC_MSG_EXECUTE_QUERY
+
+ // Read response
+ SeaweedFSConnection.Response response = connection.readResponse();
+
+ if (response.type == (byte)0x01) { // JDBC_RESP_ERROR
+ throw new SQLException("Query failed: " + new String(response.data));
+ } else if (response.type == (byte)0x02) { // JDBC_RESP_RESULT_SET
+ // Parse result set data
+ currentResultSet = new SeaweedFSResultSet(this, response.data);
+ updateCount = -1;
+ return currentResultSet;
+ } else {
+ throw new SQLException("Unexpected response type: " + response.type);
+ }
+
+ } catch (Exception e) {
+ throw new SQLException("Failed to execute query: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ checkClosed();
+ logger.debug("Executing update: {}", sql);
+
+ try {
+ // Send update to server
+ connection.sendMessage((byte)0x04, sql.getBytes()); // JDBC_MSG_EXECUTE_UPDATE
+
+ // Read response
+ SeaweedFSConnection.Response response = connection.readResponse();
+
+ if (response.type == (byte)0x01) { // JDBC_RESP_ERROR
+ throw new SQLException("Update failed: " + new String(response.data));
+ } else if (response.type == (byte)0x03) { // JDBC_RESP_UPDATE_COUNT
+ // Parse update count
+ updateCount = parseUpdateCount(response.data);
+ currentResultSet = null;
+ return updateCount;
+ } else {
+ throw new SQLException("Unexpected response type: " + response.type);
+ }
+
+ } catch (Exception e) {
+ throw new SQLException("Failed to execute update: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (!closed) {
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ currentResultSet = null;
+ }
+ closed = true;
+ logger.debug("Statement closed");
+ }
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ checkClosed();
+ return 0; // No limit
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ checkClosed();
+ // No-op
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ checkClosed();
+ return maxRows;
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ checkClosed();
+ this.maxRows = max;
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ checkClosed();
+ // No-op
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ checkClosed();
+ return queryTimeout;
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ checkClosed();
+ this.queryTimeout = seconds;
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ checkClosed();
+ // No-op - cancellation not supported
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ checkClosed();
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ checkClosed();
+ // No-op
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ checkClosed();
+ // No-op - cursors not supported
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ checkClosed();
+ logger.debug("Executing: {}", sql);
+
+ // Determine if this is likely a query or update
+ String trimmedSql = sql.trim().toUpperCase();
+ if (trimmedSql.startsWith("SELECT") ||
+ trimmedSql.startsWith("SHOW") ||
+ trimmedSql.startsWith("DESCRIBE") ||
+ trimmedSql.startsWith("DESC") ||
+ trimmedSql.startsWith("EXPLAIN")) {
+ // It's a query
+ executeQuery(sql);
+ return true;
+ } else {
+ // It's an update
+ executeUpdate(sql);
+ return false;
+ }
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ checkClosed();
+ return currentResultSet;
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ checkClosed();
+ return updateCount;
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ checkClosed();
+ if (currentResultSet != null) {
+ currentResultSet.close();
+ currentResultSet = null;
+ }
+ updateCount = -1;
+ return false; // No more results
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ checkClosed();
+ if (direction != ResultSet.FETCH_FORWARD) {
+ throw new SQLException("Only FETCH_FORWARD is supported");
+ }
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ checkClosed();
+ return ResultSet.FETCH_FORWARD;
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ checkClosed();
+ this.fetchSize = rows;
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ checkClosed();
+ return fetchSize;
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ checkClosed();
+ return ResultSet.CONCUR_READ_ONLY;
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ checkClosed();
+ return ResultSet.TYPE_FORWARD_ONLY;
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ checkClosed();
+ batch.add(sql);
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ checkClosed();
+ batch.clear();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ checkClosed();
+ int[] results = new int[batch.size()];
+
+ for (int i = 0; i < batch.size(); i++) {
+ try {
+ results[i] = executeUpdate(batch.get(i));
+ } catch (SQLException e) {
+ results[i] = EXECUTE_FAILED;
+ }
+ }
+
+ batch.clear();
+ return results;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ checkClosed();
+ return connection;
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ checkClosed();
+ return getMoreResults();
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ throw new SQLFeatureNotSupportedException("Generated keys are not supported");
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return executeUpdate(sql);
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return executeUpdate(sql);
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ return executeUpdate(sql);
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ return execute(sql);
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ return execute(sql);
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ return execute(sql);
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ checkClosed();
+ return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return closed;
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ checkClosed();
+ // No-op
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ checkClosed();
+ return false;
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ checkClosed();
+ // No-op
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ checkClosed();
+ return false;
+ }
+
+ @Override
+ public T unwrap(Class iface) throws SQLException {
+ if (iface.isAssignableFrom(getClass())) {
+ return iface.cast(this);
+ }
+ throw new SQLException("Cannot unwrap to " + iface.getName());
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> iface) throws SQLException {
+ return iface.isAssignableFrom(getClass());
+ }
+
+ protected void checkClosed() throws SQLException {
+ if (closed) {
+ throw new SQLException("Statement is closed");
+ }
+ if (connection.isClosed()) {
+ throw new SQLException("Connection is closed");
+ }
+ }
+
+ private int parseUpdateCount(byte[] data) {
+ if (data.length >= 4) {
+ return ((data[0] & 0xFF) << 24) |
+ ((data[1] & 0xFF) << 16) |
+ ((data[2] & 0xFF) << 8) |
+ (data[3] & 0xFF);
+ }
+ return 0;
+ }
+}
diff --git a/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver b/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver
new file mode 100644
index 000000000..24c5d53ef
--- /dev/null
+++ b/jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver
@@ -0,0 +1 @@
+com.seaweedfs.jdbc.SeaweedFSDriver
diff --git a/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java b/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java
new file mode 100644
index 000000000..874323e77
--- /dev/null
+++ b/jdbc-driver/src/test/java/com/seaweedfs/jdbc/SeaweedFSDriverTest.java
@@ -0,0 +1,75 @@
+package com.seaweedfs.jdbc;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * Basic tests for SeaweedFS JDBC driver
+ */
+public class SeaweedFSDriverTest {
+
+ @Test
+ public void testDriverRegistration() {
+ // Driver should be automatically registered via META-INF/services
+ assertDoesNotThrow(() -> {
+ Class.forName("com.seaweedfs.jdbc.SeaweedFSDriver");
+ });
+ }
+
+ @Test
+ public void testURLAcceptance() throws SQLException {
+ SeaweedFSDriver driver = new SeaweedFSDriver();
+
+ // Valid URLs
+ assertTrue(driver.acceptsURL("jdbc:seaweedfs://localhost:8089/default"));
+ assertTrue(driver.acceptsURL("jdbc:seaweedfs://server:9000/test"));
+ assertTrue(driver.acceptsURL("jdbc:seaweedfs://192.168.1.100:8089/mydb"));
+
+ // Invalid URLs
+ assertFalse(driver.acceptsURL("jdbc:mysql://localhost:3306/test"));
+ assertFalse(driver.acceptsURL("jdbc:postgresql://localhost:5432/test"));
+ assertFalse(driver.acceptsURL(null));
+ assertFalse(driver.acceptsURL(""));
+ assertFalse(driver.acceptsURL("not-a-url"));
+ }
+
+ @Test
+ public void testDriverInfo() {
+ SeaweedFSDriver driver = new SeaweedFSDriver();
+
+ assertEquals(SeaweedFSDriver.DRIVER_MAJOR_VERSION, driver.getMajorVersion());
+ assertEquals(SeaweedFSDriver.DRIVER_MINOR_VERSION, driver.getMinorVersion());
+ assertFalse(driver.jdbcCompliant()); // We're not fully JDBC compliant
+
+ assertNotNull(SeaweedFSDriver.getDriverInfo());
+ assertTrue(SeaweedFSDriver.getDriverInfo().contains("SeaweedFS"));
+ assertTrue(SeaweedFSDriver.getDriverInfo().contains("JDBC"));
+ }
+
+ @Test
+ public void testPropertyInfo() throws SQLException {
+ SeaweedFSDriver driver = new SeaweedFSDriver();
+
+ var properties = driver.getPropertyInfo("jdbc:seaweedfs://localhost:8089/default", null);
+ assertNotNull(properties);
+ assertTrue(properties.length > 0);
+
+ // Check that basic properties are present
+ boolean foundHost = false, foundPort = false, foundDatabase = false;
+ for (var prop : properties) {
+ if ("host".equals(prop.name)) foundHost = true;
+ if ("port".equals(prop.name)) foundPort = true;
+ if ("database".equals(prop.name)) foundDatabase = true;
+ }
+
+ assertTrue(foundHost, "Host property should be present");
+ assertTrue(foundPort, "Port property should be present");
+ assertTrue(foundDatabase, "Database property should be present");
+ }
+
+ // Note: Connection tests would require a running SeaweedFS JDBC server
+ // These tests would be part of integration tests, not unit tests
+}
diff --git a/weed/command/command.go b/weed/command/command.go
index 5da7fdc73..f894e7dce 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -30,6 +30,7 @@ var Commands = []*Command{
cmdFix,
cmdFuse,
cmdIam,
+ cmdJdbc,
cmdMaster,
cmdMasterFollower,
cmdMount,
diff --git a/weed/command/jdbc.go b/weed/command/jdbc.go
new file mode 100644
index 000000000..2f05e0169
--- /dev/null
+++ b/weed/command/jdbc.go
@@ -0,0 +1,141 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ weed_server "github.com/seaweedfs/seaweedfs/weed/server"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+var (
+ jdbcOptions JdbcOptions
+)
+
+type JdbcOptions struct {
+ host *string
+ port *int
+ masterAddr *string
+}
+
+func init() {
+ cmdJdbc.Run = runJdbc // break init cycle
+ jdbcOptions.host = cmdJdbc.Flag.String("host", "localhost", "JDBC server host")
+ jdbcOptions.port = cmdJdbc.Flag.Int("port", 8089, "JDBC server port")
+ jdbcOptions.masterAddr = cmdJdbc.Flag.String("master", "localhost:9333", "SeaweedFS master server address")
+}
+
+var cmdJdbc = &Command{
+ UsageLine: "jdbc -port=8089 -master=",
+ Short: "start a JDBC server for SQL queries",
+ Long: `Start a JDBC server that provides SQL query access to SeaweedFS.
+
+This JDBC server allows standard JDBC clients and tools to connect to SeaweedFS
+and execute SQL queries against MQ topics. It implements a subset of the JDBC
+protocol for compatibility with most database tools and applications.
+
+Examples:
+
+ # Start JDBC server on default port 8089
+ weed jdbc
+
+ # Start on custom port with specific master
+ weed jdbc -port=8090 -master=master1:9333
+
+ # Allow connections from any host
+ weed jdbc -host=0.0.0.0 -port=8089
+
+Clients can then connect using JDBC URL:
+ jdbc:seaweedfs://hostname:port/database
+
+Supported SQL operations:
+ - SELECT queries on MQ topics
+ - DESCRIBE/DESC commands
+ - SHOW DATABASES/TABLES commands
+ - Aggregation functions (COUNT, SUM, AVG, MIN, MAX)
+ - WHERE clauses with filtering
+ - System columns (_timestamp_ns, _key, _source)
+
+Compatible with:
+ - Standard JDBC tools (DBeaver, IntelliJ DataGrip, etc.)
+ - Business Intelligence tools (Tableau, Power BI, etc.)
+ - Java applications using JDBC drivers
+ - SQL reporting tools
+
+`,
+}
+
+func runJdbc(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ // Validate options
+ if *jdbcOptions.masterAddr == "" {
+ fmt.Fprintf(os.Stderr, "Error: master address is required\n")
+ return false
+ }
+
+ // Create JDBC server
+ jdbcServer, err := weed_server.NewJDBCServer(*jdbcOptions.host, *jdbcOptions.port, *jdbcOptions.masterAddr)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error creating JDBC server: %v\n", err)
+ return false
+ }
+
+ // Start the server
+ fmt.Printf("Starting SeaweedFS JDBC Server...\n")
+ fmt.Printf("Host: %s\n", *jdbcOptions.host)
+ fmt.Printf("Port: %d\n", *jdbcOptions.port)
+ fmt.Printf("Master: %s\n", *jdbcOptions.masterAddr)
+ fmt.Printf("\nJDBC URL: jdbc:seaweedfs://%s:%d/default\n", *jdbcOptions.host, *jdbcOptions.port)
+ fmt.Printf("\nSupported operations:\n")
+ fmt.Printf(" - SELECT queries on MQ topics\n")
+ fmt.Printf(" - DESCRIBE/DESC table_name\n")
+ fmt.Printf(" - SHOW DATABASES\n")
+ fmt.Printf(" - SHOW TABLES\n")
+ fmt.Printf(" - Aggregations: COUNT, SUM, AVG, MIN, MAX\n")
+ fmt.Printf(" - System columns: _timestamp_ns, _key, _source\n")
+ fmt.Printf("\nReady for JDBC connections!\n\n")
+
+ err = jdbcServer.Start()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error starting JDBC server: %v\n", err)
+ return false
+ }
+
+ // Set up signal handling for graceful shutdown
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ // Wait for shutdown signal
+ <-sigChan
+ fmt.Printf("\nReceived shutdown signal, stopping JDBC server...\n")
+
+ // Create context with timeout for graceful shutdown
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ // Stop the server with timeout
+ done := make(chan error, 1)
+ go func() {
+ done <- jdbcServer.Stop()
+ }()
+
+ select {
+ case err := <-done:
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error stopping JDBC server: %v\n", err)
+ return false
+ }
+ fmt.Printf("JDBC server stopped successfully\n")
+ case <-ctx.Done():
+ fmt.Fprintf(os.Stderr, "Timeout waiting for JDBC server to stop\n")
+ return false
+ }
+
+ return true
+}
diff --git a/weed/server/jdbc_server.go b/weed/server/jdbc_server.go
new file mode 100644
index 000000000..9be59ae42
--- /dev/null
+++ b/weed/server/jdbc_server.go
@@ -0,0 +1,524 @@
+package weed_server
+
+import (
+ "bufio"
+ "context"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/query/engine"
+ "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
+)
+
+// JDBCServer provides JDBC-compatible access to SeaweedFS SQL engine
+type JDBCServer struct {
+ host string
+ port int
+ masterAddr string
+ listener net.Listener
+ sqlEngine *engine.SQLEngine
+ connections map[net.Conn]*JDBCConnection
+ connMutex sync.RWMutex
+ shutdown chan struct{}
+ wg sync.WaitGroup
+}
+
+// JDBCConnection represents a single JDBC client connection
+type JDBCConnection struct {
+ conn net.Conn
+ reader *bufio.Reader
+ writer *bufio.Writer
+ database string
+ autoCommit bool
+ connectionID uint32
+ closed bool
+ mutex sync.Mutex
+}
+
+// JDBC Protocol Constants
+const (
+ // Message Types
+ JDBC_MSG_CONNECT = 0x01
+ JDBC_MSG_DISCONNECT = 0x02
+ JDBC_MSG_EXECUTE_QUERY = 0x03
+ JDBC_MSG_EXECUTE_UPDATE = 0x04
+ JDBC_MSG_PREPARE = 0x05
+ JDBC_MSG_EXECUTE_PREP = 0x06
+ JDBC_MSG_GET_METADATA = 0x07
+ JDBC_MSG_SET_AUTOCOMMIT = 0x08
+ JDBC_MSG_COMMIT = 0x09
+ JDBC_MSG_ROLLBACK = 0x0A
+
+ // Response Types
+ JDBC_RESP_OK = 0x00
+ JDBC_RESP_ERROR = 0x01
+ JDBC_RESP_RESULT_SET = 0x02
+ JDBC_RESP_UPDATE_COUNT = 0x03
+ JDBC_RESP_METADATA = 0x04
+
+ // Default values
+ DEFAULT_JDBC_PORT = 8089
+)
+
+// NewJDBCServer creates a new JDBC server instance
+func NewJDBCServer(host string, port int, masterAddr string) (*JDBCServer, error) {
+ if port <= 0 {
+ port = DEFAULT_JDBC_PORT
+ }
+ if host == "" {
+ host = "localhost"
+ }
+
+ // Create SQL engine
+ sqlEngine := engine.NewSQLEngine(masterAddr)
+
+ server := &JDBCServer{
+ host: host,
+ port: port,
+ masterAddr: masterAddr,
+ sqlEngine: sqlEngine,
+ connections: make(map[net.Conn]*JDBCConnection),
+ shutdown: make(chan struct{}),
+ }
+
+ return server, nil
+}
+
+// Start begins listening for JDBC connections
+func (s *JDBCServer) Start() error {
+ addr := fmt.Sprintf("%s:%d", s.host, s.port)
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ return fmt.Errorf("failed to start JDBC server on %s: %v", addr, err)
+ }
+
+ s.listener = listener
+ glog.Infof("JDBC Server listening on %s", addr)
+
+ s.wg.Add(1)
+ go s.acceptConnections()
+
+ return nil
+}
+
+// Stop gracefully shuts down the JDBC server
+func (s *JDBCServer) Stop() error {
+ close(s.shutdown)
+
+ if s.listener != nil {
+ s.listener.Close()
+ }
+
+ // Close all connections
+ s.connMutex.Lock()
+ for conn, jdbcConn := range s.connections {
+ jdbcConn.close()
+ conn.Close()
+ }
+ s.connections = make(map[net.Conn]*JDBCConnection)
+ s.connMutex.Unlock()
+
+ s.wg.Wait()
+ glog.Infof("JDBC Server stopped")
+ return nil
+}
+
+// acceptConnections handles incoming JDBC connections
+func (s *JDBCServer) acceptConnections() {
+ defer s.wg.Done()
+
+ for {
+ select {
+ case <-s.shutdown:
+ return
+ default:
+ }
+
+ conn, err := s.listener.Accept()
+ if err != nil {
+ select {
+ case <-s.shutdown:
+ return
+ default:
+ glog.Errorf("Failed to accept JDBC connection: %v", err)
+ continue
+ }
+ }
+
+ s.wg.Add(1)
+ go s.handleConnection(conn)
+ }
+}
+
+// handleConnection processes a single JDBC connection
+func (s *JDBCServer) handleConnection(conn net.Conn) {
+ defer s.wg.Done()
+ defer conn.Close()
+
+ // Create JDBC connection wrapper
+ jdbcConn := &JDBCConnection{
+ conn: conn,
+ reader: bufio.NewReader(conn),
+ writer: bufio.NewWriter(conn),
+ database: "default",
+ autoCommit: true,
+ connectionID: s.generateConnectionID(),
+ }
+
+ // Register connection
+ s.connMutex.Lock()
+ s.connections[conn] = jdbcConn
+ s.connMutex.Unlock()
+
+ // Clean up on exit
+ defer func() {
+ s.connMutex.Lock()
+ delete(s.connections, conn)
+ s.connMutex.Unlock()
+ }()
+
+ glog.Infof("New JDBC connection from %s (ID: %d)", conn.RemoteAddr(), jdbcConn.connectionID)
+
+ // Handle connection messages
+ for {
+ select {
+ case <-s.shutdown:
+ return
+ default:
+ }
+
+ // Set read timeout
+ conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+
+ err := s.handleMessage(jdbcConn)
+ if err != nil {
+ if err == io.EOF {
+ glog.Infof("JDBC client disconnected (ID: %d)", jdbcConn.connectionID)
+ } else {
+ glog.Errorf("Error handling JDBC message (ID: %d): %v", jdbcConn.connectionID, err)
+ }
+ return
+ }
+ }
+}
+
+// handleMessage processes a single JDBC protocol message
+func (s *JDBCServer) handleMessage(conn *JDBCConnection) error {
+ // Read message header (message type + length)
+ header := make([]byte, 5)
+ _, err := io.ReadFull(conn.reader, header)
+ if err != nil {
+ return err
+ }
+
+ msgType := header[0]
+ msgLength := binary.BigEndian.Uint32(header[1:5])
+
+ // Read message body
+ msgBody := make([]byte, msgLength)
+ if msgLength > 0 {
+ _, err = io.ReadFull(conn.reader, msgBody)
+ if err != nil {
+ return err
+ }
+ }
+
+ // Process message based on type
+ switch msgType {
+ case JDBC_MSG_CONNECT:
+ return s.handleConnect(conn, msgBody)
+ case JDBC_MSG_DISCONNECT:
+ return s.handleDisconnect(conn)
+ case JDBC_MSG_EXECUTE_QUERY:
+ return s.handleExecuteQuery(conn, msgBody)
+ case JDBC_MSG_EXECUTE_UPDATE:
+ return s.handleExecuteUpdate(conn, msgBody)
+ case JDBC_MSG_GET_METADATA:
+ return s.handleGetMetadata(conn, msgBody)
+ case JDBC_MSG_SET_AUTOCOMMIT:
+ return s.handleSetAutoCommit(conn, msgBody)
+ case JDBC_MSG_COMMIT:
+ return s.handleCommit(conn)
+ case JDBC_MSG_ROLLBACK:
+ return s.handleRollback(conn)
+ default:
+ return s.sendError(conn, fmt.Errorf("unknown message type: %d", msgType))
+ }
+}
+
+// handleConnect processes JDBC connection request
+func (s *JDBCServer) handleConnect(conn *JDBCConnection, msgBody []byte) error {
+ // Parse connection string (database name)
+ if len(msgBody) > 0 {
+ conn.database = string(msgBody)
+ }
+
+ glog.Infof("JDBC client connected to database: %s (ID: %d)", conn.database, conn.connectionID)
+
+ // Send OK response
+ return s.sendOK(conn, "Connected successfully")
+}
+
+// handleDisconnect processes JDBC disconnect request
+func (s *JDBCServer) handleDisconnect(conn *JDBCConnection) error {
+ glog.Infof("JDBC client disconnecting (ID: %d)", conn.connectionID)
+ conn.close()
+ return io.EOF // This will cause the connection handler to exit
+}
+
+// handleExecuteQuery processes SQL SELECT queries
+func (s *JDBCServer) handleExecuteQuery(conn *JDBCConnection, msgBody []byte) error {
+ sql := string(msgBody)
+
+ glog.V(2).Infof("Executing query (ID: %d): %s", conn.connectionID, sql)
+
+ // Execute SQL using the query engine
+ ctx := context.Background()
+ result, err := s.sqlEngine.ExecuteSQL(ctx, sql)
+ if err != nil {
+ return s.sendError(conn, err)
+ }
+
+ if result.Error != nil {
+ return s.sendError(conn, result.Error)
+ }
+
+ // Send result set
+ return s.sendResultSet(conn, result)
+}
+
+// handleExecuteUpdate processes SQL UPDATE/INSERT/DELETE queries
+func (s *JDBCServer) handleExecuteUpdate(conn *JDBCConnection, msgBody []byte) error {
+ sql := string(msgBody)
+
+ glog.V(2).Infof("Executing update (ID: %d): %s", conn.connectionID, sql)
+
+ // For now, treat updates same as queries since SeaweedFS SQL is read-only
+ ctx := context.Background()
+ result, err := s.sqlEngine.ExecuteSQL(ctx, sql)
+ if err != nil {
+ return s.sendError(conn, err)
+ }
+
+ if result.Error != nil {
+ return s.sendError(conn, result.Error)
+ }
+
+ // Send update count (0 for read-only operations)
+ return s.sendUpdateCount(conn, 0)
+}
+
+// handleGetMetadata processes JDBC metadata requests
+func (s *JDBCServer) handleGetMetadata(conn *JDBCConnection, msgBody []byte) error {
+ metadataType := string(msgBody)
+
+ glog.V(2).Infof("Getting metadata (ID: %d): %s", conn.connectionID, metadataType)
+
+ switch metadataType {
+ case "tables":
+ return s.sendTablesMetadata(conn)
+ case "databases", "schemas":
+ return s.sendDatabasesMetadata(conn)
+ default:
+ return s.sendError(conn, fmt.Errorf("unsupported metadata type: %s", metadataType))
+ }
+}
+
+// handleSetAutoCommit processes autocommit setting
+func (s *JDBCServer) handleSetAutoCommit(conn *JDBCConnection, msgBody []byte) error {
+ autoCommit := len(msgBody) > 0 && msgBody[0] == 1
+ conn.autoCommit = autoCommit
+
+ glog.V(2).Infof("Setting autocommit (ID: %d): %v", conn.connectionID, autoCommit)
+
+ return s.sendOK(conn, fmt.Sprintf("AutoCommit set to %v", autoCommit))
+}
+
+// handleCommit processes transaction commit (no-op for read-only)
+func (s *JDBCServer) handleCommit(conn *JDBCConnection) error {
+ glog.V(2).Infof("Commit (ID: %d): no-op for read-only", conn.connectionID)
+ return s.sendOK(conn, "Commit successful")
+}
+
+// handleRollback processes transaction rollback (no-op for read-only)
+func (s *JDBCServer) handleRollback(conn *JDBCConnection) error {
+ glog.V(2).Infof("Rollback (ID: %d): no-op for read-only", conn.connectionID)
+ return s.sendOK(conn, "Rollback successful")
+}
+
+// sendOK sends a success response
+func (s *JDBCServer) sendOK(conn *JDBCConnection, message string) error {
+ return s.sendResponse(conn, JDBC_RESP_OK, []byte(message))
+}
+
+// sendError sends an error response
+func (s *JDBCServer) sendError(conn *JDBCConnection, err error) error {
+ return s.sendResponse(conn, JDBC_RESP_ERROR, []byte(err.Error()))
+}
+
+// sendResultSet sends query results
+func (s *JDBCServer) sendResultSet(conn *JDBCConnection, result *engine.QueryResult) error {
+ // Serialize result set
+ data := s.serializeResultSet(result)
+ return s.sendResponse(conn, JDBC_RESP_RESULT_SET, data)
+}
+
+// sendUpdateCount sends update operation result
+func (s *JDBCServer) sendUpdateCount(conn *JDBCConnection, count int) error {
+ data := make([]byte, 4)
+ binary.BigEndian.PutUint32(data, uint32(count))
+ return s.sendResponse(conn, JDBC_RESP_UPDATE_COUNT, data)
+}
+
+// sendTablesMetadata sends table metadata
+func (s *JDBCServer) sendTablesMetadata(conn *JDBCConnection) error {
+ // For now, return empty metadata - this would need to query the schema catalog
+ data := s.serializeTablesMetadata([]string{})
+ return s.sendResponse(conn, JDBC_RESP_METADATA, data)
+}
+
+// sendDatabasesMetadata sends database/schema metadata
+func (s *JDBCServer) sendDatabasesMetadata(conn *JDBCConnection) error {
+ // Return default databases
+ databases := []string{"default", "test"}
+ data := s.serializeDatabasesMetadata(databases)
+ return s.sendResponse(conn, JDBC_RESP_METADATA, data)
+}
+
+// sendResponse sends a response with the given type and data
+func (s *JDBCServer) sendResponse(conn *JDBCConnection, responseType byte, data []byte) error {
+ conn.mutex.Lock()
+ defer conn.mutex.Unlock()
+
+ // Write response header
+ header := make([]byte, 5)
+ header[0] = responseType
+ binary.BigEndian.PutUint32(header[1:5], uint32(len(data)))
+
+ _, err := conn.writer.Write(header)
+ if err != nil {
+ return err
+ }
+
+ // Write response data
+ if len(data) > 0 {
+ _, err = conn.writer.Write(data)
+ if err != nil {
+ return err
+ }
+ }
+
+ return conn.writer.Flush()
+}
+
+// serializeResultSet converts QueryResult to JDBC wire format
+func (s *JDBCServer) serializeResultSet(result *engine.QueryResult) []byte {
+ var data []byte
+
+ // Column count
+ colCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(colCount, uint32(len(result.Columns)))
+ data = append(data, colCount...)
+
+ // Column names
+ for _, col := range result.Columns {
+ colName := []byte(col)
+ colLen := make([]byte, 4)
+ binary.BigEndian.PutUint32(colLen, uint32(len(colName)))
+ data = append(data, colLen...)
+ data = append(data, colName...)
+ }
+
+ // Row count
+ rowCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(rowCount, uint32(len(result.Rows)))
+ data = append(data, rowCount...)
+
+ // Rows
+ for _, row := range result.Rows {
+ for _, value := range row {
+ // Convert value to string and serialize
+ valueStr := s.valueToString(value)
+ valueBytes := []byte(valueStr)
+ valueLen := make([]byte, 4)
+ binary.BigEndian.PutUint32(valueLen, uint32(len(valueBytes)))
+ data = append(data, valueLen...)
+ data = append(data, valueBytes...)
+ }
+ }
+
+ return data
+}
+
+// serializeTablesMetadata converts table list to wire format
+func (s *JDBCServer) serializeTablesMetadata(tables []string) []byte {
+ var data []byte
+
+ // Table count
+ tableCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(tableCount, uint32(len(tables)))
+ data = append(data, tableCount...)
+
+ // Table names
+ for _, table := range tables {
+ tableBytes := []byte(table)
+ tableLen := make([]byte, 4)
+ binary.BigEndian.PutUint32(tableLen, uint32(len(tableBytes)))
+ data = append(data, tableLen...)
+ data = append(data, tableBytes...)
+ }
+
+ return data
+}
+
+// serializeDatabasesMetadata converts database list to wire format
+func (s *JDBCServer) serializeDatabasesMetadata(databases []string) []byte {
+ var data []byte
+
+ // Database count
+ dbCount := make([]byte, 4)
+ binary.BigEndian.PutUint32(dbCount, uint32(len(databases)))
+ data = append(data, dbCount...)
+
+ // Database names
+ for _, db := range databases {
+ dbBytes := []byte(db)
+ dbLen := make([]byte, 4)
+ binary.BigEndian.PutUint32(dbLen, uint32(len(dbBytes)))
+ data = append(data, dbLen...)
+ data = append(data, dbBytes...)
+ }
+
+ return data
+}
+
+// valueToString converts a sqltypes.Value to string representation
+func (s *JDBCServer) valueToString(value sqltypes.Value) string {
+ if value.IsNull() {
+ return ""
+ }
+
+ return value.ToString()
+}
+
+// generateConnectionID generates a unique connection ID
+func (s *JDBCServer) generateConnectionID() uint32 {
+ return uint32(time.Now().UnixNano() % 1000000)
+}
+
+// close marks the connection as closed
+func (c *JDBCConnection) close() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.closed = true
+}
+
+// GetAddress returns the server address
+func (s *JDBCServer) GetAddress() string {
+ return fmt.Sprintf("%s:%d", s.host, s.port)
+}