From 5a11f86cc2cfb2b76eade609f5cc7c381ca3f96d Mon Sep 17 00:00:00 2001 From: lvca Date: Sun, 10 Oct 2021 03:41:54 -0400 Subject: [PATCH 1/2] Postgres plugin: supported configurable host:port --- .../com/arcadedb/GlobalConfiguration.java | 5 + .../postgres/PostgresNetworkExecutor.java | 31 ++- .../postgres/PostgresProtocolPlugin.java | 14 +- .../com/arcadedb/postgres/PostgresWTest.java | 234 +++++++++--------- 4 files changed, 149 insertions(+), 135 deletions(-) diff --git a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java index 7093897ce5..3050d61916 100644 --- a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java +++ b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java @@ -269,6 +269,11 @@ public Object call(final Object value) { 1000), // GREMLIN + + // POSTGRES + POSTGRES_PORT("arcadedb.postgres.port", "TCP/IP port number used for incoming connections for Postgres plugin. Default is 5432", Integer.class, 5432), + + POSTGRES_HOST("arcadedb.postgres.host", "TCP/IP host name used for incoming connections for Postgres plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"), ; /** diff --git a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java index fdf37dd24d..2764e57df0 100755 --- a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java +++ b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java @@ -63,7 +63,7 @@ public enum ERROR_SEVERITY {FATAL, ERROR} private long processIdSequence = 0; private static final Map> ACTIVE_SESSIONS = new ConcurrentHashMap<>(); private final Map portals = new HashMap<>(); - private final boolean DEBUG = false; + private final boolean DEBUG = true; private final Map connectionProperties = new HashMap<>(); private boolean explicitTransactionStarted = false; private boolean errorInTransaction = false; @@ -203,7 +203,7 @@ public void run() { private void syncCommand() { if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: sync"); + LogManager.instance().log(this, Level.INFO, "PSQL: sync (thread=%s)", null, Thread.currentThread().getId()); if (!explicitTransactionStarted) { if (errorInTransaction) @@ -221,10 +221,12 @@ private void closeCommand() throws IOException { final byte closeType = channel.readByte(); final String prepStatementOrPortal = readString(); - getPortal(prepStatementOrPortal, true); + if (closeType == 'P') + getPortal(prepStatementOrPortal, true); if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: close '%s' type=%s", null, prepStatementOrPortal, (char) closeType); + LogManager.instance() + .log(this, Level.INFO, "PSQL: close '%s' type=%s (thread=%s)", null, prepStatementOrPortal, (char) closeType, Thread.currentThread().getId()); writeMessage("close complete", null, '3', 4); } @@ -234,7 +236,7 @@ private void describeCommand() throws IOException { final String portalName = readString(); if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: describe '%s' type=%s", null, portalName, (char) type); + LogManager.instance().log(this, Level.INFO, "PSQL: describe '%s' type=%s (thread=%s)", null, portalName, (char) type, Thread.currentThread().getId()); final PostgresPortal portal = getPortal(portalName, false); if (portal == null) { @@ -277,7 +279,8 @@ private void executeCommand() { } if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: execute (portal=%s) (limit=%d)-> %s", null, portalName, limit, portal); + LogManager.instance() + .log(this, Level.INFO, "PSQL: execute (portal=%s) (limit=%d)-> %s (thread=%s)", null, portalName, limit, portal, Thread.currentThread().getId()); if (portal.ignoreExecution) writeMessage("empty query response", null, 'I', 4); @@ -322,7 +325,7 @@ private void queryCommand() { queryText = queryText.substring(0, queryText.length() - 1); if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: query -> %s", null, queryText); + LogManager.instance().log(this, Level.INFO, "PSQL: query -> %s (thread=%s)", null, queryText, Thread.currentThread().getId()); if (queryText.isEmpty()) { @@ -481,7 +484,8 @@ private void writeDataRows(final List resultSet, final Map %d row data (%s)", null, resultSet.size(), FileUtils.getSizeAsString(bufferData.limit())); + LogManager.instance().log(this, Level.INFO, "PSQL:-> %d row data (%s) (thread=%s)", null, resultSet.size(), FileUtils.getSizeAsString(bufferData.limit()), + Thread.currentThread().getId()); } private void bindCommand() { @@ -497,7 +501,8 @@ private void bindCommand() { } if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: bind (portal=%s) -> %s", null, portalName, sourcePreparedStatement); + LogManager.instance() + .log(this, Level.INFO, "PSQL: bind (portal=%s) -> %s (thread=%s)", null, portalName, sourcePreparedStatement, Thread.currentThread().getId()); final int paramFormatCount = channel.readShort(); if (paramFormatCount > 0) { @@ -557,7 +562,8 @@ private void parseCommand() { } if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL: parse (portal=%s) -> %s (params=%d)", null, portalName, portal.query, paramCount); + LogManager.instance().log(this, Level.INFO, "PSQL: parse (portal=%s) -> %s (params=%d) (thread=%s)", null, portalName, portal.query, paramCount, + Thread.currentThread().getId()); final String upperCaseText = portal.query.toUpperCase(); if (upperCaseText.startsWith("SET ")) { @@ -805,13 +811,14 @@ private void writeMessage(final String messageName, final WriteMessageCallback c channel.flush(); if (DEBUG) - LogManager.instance().log(this, Level.INFO, "PSQL:-> %s (%s - %s)", null, messageName, messageCode, FileUtils.getSizeAsString(length)); + LogManager.instance().log(this, Level.INFO, "PSQL:-> %s (%s - %s) (thread=%s)", null, messageName, messageCode, FileUtils.getSizeAsString(length), + Thread.currentThread().getId()); } catch (IOException e) { if (database.isTransactionActive()) errorInTransaction = true; - throw new PostgresProtocolException("Error on sending " + messageName + " message", e); + throw new PostgresProtocolException("Error on sending '" + messageName + "' message", e); } } diff --git a/postgresw/src/main/java/com/arcadedb/postgres/PostgresProtocolPlugin.java b/postgresw/src/main/java/com/arcadedb/postgres/PostgresProtocolPlugin.java index 9a1696cd8a..afd8b04eea 100644 --- a/postgresw/src/main/java/com/arcadedb/postgres/PostgresProtocolPlugin.java +++ b/postgresw/src/main/java/com/arcadedb/postgres/PostgresProtocolPlugin.java @@ -16,6 +16,7 @@ package com.arcadedb.postgres; import com.arcadedb.ContextConfiguration; +import com.arcadedb.GlobalConfiguration; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ServerPlugin; import com.arcadedb.server.ha.network.DefaultServerSocketFactory; @@ -23,20 +24,23 @@ import io.undertow.server.handlers.PathHandler; public class PostgresProtocolPlugin implements ServerPlugin { - private ArcadeDBServer server; - private ContextConfiguration configuration; - private final static int DEF_PORT = 5432; - private PostgresNetworkListener listener; + private ArcadeDBServer server; + private ContextConfiguration configuration; + private PostgresNetworkListener listener; + private String host; + private int port; @Override public void configure(final ArcadeDBServer arcadeDBServer, final ContextConfiguration configuration) { this.server = arcadeDBServer; this.configuration = configuration; + this.host = configuration.getValueAsString(GlobalConfiguration.POSTGRES_HOST); + this.port = configuration.getValueAsInteger(GlobalConfiguration.POSTGRES_PORT); } @Override public void startService() { - listener = new PostgresNetworkListener(server, new DefaultServerSocketFactory(), "localhost", "" + DEF_PORT); + listener = new PostgresNetworkListener(server, new DefaultServerSocketFactory(), host, "" + port); } @Override diff --git a/postgresw/src/test/java/com/arcadedb/postgres/PostgresWTest.java b/postgresw/src/test/java/com/arcadedb/postgres/PostgresWTest.java index 26417dc354..4b56a86350 100644 --- a/postgresw/src/test/java/com/arcadedb/postgres/PostgresWTest.java +++ b/postgresw/src/test/java/com/arcadedb/postgres/PostgresWTest.java @@ -23,133 +23,131 @@ import org.postgresql.util.PSQLException; import java.sql.*; -import java.util.Properties; +import java.util.*; public class PostgresWTest extends BaseGraphServerTest { - @Override - public void setTestConfiguration() { - super.setTestConfiguration(); - GlobalConfiguration.SERVER_PLUGINS.setValue("Postgres Protocol:com.arcadedb.postgres.PostgresProtocolPlugin"); - - } - - @AfterEach - @Override - public void endTest() { - GlobalConfiguration.SERVER_PLUGINS.setValue(""); - super.endTest(); - } - - @Test - public void testTypeNotExistsErrorManagement() throws Exception { - try (final Connection conn = getConnection()) { - try (Statement st = conn.createStatement()) { - try { - st.executeQuery("SELECT * FROM V"); - Assertions.fail("The query should go in error"); - } catch (PSQLException e) { - } - } + @Override + public void setTestConfiguration() { + super.setTestConfiguration(); + GlobalConfiguration.SERVER_PLUGINS.setValue("Postgres Protocol:com.arcadedb.postgres.PostgresProtocolPlugin"); + } + + @AfterEach + @Override + public void endTest() { + GlobalConfiguration.SERVER_PLUGINS.setValue(""); + super.endTest(); + } + + @Test + public void testTypeNotExistsErrorManagement() throws Exception { + try (final Connection conn = getConnection()) { + try (Statement st = conn.createStatement()) { + try { + st.executeQuery("SELECT * FROM V"); + Assertions.fail("The query should go in error"); + } catch (PSQLException e) { } + } } - - @Test - public void queryVertices() throws Exception { - try (final Connection conn = getConnection()) { - try (Statement st = conn.createStatement()) { - st.execute("create vertex type V"); - st.execute("create vertex V set name = 'Jay', lastName = 'Miner'"); - - PreparedStatement pst = conn.prepareStatement("create vertex V set name = ?, lastName = ?"); - pst.setString(1, "Rocky"); - pst.setString(2, "Balboa"); - pst.execute(); - pst.close(); - - ResultSet rs = st.executeQuery("SELECT * FROM V"); - - Assertions.assertTrue(!rs.isAfterLast()); - - int i = 0; - while (rs.next()) { - if (rs.getString(1).equalsIgnoreCase("Jay")) { - Assertions.assertEquals("Jay", rs.getString(1)); - Assertions.assertEquals("Miner", rs.getString(2)); - ++i; - } else if (rs.getString(1).equalsIgnoreCase("Rocky")) { - Assertions.assertEquals("Rocky", rs.getString(1)); - Assertions.assertEquals("Balboa", rs.getString(2)); - ++i; - } else - Assertions.fail("Unknown value"); - } - - Assertions.assertEquals(2, i); - - rs.close(); - } + } + + @Test + public void queryVertices() throws Exception { + try (final Connection conn = getConnection()) { + try (Statement st = conn.createStatement()) { + st.execute("create vertex type V"); + st.execute("create vertex V set name = 'Jay', lastName = 'Miner'"); + + PreparedStatement pst = conn.prepareStatement("create vertex V set name = ?, lastName = ?"); + pst.setString(1, "Rocky"); + pst.setString(2, "Balboa"); + pst.execute(); + pst.close(); + + ResultSet rs = st.executeQuery("SELECT * FROM V"); + + Assertions.assertTrue(!rs.isAfterLast()); + + int i = 0; + while (rs.next()) { + if (rs.getString(1).equalsIgnoreCase("Jay")) { + Assertions.assertEquals("Jay", rs.getString(1)); + Assertions.assertEquals("Miner", rs.getString(2)); + ++i; + } else if (rs.getString(1).equalsIgnoreCase("Rocky")) { + Assertions.assertEquals("Rocky", rs.getString(1)); + Assertions.assertEquals("Balboa", rs.getString(2)); + ++i; + } else + Assertions.fail("Unknown value"); } - } - @Test - @Disabled - public void queryTransaction() throws Exception { - try (final Connection conn = getConnection()) { - conn.setAutoCommit(false); - try (Statement st = conn.createStatement()) { - st.execute("create vertex type V"); - st.execute("create vertex V set name = 'Jay', lastName = 'Miner'"); - - PreparedStatement pst = conn.prepareStatement("create vertex V set name = ?, lastName = ?"); - pst.setString(1, "Rocky"); - pst.setString(2, "Balboa"); - pst.execute(); - pst.close(); - - ResultSet rs = st.executeQuery("SELECT * FROM V"); - - Assertions.assertTrue(!rs.isAfterLast()); - - int i = 0; - while (rs.next()) { - if (rs.getString(1).equalsIgnoreCase("Jay")) { - Assertions.assertEquals("Jay", rs.getString(1)); - Assertions.assertEquals("Miner", rs.getString(2)); - ++i; - } else if (rs.getString(1).equalsIgnoreCase("Rocky")) { - Assertions.assertEquals("Rocky", rs.getString(1)); - Assertions.assertEquals("Balboa", rs.getString(2)); - ++i; - } else - Assertions.fail("Unknown value"); - } - - Assertions.assertEquals(2, i); - - rs.close(); - } - } - } + Assertions.assertEquals(2, i); - @Test - @Disabled - public void testWaitForConnectionFromExternal() throws InterruptedException { - Thread.sleep(1000000); + rs.close(); + } } + } + + //@Test + public void queryTransaction() throws Exception { + try (final Connection conn = getConnection()) { + conn.setAutoCommit(false); + try (Statement st = conn.createStatement()) { + st.execute("create vertex type V"); + st.execute("create vertex V set name = 'Jay', lastName = 'Miner'"); + + PreparedStatement pst = conn.prepareStatement("create vertex V set name = ?, lastName = ?"); + pst.setString(1, "Rocky"); + pst.setString(2, "Balboa"); + pst.execute(); + pst.close(); + + ResultSet rs = st.executeQuery("SELECT * FROM V"); + + Assertions.assertTrue(!rs.isAfterLast()); + + int i = 0; + while (rs.next()) { + if (rs.getString(1).equalsIgnoreCase("Jay")) { + Assertions.assertEquals("Jay", rs.getString(1)); + Assertions.assertEquals("Miner", rs.getString(2)); + ++i; + } else if (rs.getString(1).equalsIgnoreCase("Rocky")) { + Assertions.assertEquals("Rocky", rs.getString(1)); + Assertions.assertEquals("Balboa", rs.getString(2)); + ++i; + } else + Assertions.fail("Unknown value"); + } - private Connection getConnection() throws ClassNotFoundException, SQLException { - Class.forName("org.postgresql.Driver"); - - String url = "jdbc:postgresql://localhost/" + getDatabaseName(); - Properties props = new Properties(); - props.setProperty("user", "root"); - props.setProperty("password", DEFAULT_PASSWORD_FOR_TESTS); - props.setProperty("ssl", "false"); - Connection conn = DriverManager.getConnection(url, props); - return conn; - } + Assertions.assertEquals(2, i); - protected String getDatabaseName() { - return "postgresdb"; + rs.close(); + } } + } + + @Test + @Disabled + public void testWaitForConnectionFromExternal() throws InterruptedException { + Thread.sleep(1000000); + } + + private Connection getConnection() throws ClassNotFoundException, SQLException { + Class.forName("org.postgresql.Driver"); + + String url = "jdbc:postgresql://localhost/" + getDatabaseName(); + Properties props = new Properties(); + props.setProperty("user", "root"); + props.setProperty("password", DEFAULT_PASSWORD_FOR_TESTS); + props.setProperty("ssl", "false"); + Connection conn = DriverManager.getConnection(url, props); + return conn; + } + + protected String getDatabaseName() { + return "postgresdb"; + } } From 95a6c9e4cd4779c88b9faefacbdfd88aa09d26fa Mon Sep 17 00:00:00 2001 From: lvca Date: Sun, 10 Oct 2021 12:05:01 -0400 Subject: [PATCH 2/2] Postgres: disabled debug --- .../com/arcadedb/postgres/PostgresNetworkExecutor.java | 2 +- .../com/arcadedb/postgres/PostgresNetworkListener.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java index 2764e57df0..c448f743de 100755 --- a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java +++ b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java @@ -63,7 +63,7 @@ public enum ERROR_SEVERITY {FATAL, ERROR} private long processIdSequence = 0; private static final Map> ACTIVE_SESSIONS = new ConcurrentHashMap<>(); private final Map portals = new HashMap<>(); - private final boolean DEBUG = true; + private final boolean DEBUG = false; private final Map connectionProperties = new HashMap<>(); private boolean explicitTransactionStarted = false; private boolean errorInTransaction = false; diff --git a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java index 29c4c16e08..7a35793c45 100755 --- a/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java +++ b/postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkListener.java @@ -41,14 +41,14 @@ public interface ClientConnected { private int port; private ClientConnected callback; - public PostgresNetworkListener(final ArcadeDBServer server, final ServerSocketFactory iSocketFactory, final String iHostName, final String iHostPortRange) { - super(server.getServerName() + " PostgresW listening at " + iHostName + ":" + iHostPortRange); + public PostgresNetworkListener(final ArcadeDBServer server, final ServerSocketFactory iSocketFactory, final String hostName, final String hostPortRange) { + super(server.getServerName() + " PostgresW listening at " + hostName + ":" + hostPortRange); this.server = server; - this.hostName = iHostName; + this.hostName = hostName; this.socketFactory = iSocketFactory == null ? ServerSocketFactory.getDefault() : iSocketFactory; - listen(iHostName, iHostPortRange); + listen(hostName, hostPortRange); start(); }