#!/usr/bin/env python3 """ Test client for SeaweedFS PostgreSQL protocol support. This script demonstrates how to connect to SeaweedFS using standard PostgreSQL libraries and execute various types of queries. Requirements: pip install psycopg2-binary Usage: python test_client.py python test_client.py --host localhost --port 5432 --user seaweedfs --database default """ import sys import argparse import time import traceback try: import psycopg2 import psycopg2.extras except ImportError: print("Error: psycopg2 not found. Install with: pip install psycopg2-binary") sys.exit(1) def test_connection(host, port, user, database, password=None): """Test basic connection to SeaweedFS PostgreSQL server.""" print(f"๐Ÿ”— Testing connection to {host}:{port}/{database} as user '{user}'") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database, 'connect_timeout': 10 } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) print("โœ… Connection successful!") # Test basic query cursor = conn.cursor() cursor.execute("SELECT 1 as test") result = cursor.fetchone() print(f"โœ… Basic query successful: {result}") cursor.close() conn.close() return True except Exception as e: print(f"โŒ Connection failed: {e}") return False def test_system_queries(host, port, user, database, password=None): """Test PostgreSQL system queries.""" print("\n๐Ÿ”ง Testing PostgreSQL system queries...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) system_queries = [ ("Version", "SELECT version()"), ("Current Database", "SELECT current_database()"), ("Current User", "SELECT current_user"), ("Server Encoding", "SELECT current_setting('server_encoding')"), ("Client Encoding", "SELECT current_setting('client_encoding')"), ] for name, query in system_queries: try: cursor.execute(query) result = cursor.fetchone() print(f" โœ… {name}: {result[0]}") except Exception as e: print(f" โŒ {name}: {e}") cursor.close() conn.close() except Exception as e: print(f"โŒ System queries failed: {e}") def test_schema_queries(host, port, user, database, password=None): """Test schema and metadata queries.""" print("\n๐Ÿ“Š Testing schema queries...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) schema_queries = [ ("Show Databases", "SHOW DATABASES"), ("Show Tables", "SHOW TABLES"), ("List Schemas", "SELECT 'public' as schema_name"), ] for name, query in schema_queries: try: cursor.execute(query) results = cursor.fetchall() print(f" โœ… {name}: Found {len(results)} items") for row in results[:3]: # Show first 3 results print(f" - {dict(row)}") if len(results) > 3: print(f" ... and {len(results) - 3} more") except Exception as e: print(f" โŒ {name}: {e}") cursor.close() conn.close() except Exception as e: print(f"โŒ Schema queries failed: {e}") def test_data_queries(host, port, user, database, password=None): """Test data queries on actual topics.""" print("\n๐Ÿ“ Testing data queries...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # First, try to get available tables/topics cursor.execute("SHOW TABLES") tables = cursor.fetchall() if not tables: print(" โ„น๏ธ No tables/topics found for data testing") cursor.close() conn.close() return # Test with first available table table_name = tables[0][0] if tables[0] else 'test_topic' print(f" ๐Ÿ“‹ Testing with table: {table_name}") test_queries = [ (f"Count records in {table_name}", f"SELECT COUNT(*) FROM \"{table_name}\""), (f"Sample data from {table_name}", f"SELECT * FROM \"{table_name}\" LIMIT 3"), (f"System columns from {table_name}", f"SELECT _timestamp_ns, _key, _source FROM \"{table_name}\" LIMIT 3"), (f"Describe {table_name}", f"DESCRIBE \"{table_name}\""), ] for name, query in test_queries: try: cursor.execute(query) results = cursor.fetchall() if "COUNT" in query.upper(): count = results[0][0] if results else 0 print(f" โœ… {name}: {count} records") elif "DESCRIBE" in query.upper(): print(f" โœ… {name}: {len(results)} columns") for row in results[:5]: # Show first 5 columns print(f" - {dict(row)}") else: print(f" โœ… {name}: {len(results)} rows") for row in results: print(f" - {dict(row)}") except Exception as e: print(f" โŒ {name}: {e}") cursor.close() conn.close() except Exception as e: print(f"โŒ Data queries failed: {e}") def test_prepared_statements(host, port, user, database, password=None): """Test prepared statements.""" print("\n๐Ÿ“ Testing prepared statements...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) cursor = conn.cursor() # Test parameterized query try: cursor.execute("SELECT %s as param1, %s as param2", ("hello", 42)) result = cursor.fetchone() print(f" โœ… Prepared statement: {result}") except Exception as e: print(f" โŒ Prepared statement: {e}") cursor.close() conn.close() except Exception as e: print(f"โŒ Prepared statements test failed: {e}") def test_transaction_support(host, port, user, database, password=None): """Test transaction support (should be no-op for read-only).""" print("\n๐Ÿ”„ Testing transaction support...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password conn = psycopg2.connect(**conn_params) cursor = conn.cursor() transaction_commands = [ "BEGIN", "SELECT 1 as in_transaction", "COMMIT", "SELECT 1 as after_commit", ] for cmd in transaction_commands: try: cursor.execute(cmd) if "SELECT" in cmd: result = cursor.fetchone() print(f" โœ… {cmd}: {result}") else: print(f" โœ… {cmd}: OK") except Exception as e: print(f" โŒ {cmd}: {e}") cursor.close() conn.close() except Exception as e: print(f"โŒ Transaction test failed: {e}") def test_performance(host, port, user, database, password=None, iterations=10): """Test query performance.""" print(f"\nโšก Testing performance ({iterations} iterations)...") try: conn_params = { 'host': host, 'port': port, 'user': user, 'database': database } if password: conn_params['password'] = password times = [] for i in range(iterations): start_time = time.time() conn = psycopg2.connect(**conn_params) cursor = conn.cursor() cursor.execute("SELECT 1") result = cursor.fetchone() cursor.close() conn.close() elapsed = time.time() - start_time times.append(elapsed) if i < 3: # Show first 3 iterations print(f" Iteration {i+1}: {elapsed:.3f}s") avg_time = sum(times) / len(times) min_time = min(times) max_time = max(times) print(f" โœ… Performance results:") print(f" - Average: {avg_time:.3f}s") print(f" - Min: {min_time:.3f}s") print(f" - Max: {max_time:.3f}s") except Exception as e: print(f"โŒ Performance test failed: {e}") def main(): parser = argparse.ArgumentParser(description="Test SeaweedFS PostgreSQL Protocol") parser.add_argument("--host", default="localhost", help="PostgreSQL server host") parser.add_argument("--port", type=int, default=5432, help="PostgreSQL server port") parser.add_argument("--user", default="seaweedfs", help="PostgreSQL username") parser.add_argument("--password", help="PostgreSQL password") parser.add_argument("--database", default="default", help="PostgreSQL database") parser.add_argument("--skip-performance", action="store_true", help="Skip performance tests") args = parser.parse_args() print("๐Ÿงช SeaweedFS PostgreSQL Protocol Test Client") print("=" * 50) # Test basic connection first if not test_connection(args.host, args.port, args.user, args.database, args.password): print("\nโŒ Basic connection failed. Cannot continue with other tests.") sys.exit(1) # Run all tests try: test_system_queries(args.host, args.port, args.user, args.database, args.password) test_schema_queries(args.host, args.port, args.user, args.database, args.password) test_data_queries(args.host, args.port, args.user, args.database, args.password) test_prepared_statements(args.host, args.port, args.user, args.database, args.password) test_transaction_support(args.host, args.port, args.user, args.database, args.password) if not args.skip_performance: test_performance(args.host, args.port, args.user, args.database, args.password) except KeyboardInterrupt: print("\n\nโš ๏ธ Tests interrupted by user") sys.exit(0) except Exception as e: print(f"\nโŒ Unexpected error during testing: {e}") traceback.print_exc() sys.exit(1) print("\n๐ŸŽ‰ All tests completed!") print("\nTo use SeaweedFS with PostgreSQL tools:") print(f" psql -h {args.host} -p {args.port} -U {args.user} -d {args.database}") print(f" Connection string: postgresql://{args.user}@{args.host}:{args.port}/{args.database}") if __name__ == "__main__": main()