[activemq-artemis] branch master updated: ARTEMIS-2823 Use datasource with JDBC store db connections

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2823 Use datasource with JDBC store db connections

nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2faafec  ARTEMIS-2823 Use datasource with JDBC store db connections
     new 4979262  This closes #3204
2faafec is described below

commit 2faafec737b10f68aa38925825029b7136f23ee1
Author: Mikko Uoti <[hidden email]>
AuthorDate: Thu May 28 09:42:26 2020 +0300

    ARTEMIS-2823 Use datasource with JDBC store db connections
   
    Replaces direct jdbc connections with dbcp2 datasource. Adds
    configuration options to use alternative datasources and to alter the
    parameters. While adding slight overhead, this vastly improves the
    management and pooling capabilities with db connections.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 +
 artemis-jdbc-store/pom.xml                         |   7 +
 .../jdbc/store/drivers/AbstractJDBCDriver.java     | 389 ++++++---------------
 .../jdbc/store/drivers/JDBCConnectionProvider.java | 103 ++++++
 .../jdbc/store/drivers/JDBCDataSourceUtils.java    |  48 +++
 .../artemis/jdbc/store/drivers/JDBCUtils.java      |   8 +
 .../jdbc/store/file/Db2SequentialFileDriver.java   |  40 +--
 .../artemis/jdbc/store/file/JDBCFileUtils.java     |  48 +--
 .../jdbc/store/file/JDBCSequentialFileFactory.java |  46 +--
 .../file/JDBCSequentialFileFactoryDriver.java      | 260 +++++++-------
 .../store/file/PostgresLargeObjectManager.java     |  39 ++-
 .../PostgresSequentialSequentialFileDriver.java    | 100 +++---
 .../jdbc/store/journal/JDBCJournalImpl.java        | 285 +++++++--------
 .../jdbc/store/sql/PropertySQLProvider.java        |  29 +-
 .../store/file/JDBCSequentialFileFactoryTest.java  |  14 +-
 .../store/file/PostgresLargeObjectManagerTest.java |  10 +-
 .../storage/DatabaseStorageConfiguration.java      |  55 ++-
 .../deployers/impl/FileConfigurationParser.java    |  10 +
 .../paging/impl/PagingStoreFactoryDatabase.java    |  51 +--
 .../impl/journal/JDBCJournalStorageManager.java    |  53 ++-
 .../core/server/impl/jdbc/JdbcLeaseLock.java       | 319 ++++++++---------
 .../core/server/impl/jdbc/JdbcNodeManager.java     |  91 +----
 .../server/impl/jdbc/JdbcSharedStateManager.java   | 272 ++++++--------
 .../resources/schema/artemis-configuration.xsd     |  46 ++-
 .../core/server/impl/jdbc/JdbcLeaseLockTest.java   |  15 +-
 .../impl/jdbc/JdbcSharedStateManagerTest.java      |  10 +-
 .../core/server/impl/jdbc/TestJDBCDriver.java      |  27 +-
 pom.xml                                            |   7 +
 .../jdbc/store/journal/JDBCJournalTest.java        |   7 +-
 29 files changed, 1137 insertions(+), 1259 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 12892ca..cb6bfec 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -455,6 +455,9 @@ public final class ActiveMQDefaultConfiguration {
    // Default JDBC Driver class name, derby by default just for demo purposes
    private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
 
+   // Default JDBC DataSource class name. DBCP2 BasicDataSource is used by default.
+   private static String DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME = "org.apache.commons.dbcp2.BasicDataSource";
+
    // Default message table name, used with Database storage type
    private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";
 
@@ -1392,6 +1395,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_JDBC_DRIVER_CLASS_NAME;
    }
 
+   public static String getDefaultDataSourceClassName() {
+      return DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME;
+   }
+
    public static String getDefaultLargeMessagesTableName() {
       return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
    }
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index bd5a3b0..fe43920 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -81,6 +81,13 @@
          <version>${project.version}</version>
       </dependency>
 
+      <!-- Default DataSource for database -->
+      <dependency>
+         <groupId>org.apache.commons</groupId>
+         <artifactId>commons-dbcp2</artifactId>
+         <version>2.1.1</version>
+      </dependency>
+
       <!-- Database driver support -->
       <dependency>
          <groupId>org.apache.derby</groupId>
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index 7168cf0..ab89fc4 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -16,23 +16,15 @@
  */
 package org.apache.activemq.artemis.jdbc.store.drivers;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
 import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
-import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.jboss.logging.Logger;
 
 /**
@@ -43,80 +35,27 @@ public abstract class AbstractJDBCDriver {
 
    private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
 
-   protected Connection connection;
-
    protected SQLProvider sqlProvider;
 
-   private String jdbcConnectionUrl;
-
-   private String jdbcDriverClass;
-
-   private DataSource dataSource;
-
-   private Executor networkTimeoutExecutor;
+   protected JDBCConnectionProvider connectionProvider;
 
-   private int networkTimeoutMillis;
+   public AbstractJDBCDriver() { }
 
-   private String user;
-
-   private String password;
-
-   public AbstractJDBCDriver() {
-      this.networkTimeoutExecutor = null;
-      this.networkTimeoutMillis = -1;
-   }
-
-   public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String user, String password, String jdbcDriverClass) {
-      this.jdbcConnectionUrl = jdbcConnectionUrl;
-      this.user = user;
-      this.password = password;
-      this.jdbcDriverClass = jdbcDriverClass;
-      this.sqlProvider = sqlProvider;
-      this.networkTimeoutExecutor = null;
-      this.networkTimeoutMillis = -1;
-   }
-
-   public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) {
-      this.dataSource = dataSource;
+   public AbstractJDBCDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
+      this.connectionProvider = connectionProvider;
       this.sqlProvider = provider;
-      this.networkTimeoutExecutor = null;
-      this.networkTimeoutMillis = -1;
    }
 
    public void start() throws SQLException {
-      connect();
-      synchronized (connection) {
-         createSchema();
-         prepareStatements();
-      }
-   }
-
-   public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
-      if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
-         this.connection = new LoggingConnection(connection, logger);
-      } else {
-         this.connection = connection;
-      }
-      this.sqlProvider = sqlProvider;
-      this.networkTimeoutExecutor = null;
-      this.networkTimeoutMillis = -1;
+      createSchema();
+      prepareStatements();
    }
 
    public void stop() throws SQLException {
-      synchronized (connection) {
-         if (sqlProvider.closeConnectionOnShutdown()) {
-            try {
-               connection.setAutoCommit(true);
-               connection.close();
-            } catch (SQLException e) {
-               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-               throw e;
-            }
-         }
-      }
+
    }
 
-   protected abstract void prepareStatements() throws SQLException;
+   protected abstract void prepareStatements();
 
    protected abstract void createSchema() throws SQLException;
 
@@ -124,217 +63,116 @@ public abstract class AbstractJDBCDriver {
       createTableIfNotExists(sqlProvider.getTableName(), schemaSqls);
    }
 
-   private void connect() throws SQLException {
-      if (connection == null) {
-         if (dataSource != null) {
-            try {
-               connection = dataSource.getConnection();
-
-               if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
-                  this.connection = new LoggingConnection(connection, logger);
-               }
-
-            } catch (SQLException e) {
-               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-               throw e;
-            }
-         } else {
-            try {
-               if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
-                  throw new IllegalStateException("jdbcDriverClass is null or empty!");
-               }
-               if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
-                  throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
-               }
-               final Driver dbDriver = getDriver(jdbcDriverClass);
-               Properties properties = new Properties();
-               if (user != null) {
-                  properties.setProperty("user", user);
-                  properties.setProperty("password", password);
-               }
-               connection = dbDriver.connect(jdbcConnectionUrl, properties);
-
-               if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
-                  this.connection = new LoggingConnection(connection, logger);
-               }
-
-               if (connection == null) {
-                  throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
-               }
-            } catch (SQLException e) {
-               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-               ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
-               throw e;
-            }
-         }
-         if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
-            logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
-         }
-         if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
-            try {
-               connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
-            } catch (SQLException e) {
-               logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
-               ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
-            } catch (Throwable throwable) {
-               //it included SecurityExceptions and UnsupportedOperationException
-               logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
-            }
-         }
-      }
-   }
-
    public void destroy() throws Exception {
       final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
-      try {
-         connection.setAutoCommit(false);
-         try (Statement statement = connection.createStatement()) {
-            statement.executeUpdate(dropTableSql);
-         }
-         connection.commit();
-      } catch (SQLException e) {
-         logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
-            connection.rollback();
-         } catch (SQLException rollbackEx) {
-            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql));
-            throw rollbackEx;
+            connection.setAutoCommit(false);
+            try (Statement statement = connection.createStatement()) {
+               statement.executeUpdate(dropTableSql);
+            }
+            connection.commit();
+         } catch (SQLException e) {
+            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
+            try {
+               connection.rollback();
+            } catch (SQLException rollbackEx) {
+               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql));
+               throw rollbackEx;
+            }
+            throw e;
          }
-         throw e;
       }
    }
 
    private void createTableIfNotExists(String tableName, String... sqls) throws SQLException {
       logger.tracef("Validating if table %s didn't exist before creating", tableName);
-      try {
-         connection.setAutoCommit(false);
-         final boolean tableExists;
-         try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
-            if (rs == null || !rs.next()) {
-               tableExists = false;
-               if (logger.isTraceEnabled()) {
-                  logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
-               }
-               if (rs != null) {
-                  final SQLWarning sqlWarning = rs.getWarnings();
-                  if (sqlWarning != null) {
-                     logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
+      try (Connection connection = connectionProvider.getConnection()) {
+         try {
+            connection.setAutoCommit(false);
+            final boolean tableExists;
+            try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
+               if (rs == null || !rs.next()) {
+                  tableExists = false;
+                  if (logger.isTraceEnabled()) {
+                     logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
                   }
-               }
-            } else {
-               tableExists = true;
-            }
-         }
-         if (tableExists) {
-            logger.tracef("Validating if the existing table %s is initialized or not", tableName);
-            try (Statement statement = connection.createStatement();
-                 ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
-               logger.tracef("Validation of the existing table %s initialization is started", tableName);
-               int rows;
-               if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
-                  logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
-                  if (logger.isDebugEnabled()) {
-                     final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
-                     if (rows < expectedRows) {
-                        logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
+                  if (rs != null) {
+                     final SQLWarning sqlWarning = rs.getWarnings();
+                     if (sqlWarning != null) {
+                        logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
                      }
                   }
-                  connection.commit();
-                  return;
                } else {
-                  sqls = Stream.of(sqls).filter(sql -> {
-                     final String upperCaseSql = sql.toUpperCase();
-                     return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
-                  }).toArray(String[]::new);
-                  if (sqls.length > 0) {
-                     logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
+                  tableExists = true;
+               }
+            }
+            if (tableExists) {
+               logger.tracef("Validating if the existing table %s is initialized or not", tableName);
+               try (Statement statement = connection.createStatement();
+                    ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
+                  logger.tracef("Validation of the existing table %s initialization is started", tableName);
+                  int rows;
+                  if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
+                     logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
+                     if (logger.isDebugEnabled()) {
+                        final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
+                        if (rows < expectedRows) {
+                           logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
+                        }
+                     }
+                     connection.commit();
+                     return;
                   } else {
-                     logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
+                     sqls = Stream.of(sqls).filter(sql -> {
+                        final String upperCaseSql = sql.toUpperCase();
+                        return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
+                     }).toArray(String[]::new);
+                     if (sqls.length > 0) {
+                        logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
+                     } else {
+                        logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
+                     }
                   }
+               } catch (SQLException e) {
+                  //that's not a real issue and does not deserve any user-level log:
+                  //some DBMS just return stale information about table existence
+                  //and can fail on later attempts to access them
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
+                  }
+                  try {
+                     connection.rollback();
+                  } catch (SQLException rollbackEx) {
+                     logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
+                  }
+                  connection.setAutoCommit(false);
+                  logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
                }
-            } catch (SQLException e) {
-               //that's not a real issue and do not deserve any user-level log:
-               //some DBMS just return stale information about table existence
-               //and can fail on later attempts to access them
-               if (logger.isTraceEnabled()) {
-                  logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
-               }
-               try {
-                  connection.rollback();
-               } catch (SQLException rollbackEx) {
-                  logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
-               }
-               connection.setAutoCommit(false);
-               logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
             }
-         }
-         if (sqls.length > 0) {
-            try (Statement statement = connection.createStatement()) {
-               for (String sql : sqls) {
-                  statement.executeUpdate(sql);
-                  final SQLWarning statementSqlWarning = statement.getWarnings();
-                  if (statementSqlWarning != null) {
-                     logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
+            if (sqls.length > 0) {
+               try (Statement statement = connection.createStatement()) {
+                  for (String sql : sqls) {
+                     statement.executeUpdate(sql);
+                     final SQLWarning statementSqlWarning = statement.getWarnings();
+                     if (statementSqlWarning != null) {
+                        logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
+                     }
                   }
                }
-            }
-
-            connection.commit();
-         }
-      } catch (SQLException e) {
-         final String sqlStatements = String.join("\n", sqls);
-         logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
-         try {
-            connection.rollback();
-         } catch (SQLException rollbackEx) {
-            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
-            throw rollbackEx;
-         }
-         throw e;
-      }
-   }
-
-   private static AtomicBoolean shutAdded = new AtomicBoolean(false);
-
-   private static class ShutdownDerby extends Thread {
-      @Override
-      public void run() {
-         try {
-            DriverManager.getConnection("jdbc:derby:;shutdown=true");
-         } catch (Exception e) {
-         }
-      }
-
-   }
 
-   private Driver getDriver(String className) {
-      try {
-         Driver driver = (Driver) Class.forName(className).newInstance();
-
-         // Shutdown the derby if using the derby embedded driver.
-         if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
-            if (shutAdded.compareAndSet(false, true)) {
-               Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
+               connection.commit();
             }
-         }
-         return driver;
-      } catch (ClassNotFoundException cnfe) {
-         throw new RuntimeException("Could not find class: " + className);
-      } catch (Exception e) {
-         throw new RuntimeException("Unable to instantiate driver class: ", e);
-      }
-   }
-
-   public Connection getConnection() {
-      return connection;
-   }
-
-   public final void setConnection(Connection connection) {
-      if (this.connection == null) {
-         if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
-            this.connection = new LoggingConnection(connection, logger);
-         } else {
-            this.connection = connection;
+         } catch (SQLException e) {
+            final String sqlStatements = String.join("\n", sqls);
+            logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
+            try {
+               connection.rollback();
+            } catch (SQLException rollbackEx) {
+               logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
+               throw rollbackEx;
+            }
+            throw e;
          }
       }
    }
@@ -343,37 +181,12 @@ public abstract class AbstractJDBCDriver {
       this.sqlProvider = sqlProvider;
    }
 
-   public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
-      this.jdbcConnectionUrl = jdbcConnectionUrl;
-   }
-
-   public String getUser() {
-      return user;
-   }
-
-   public void setUser(String user) {
-      this.user = user;
-   }
-
-   public String getPassword() {
-      return password;
-   }
-
-   public void setPassword(String password) {
-      this.password = password;
-   }
-
-   public void setJdbcDriverClass(String jdbcDriverClass) {
-      this.jdbcDriverClass = jdbcDriverClass;
-   }
-
-   public void setDataSource(DataSource dataSource) {
-      this.dataSource = dataSource;
+   public void setJdbcConnectionProvider(JDBCConnectionProvider connectionProvider) {
+      this.connectionProvider = connectionProvider;
    }
 
-   public void setNetworkTimeout(Executor executor, int milliseconds) {
-      this.networkTimeoutExecutor = executor;
-      this.networkTimeoutMillis = milliseconds;
+   public JDBCConnectionProvider getJdbcConnectionProvider() {
+      return this.connectionProvider;
    }
 
 }
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java
new file mode 100644
index 0000000..6d2c9f2
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers;
+
+import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class JDBCConnectionProvider {
+
+   private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class);
+   private DataSource dataSource;
+   private Executor networkTimeoutExecutor;
+   private int networkTimeoutMillis;
+
+   public JDBCConnectionProvider(DataSource dataSource) {
+      this.dataSource = dataSource;
+      this.networkTimeoutExecutor = null;
+      this.networkTimeoutMillis = -1;
+      addDerbyShutdownHook();
+   }
+
+   public synchronized Connection getConnection() throws SQLException {
+      Connection connection;
+      try {
+         connection = dataSource.getConnection();
+         if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
+            connection = new LoggingConnection(connection, logger);
+         }
+      } catch (SQLException e) {
+         logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+         throw e;
+      }
+
+      if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
+         logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
+      }
+
+      if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
+         try {
+            connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
+         } catch (SQLException e) {
+            logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+            ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
+         } catch (Throwable throwable) {
+            //this also covers SecurityException and UnsupportedOperationException
+            logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
+         }
+      }
+      return connection;
+   }
+
+   private static AtomicBoolean shutAdded = new AtomicBoolean(false);
+
+   private static class ShutdownDerby extends Thread {
+      @Override
+      public void run() {
+         try {
+            DriverManager.getConnection("jdbc:derby:;shutdown=true");
+         } catch (Exception e) { }
+      }
+
+   }
+
+   public void addDerbyShutdownHook() {
+      // Shutdown the derby if using the derby embedded driver.
+      try (Connection connection = getConnection()) {
+         if (connection.getMetaData().getDriverName().equals("org.apache.derby.jdbc.EmbeddedDriver")) {
+            if (shutAdded.compareAndSet(false, true)) {
+               Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
+            }
+         }
+      } catch (SQLException e) {
+         logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
+      }
+   }
+
+   public void setNetworkTimeout(Executor executor, int milliseconds) {
+      this.networkTimeoutExecutor = executor;
+      this.networkTimeoutMillis = milliseconds;
+   }
+}
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java
new file mode 100644
index 0000000..5e0c3d8
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers;
+
+import org.apache.commons.beanutils.PropertyUtils;
+import org.jboss.logging.Logger;
+
+import javax.sql.DataSource;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Reflection-based factory for the {@link DataSource} used by the JDBC store. */
+public class JDBCDataSourceUtils {
+
+   private static final Logger logger = Logger.getLogger(JDBCDataSourceUtils.class);
+
+   /** Creates {@code dataSourceClassName} through its no-arg constructor and applies {@code dataSourceProperties} as bean properties; instantiation or property failures are rethrown as RuntimeException. */
+   public static DataSource getDataSource(String dataSourceClassName, Map<String, Object> dataSourceProperties) {
+      logger.info("Initialising JDBC data source: " + dataSourceClassName + " " + dataSourceProperties.entrySet().stream() // password values are masked so credentials never reach the log
+              .map(entry -> entry.getKey() + "=" + (String.valueOf(entry.getKey()).toLowerCase(java.util.Locale.ROOT).contains("password") ? "****" : entry.getValue()))
+              .collect(Collectors.joining(", ", "{", "}")));
+      try {
+         DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).getDeclaredConstructor().newInstance(); // Class#newInstance is deprecated
+         for (Map.Entry<String, Object> entry : dataSourceProperties.entrySet()) {
+            PropertyUtils.setProperty(dataSource, entry.getKey(), entry.getValue());
+         }
+         return dataSource;
+      } catch (ClassNotFoundException cnfe) {
+         throw new RuntimeException("Could not find class: " + dataSourceClassName, cnfe); // keep the cause for diagnosis
+      } catch (Exception e) {
+         throw new RuntimeException("Unable to instantiate DataSource", e);
+      }
+   }
+}
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java
index ce6dc6f..37c1e97 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.jdbc.store.drivers;
 
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@@ -39,6 +40,13 @@ public class JDBCUtils {
       return factory.create(tableName, storeType);
    }
 
+   /** Determines the SQL dialect from the data source properties and creates the provider for {@code tableName} and {@code storeType}. */
+   public static SQLProvider getSQLProvider(Map<String, Object> dataSourceProperties, String tableName, SQLProvider.DatabaseStoreType storeType) {
+      final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.investigateDialect(dataSourceProperties);
+      logger.tracef("getSQLProvider Returning SQL provider for dialect %s, tableName::%s", sqlDialect, tableName);
+      return new PropertySQLProvider.Factory(sqlDialect).create(tableName, storeType);
+   }
+
    /**
     * Append to {@code errorMessage} a detailed description of the provided {@link SQLException}.<br>
     * The information appended are:
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java
index c0066ab..12da3d2 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java
@@ -17,41 +17,38 @@
 
 package org.apache.activemq.artemis.jdbc.store.file;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
 @SuppressWarnings("SynchronizeOnNonFinalField")
 public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriver {
 
-   private PreparedStatement replaceLargeObject;
+   private String replaceLargeObject;
 
    public Db2SequentialFileDriver() {
       super();
    }
 
-   public Db2SequentialFileDriver(DataSource dataSource, SQLProvider provider) {
-      super(dataSource, provider);
-   }
-
-   public Db2SequentialFileDriver(Connection connection, SQLProvider provider) {
-      super(connection, provider);
+   public Db2SequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
+      super(connectionProvider, provider);
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[]{"ID"});
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
-      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
-      this.replaceLargeObject = connection.prepareStatement(sqlProvider.getReplaceLargeObjectSQL());
-      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
-      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   protected void prepareStatements() { // only stores SQL text from the provider; no connection is used here
+      this.deleteFile = sqlProvider.getDeleteFileSQL();
+      this.createFile = sqlProvider.getInsertFileSQL();
+      this.createFileColumnNames = new String[]{"ID"}; // generated-key column names for the insert statement
+      this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
+      this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
+      this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
+      this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
+      this.replaceLargeObject = sqlProvider.getReplaceLargeObjectSQL(); // used by writeToFile when append is false
+      this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL(); // used by writeToFile when append is true
+      this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
    }
 
    @Override
@@ -59,9 +56,8 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
       if (data == null || data.length == 0) {
          return 0;
       }
-      final PreparedStatement largeObjectStatement = append ? appendToLargeObject : replaceLargeObject;
-      synchronized (connection) {
-         try {
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement largeObjectStatement = connection.prepareStatement(append ? appendToLargeObject : replaceLargeObject)) {
             connection.setAutoCommit(false);
             int bytesWritten;
             largeObjectStatement.setBytes(1, data);
@@ -81,4 +77,4 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
          }
       }
    }
-}
+}
\ No newline at end of file
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
index 58ac5b9..1d1ea62 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.activemq.artemis.jdbc.store.file;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
@@ -29,54 +29,18 @@ import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Fac
 
 class JDBCFileUtils {
 
-   static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
-                                                          String jdbcConnectionUrl,
-                                                          String user,
-                                                          String password,
-                                                          SQLProvider provider) throws SQLException {
-      final JDBCSequentialFileFactoryDriver dbDriver;
-      final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.identifyDialect(driverClass);
-      if (POSTGRESQL.equals(sqlDialect)) {
-         dbDriver = new PostgresSequentialSequentialFileDriver();
-      } else if (DB2.equals(sqlDialect)) {
-         dbDriver = new Db2SequentialFileDriver();
-      } else {
-         dbDriver = new JDBCSequentialFileFactoryDriver();
-      }
-      dbDriver.setSqlProvider(provider);
-      dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
-      dbDriver.setJdbcDriverClass(driverClass);
-      dbDriver.setUser(user);
-      dbDriver.setPassword(password);
-      return dbDriver;
-   }
-
-   static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
+   static JDBCSequentialFileFactoryDriver getDBFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) throws SQLException { // picks the file driver matching the detected SQL dialect
       final JDBCSequentialFileFactoryDriver dbDriver;
       final PropertySQLProvider.Factory.SQLDialect sqlDialect;
-      try (Connection connection = dataSource.getConnection()) {
+      try (Connection connection = connectionProvider.getConnection()) { // connection is used only for dialect detection and closed right after
          sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
       }
       if (POSTGRESQL.equals(sqlDialect)) {
-         dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider);
-      } else if (DB2.equals(sqlDialect)) {
-         dbDriver = new Db2SequentialFileDriver(dataSource, provider);
-      } else {
-         dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
-      }
-      return dbDriver;
-   }
-
-   static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
-      JDBCSequentialFileFactoryDriver dbDriver;
-      final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
-      if (POSTGRESQL.equals(sqlDialect)) {
-         dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider);
-         dbDriver.setConnection(connection);
+         dbDriver = new PostgresSequentialSequentialFileDriver(connectionProvider, provider);
       } else if (DB2.equals(sqlDialect)) {
-         dbDriver = new Db2SequentialFileDriver(connection, provider);
+         dbDriver = new Db2SequentialFileDriver(connectionProvider, provider);
       } else {
-         dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
+         dbDriver = new JDBCSequentialFileFactoryDriver(connectionProvider, provider); // generic driver for every other dialect
       }
       return dbDriver;
    }
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 4cd19fd..c9169c5 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -16,10 +16,8 @@
  */
 package org.apache.activemq.artemis.jdbc.store.file;
 
-import javax.sql.DataSource;
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +30,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@@ -53,7 +52,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    private final IOCriticalErrorListener criticalErrorListener;
 
-   public JDBCSequentialFileFactory(final DataSource dataSource,
+   public JDBCSequentialFileFactory(final JDBCConnectionProvider connectionProvider,
                                     final SQLProvider sqlProvider,
                                     Executor executor,
                                     IOCriticalErrorListener criticalErrorListener) throws Exception {
@@ -62,38 +61,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
       this.criticalErrorListener = criticalErrorListener;
 
       try {
-         this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
-      } catch (SQLException e) {
-         criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
-      }
-   }
-
-   public JDBCSequentialFileFactory(final String connectionUrl,
-                                    String userName,
-                                    String password,
-                                    final String className,
-                                    final SQLProvider sqlProvider,
-                                    Executor executor,
-                                    IOCriticalErrorListener criticalErrorListener) throws Exception {
-      this.executor = executor;
-      this.criticalErrorListener = criticalErrorListener;
-      try {
-         this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, userName, password, sqlProvider);
-      } catch (SQLException e) {
-         criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
-      }
-
-   }
-
-   public JDBCSequentialFileFactory(final Connection connection,
-                                    final SQLProvider sqlProvider,
-                                    final Executor executor,
-                                    final IOCriticalErrorListener criticalErrorListener) throws Exception {
-      this.executor = executor;
-      this.criticalErrorListener = criticalErrorListener;
-
-      try {
-         this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
+         this.dbDriver = JDBCFileUtils.getDBFileDriver(connectionProvider, sqlProvider);
       } catch (SQLException e) {
          criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
       }
@@ -103,14 +71,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
       return dbDriver;
    }
 
-   /**
-    * @see Connection#setNetworkTimeout(Executor, int)
-    **/
-   public JDBCSequentialFileFactory setNetworkTimeout(Executor executor, int milliseconds) {
-      this.dbDriver.setNetworkTimeout(executor, milliseconds);
-      return this;
-   }
-
    @Override
    public SequentialFileFactory setDatasync(boolean enabled) {
       return this;
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index 7d9aded..ab6906c 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.jdbc.store.file;
 
-import javax.sql.DataSource;
 import java.nio.ByteBuffer;
 import java.sql.Blob;
 import java.sql.Connection;
@@ -28,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.jboss.logging.Logger;
 
@@ -36,32 +36,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
 
    private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
 
-   protected PreparedStatement deleteFile;
-
-   protected PreparedStatement createFile;
-
-   protected PreparedStatement selectFileByFileName;
-
-   protected PreparedStatement copyFileRecord;
-
-   protected PreparedStatement renameFile;
-
-   protected PreparedStatement readLargeObject;
-
-   protected PreparedStatement appendToLargeObject;
-
-   protected PreparedStatement selectFileNamesByExtension;
+   protected String deleteFile;
+   protected String createFile;
+   protected String[] createFileColumnNames;
+   protected int createFileAutogeneratedKeys;
+   protected String selectFileByFileName;
+   protected String copyFileRecord;
+   protected String renameFile;
+   protected String readLargeObject;
+   protected String appendToLargeObject;
+   protected Integer appendToLargeObjectResultSetType;
+   protected Integer appendToLargeObjectResultSetConcurrency;
+   protected String selectFileNamesByExtension;
 
    JDBCSequentialFileFactoryDriver() {
       super();
    }
 
-   JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) {
-      super(dataSource, provider);
-   }
-
-   JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
-      super(connection, sqlProvider);
+   JDBCSequentialFileFactoryDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
+      super(connectionProvider, provider);
    }
 
    @Override
@@ -70,22 +63,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[] {"ID"});
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
-      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
-      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
-      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   protected void prepareStatements() { // only stores SQL text; each operation prepares its statement on the connection it obtains
+      this.deleteFile = sqlProvider.getDeleteFileSQL();
+      this.createFile = sqlProvider.getInsertFileSQL();
+      this.createFileColumnNames = new String[] {"ID"}; // passed to prepareStatement in createFile so the generated id can be read back
+      this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
+      this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
+      this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
+      this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
+      this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
+      this.appendToLargeObjectResultSetType = ResultSet.TYPE_FORWARD_ONLY; // with the concurrency below, selects the three-argument prepareStatement in writeToFile
+      this.appendToLargeObjectResultSetConcurrency = ResultSet.CONCUR_UPDATABLE; // writeToFile updates the blob through the result set
+      this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
    }
 
    public List<String> listFiles(String extension) throws Exception {
-      synchronized (connection) {
-         List<String> fileNames = new ArrayList<>();
-         try {
-            connection.setAutoCommit(false);
+      List<String> fileNames = new ArrayList<>();
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setAutoCommit(false);
+         try (PreparedStatement selectFileNamesByExtension = connection.prepareStatement(this.selectFileNamesByExtension)) {
             selectFileNamesByExtension.setString(1, extension);
             try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
                while (rs.next()) {
@@ -97,8 +93,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
             connection.rollback();
             throw e;
          }
-         return fileNames;
       }
+      return fileNames;
    }
 
    /**
@@ -108,14 +104,12 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void openFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
-         final long fileId = fileExists(file);
-         if (fileId < 0) {
-            createFile(file);
-         } else {
-            file.setId(fileId);
-            loadFile(file);
-         }
+      final long fileId = fileExists(file); // id of the matching row, negative when no row has this file name
+      if (fileId < 0) {
+         createFile(file); // inserts the row and assigns the generated id to the file
+      } else {
+         file.setId(fileId);
+         loadFile(file); // sets the write position from the stored blob length
+      } // NOTE(review): the helpers called above each obtain their own connection, so this sequence is not one transaction
       }
    }
 
@@ -131,18 +125,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public long fileExists(JDBCSequentialFile file) throws SQLException {
-      try {
-         synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement selectFileByFileName = connection.prepareStatement(this.selectFileByFileName)) {
             connection.setAutoCommit(false);
             selectFileByFileName.setString(1, file.getFileName());
             try (ResultSet rs = selectFileByFileName.executeQuery()) {
                final long id = rs.next() ? rs.getLong(1) : -1;
                connection.commit();
                return id;
-            } catch (Exception e) {
-               connection.rollback();
-               throw e;
             }
+         } catch (Exception e) {
+            connection.rollback();
+            throw e;
          }
       } catch (NullPointerException npe) {
          npe.printStackTrace();
@@ -157,20 +151,22 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void loadFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
-         connection.setAutoCommit(false);
-         readLargeObject.setLong(1, file.getId());
-
-         try (ResultSet rs = readLargeObject.executeQuery()) {
-            if (rs.next()) {
-               Blob blob = rs.getBlob(1);
-               if (blob != null) {
-                  file.setWritePosition(blob.length());
-               } else {
-                  logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
+            connection.setAutoCommit(false);
+            readLargeObject.setLong(1, file.getId());
+
+            try (ResultSet rs = readLargeObject.executeQuery()) {
+               if (rs.next()) {
+                  Blob blob = rs.getBlob(1);
+                  if (blob != null) {
+                     file.setWritePosition(blob.length());
+                  } else {
+                     logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
+                  }
                }
+               connection.commit();
             }
-            connection.commit();
          } catch (SQLException e) {
             connection.rollback();
             throw e;
@@ -185,18 +181,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void createFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
             connection.setAutoCommit(false);
-            createFile.setString(1, file.getFileName());
-            createFile.setString(2, file.getExtension());
-            createFile.setBytes(3, new byte[0]);
-            createFile.executeUpdate();
-            try (ResultSet keys = createFile.getGeneratedKeys()) {
-               keys.next();
-               file.setId(keys.getLong(1));
+            try (PreparedStatement createFile =
+                         createFileColumnNames != null ?
+                                 connection.prepareStatement(this.createFile, this.createFileColumnNames) :
+                                 connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
+               createFile.setString(1, file.getFileName());
+               createFile.setString(2, file.getExtension());
+               createFile.setBytes(3, new byte[0]);
+               createFile.executeUpdate();
+               try (ResultSet keys = createFile.getGeneratedKeys()) {
+                  keys.next();
+                  file.setId(keys.getLong(1));
+               }
+               connection.commit();
             }
-            connection.commit();
          } catch (SQLException e) {
             connection.rollback();
             throw e;
@@ -212,9 +213,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
-      synchronized (connection) {
-         try {
-            connection.setAutoCommit(false);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setAutoCommit(false);
+         try (PreparedStatement renameFile = connection.prepareStatement(this.renameFile)) {
             renameFile.setString(1, newFileName);
             renameFile.setLong(2, file.getId());
             renameFile.executeUpdate();
@@ -233,8 +234,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void deleteFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
-         try {
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement deleteFile = connection.prepareStatement(this.deleteFile)) {
             connection.setAutoCommit(false);
             deleteFile.setLong(1, file.getId());
             deleteFile.executeUpdate();
@@ -259,31 +260,36 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) { // connection obtained per call; try-with-resources closes it
          connection.setAutoCommit(false);
-         appendToLargeObject.setLong(1, file.getId());
-
-         int bytesWritten = 0;
-         try (ResultSet rs = appendToLargeObject.executeQuery()) {
-            if (rs.next()) {
-               Blob blob = rs.getBlob(1);
-               if (blob == null) {
-                  blob = connection.createBlob();
-               }
-               if (append) {
-                  bytesWritten = blob.setBytes(blob.length() + 1, data);
-               } else {
-                  blob.truncate(0);
-                  bytesWritten = blob.setBytes(1, data);
+         try (PreparedStatement appendToLargeObject =
+                      this.appendToLargeObjectResultSetType != null && this.appendToLargeObjectResultSetConcurrency != null ?
+                              connection.prepareStatement(this.appendToLargeObject, this.appendToLargeObjectResultSetType, this.appendToLargeObjectResultSetConcurrency) :
+                              connection.prepareStatement(this.appendToLargeObject)) { // plain statement when no result-set type/concurrency is configured
+            appendToLargeObject.setLong(1, file.getId());
+
+            int bytesWritten = 0;
+            try (ResultSet rs = appendToLargeObject.executeQuery()) {
+               if (rs.next()) {
+                  Blob blob = rs.getBlob(1);
+                  if (blob == null) {
+                     blob = connection.createBlob();
+                  }
+                  if (append) {
+                     bytesWritten = blob.setBytes(blob.length() + 1, data); // Blob positions are 1-based: length + 1 is just past the last byte
+                  } else {
+                     blob.truncate(0); // replace mode: drop existing content, then write from position 1
+                     bytesWritten = blob.setBytes(1, data);
+                  }
+                  rs.updateBlob(1, blob);
+                  rs.updateRow(); // writes the modified blob back through the result set
                }
-               rs.updateBlob(1, blob);
-               rs.updateRow();
+               connection.commit(); // reached even when no row matched; bytesWritten is then 0
+               return bytesWritten;
+            } catch (SQLException e) {
+               connection.rollback();
+               throw e;
            }
-            connection.commit();
-            return bytesWritten;
-         } catch (SQLException e) {
-            connection.rollback();
-            throw e;
          }
       }
    }
@@ -297,35 +303,37 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          connection.setAutoCommit(false);
-         readLargeObject.setLong(1, file.getId());
-         int readLength = 0;
-         try (ResultSet rs = readLargeObject.executeQuery()) {
-            if (rs.next()) {
-               final Blob blob = rs.getBlob(1);
-               if (blob != null) {
-                  final long blobLength = blob.length();
-                  final int bytesRemaining = bytes.remaining();
-                  final long filePosition = file.position();
-                  readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition);
-                  if (logger.isDebugEnabled()) {
-                     logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d",
-                                   readLength, blobLength, bytesRemaining, filePosition);
-                  }
-                  if (readLength < 0) {
-                     readLength = -1;
-                  } else if (readLength > 0) {
-                     byte[] data = blob.getBytes(file.position() + 1, readLength);
-                     bytes.put(data);
+         try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
+            readLargeObject.setLong(1, file.getId());
+            int readLength = 0;
+            try (ResultSet rs = readLargeObject.executeQuery()) {
+               if (rs.next()) {
+                  final Blob blob = rs.getBlob(1);
+                  if (blob != null) {
+                     final long blobLength = blob.length();
+                     final int bytesRemaining = bytes.remaining();
+                     final long filePosition = file.position();
+                     readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition);
+                     if (logger.isDebugEnabled()) {
+                        logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d",
+                                readLength, blobLength, bytesRemaining, filePosition);
+                     }
+                     if (readLength < 0) {
+                        readLength = -1;
+                     } else if (readLength > 0) {
+                        byte[] data = blob.getBytes(file.position() + 1, readLength);
+                        bytes.put(data);
+                     }
                   }
                }
+               connection.commit();
+               return readLength;
+            } catch (SQLException e) {
+               connection.rollback();
+               throw e;
             }
-            connection.commit();
-            return readLength;
-         } catch (SQLException e) {
-            connection.rollback();
-            throw e;
          }
       }
    }
@@ -338,9 +346,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     * @throws SQLException
     */
    public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
-      synchronized (connection) {
-         try {
-            connection.setAutoCommit(false);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setAutoCommit(false);
+         try (PreparedStatement copyFileRecord = connection.prepareStatement(this.copyFileRecord)) {
             copyFileRecord.setLong(1, fileFrom.getId());
             copyFileRecord.setLong(2, fileTo.getId());
             copyFileRecord.executeUpdate();
@@ -357,7 +365,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
     */
    @Override
    public void destroy() throws SQLException {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
             connection.setAutoCommit(false);
             try (Statement statement = connection.createStatement()) {
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
index 07212c4..65d2d1e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
@@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
+
 import org.postgresql.PGConnection;
 import org.postgresql.largeobject.LargeObject;
 
@@ -43,11 +44,10 @@ public class PostgresLargeObjectManager {
     */
    public static final int READWRITE = READ | WRITE;
 
-   private final Connection realConnection;
    private boolean shouldUseReflection;
 
-   public PostgresLargeObjectManager(Connection connection) throws SQLException {
-      this.realConnection = unwrap(connection);
+
+   public PostgresLargeObjectManager() {
       try {
          this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection");
          shouldUseReflection = false;
@@ -56,9 +56,9 @@ public class PostgresLargeObjectManager {
       }
    }
 
-   public final Long createLO() throws SQLException {
+   public final Long createLO(Connection connection) throws SQLException {
       if (shouldUseReflection) {
-         Object largeObjectManager = getLargeObjectManager();
+         Object largeObjectManager = getLargeObjectManager(connection);
          try {
             Method method = largeObjectManager.getClass().getMethod("createLO");
             return (Long) method.invoke(largeObjectManager);
@@ -66,13 +66,13 @@ public class PostgresLargeObjectManager {
             throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
          }
       } else {
-         return ((PGConnection) realConnection).getLargeObjectAPI().createLO();
+         return ((PGConnection) unwrap(connection)).getLargeObjectAPI().createLO();
       }
    }
 
-   public Object open(long oid, int mode) throws SQLException {
+   public Object open(Connection connection, long oid, int mode) throws SQLException {
       if (shouldUseReflection) {
-         Object largeObjectManager = getLargeObjectManager();
+         Object largeObjectManager = getLargeObjectManager(connection);
          try {
             Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class);
             return method.invoke(largeObjectManager, oid, mode);
@@ -80,7 +80,7 @@ public class PostgresLargeObjectManager {
             throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
          }
       } else {
-         return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode);
+         return ((PGConnection) unwrap(connection)).getLargeObjectAPI().open(oid, mode);
       }
    }
 
@@ -162,22 +162,22 @@ public class PostgresLargeObjectManager {
       }
    }
 
-   private Object getLargeObjectManager() throws SQLException {
+   private Object getLargeObjectManager(Connection connection) throws SQLException {
       if (shouldUseReflection) {
          try {
-            Method method = realConnection.getClass().getMethod("getLargeObjectAPI");
-            return method.invoke(realConnection);
+            Connection conn = unwrap(connection);
+            Method method = conn.getClass().getMethod("getLargeObjectAPI");
+            return method.invoke(conn);
          } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
             throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
          }
       } else {
-         return ((PGConnection) realConnection).getLargeObjectAPI();
+         return ((PGConnection) unwrap(connection)).getLargeObjectAPI();
       }
    }
 
    public final Connection unwrap(Connection connection) throws SQLException {
-      Connection conn = connection.unwrap(Connection.class);
-      return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn)));
+      return unwrapIronJacamar(unwrapDbcp(unwrapDbcp2(unwrapSpring(connection.unwrap(Connection.class)))));
    }
 
    private Connection unwrapIronJacamar(Connection conn) {
@@ -198,6 +198,15 @@ public class PostgresLargeObjectManager {
       }
    }
 
+   private Connection unwrapDbcp2(Connection conn) {
+      try {
+         Method method = conn.getClass().getMethod("getInnermostDelegateInternal");
+         return (Connection) method.invoke(conn);
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
+         return conn;
+      }
+   }
+
    private Connection unwrapSpring(Connection conn) {
       try {
          Method method = conn.getClass().getMethod("getTargetConnection");
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
index 7d439da..7bfadcd 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
@@ -18,14 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.file;
 
 import java.nio.ByteBuffer;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
-import javax.sql.DataSource;
-
 @SuppressWarnings("SynchronizeOnNonFinalField")
 public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
 
@@ -36,37 +36,32 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
       super();
    }
 
-   public PostgresSequentialSequentialFileDriver(DataSource dataSource, SQLProvider provider) {
-      super();
-      this.setDataSource(dataSource);
-      this.setSqlProvider(provider);
-   }
-
-   public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider provider) {
+   public PostgresSequentialSequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
       super();
-      this.setConnection(connection);
+      this.setJdbcConnectionProvider(connectionProvider);
       this.setSqlProvider(provider);
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
-      this.largeObjectManager = new PostgresLargeObjectManager(connection);
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
-      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
-      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
-      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   protected void prepareStatements() {
+      this.largeObjectManager = new PostgresLargeObjectManager();
+      this.deleteFile = sqlProvider.getDeleteFileSQL();
+      this.createFile = sqlProvider.getInsertFileSQL();
+      this.createFileAutogeneratedKeys = Statement.RETURN_GENERATED_KEYS;
+      this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
+      this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
+      this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
+      this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
+      this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
+      this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
    }
 
    @Override
    public void createFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
-         try {
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement createFile = connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
             connection.setAutoCommit(false);
-            Long oid = largeObjectManager.createLO();
+            Long oid = largeObjectManager.createLO(connection);
 
             createFile.setString(1, file.getFileName());
             createFile.setString(2, file.getExtension());
@@ -87,31 +82,31 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
 
    @Override
    public void loadFile(JDBCSequentialFile file) throws SQLException {
-      synchronized (connection) {
-         connection.setAutoCommit(false);
-         readLargeObject.setLong(1, file.getId());
+      try (Connection connection = connectionProvider.getConnection()) {
+         try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
+            connection.setAutoCommit(false);
+            readLargeObject.setLong(1, file.getId());
 
-         try (ResultSet rs = readLargeObject.executeQuery()) {
-            if (rs.next()) {
-               file.setWritePosition(getPostGresLargeObjectSize(file));
+            try (ResultSet rs = readLargeObject.executeQuery()) {
+               if (rs.next()) {
+                  file.setWritePosition(getPostGresLargeObjectSize(file));
+               }
+               connection.commit();
+            } catch (SQLException e) {
+               connection.rollback();
+               throw e;
             }
-            connection.commit();
-         } catch (SQLException e) {
-            connection.rollback();
-            throw e;
          }
       }
    }
 
    @Override
    public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
-      synchronized (connection) {
-         Object largeObject = null;
-
+      try (Connection connection = connectionProvider.getConnection()) {
          Long oid = getOID(file);
          try {
             connection.setAutoCommit(false);
-            largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE);
+            Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.WRITE);
             if (append) {
                largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject));
             } else {
@@ -130,12 +125,11 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
 
    @Override
    public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
-      Object largeObject = null;
       long oid = getOID(file);
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
             connection.setAutoCommit(false);
-            largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
+            Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
             int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position());
 
             if (readLength > 0) {
@@ -160,17 +154,19 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
    private Long getOID(JDBCSequentialFile file) throws SQLException {
       Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
       if (oid == null) {
-         synchronized (connection) {
-            connection.setAutoCommit(false);
-            readLargeObject.setLong(1, file.getId());
-            try (ResultSet rs = readLargeObject.executeQuery()) {
-               if (rs.next()) {
-                  file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+         try (Connection connection = connectionProvider.getConnection()) {
+            try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
+               connection.setAutoCommit(false);
+               readLargeObject.setLong(1, file.getId());
+               try (ResultSet rs = readLargeObject.executeQuery()) {
+                  if (rs.next()) {
+                     file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+                  }
+                  connection.commit();
+               } catch (SQLException e) {
+                  connection.rollback();
+                  throw e;
                }
-               connection.commit();
-            } catch (SQLException e) {
-               connection.rollback();
-               throw e;
             }
          }
       }
@@ -184,10 +180,10 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
       int size = 0;
       Long oid = getOID(file);
       if (oid != null) {
-         synchronized (connection) {
+         try (Connection connection = connectionProvider.getConnection()) {
             try {
                connection.setAutoCommit(false);
-               Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
+               Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
                size = largeObjectManager.size(largeObject);
                largeObjectManager.close(largeObject);
                connection.commit();
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index f87d7a7..a15377e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.activemq.artemis.jdbc.store.journal;
 
-import javax.sql.DataSource;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
 import org.jboss.logging.Logger;
@@ -67,15 +68,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    private final List<JDBCJournalRecord> records;
 
-   private PreparedStatement insertJournalRecords;
+   private String insertJournalRecords;
 
-   private PreparedStatement selectJournalRecords;
+   private String selectJournalRecords;
 
-   private PreparedStatement countJournalRecords;
+   private String countJournalRecords;
 
-   private PreparedStatement deleteJournalRecords;
+   private String deleteJournalRecords;
 
-   private PreparedStatement deleteJournalTxRecords;
+   private String deleteJournalTxRecords;
 
    private boolean started;
 
@@ -95,30 +96,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    private final IOCriticalErrorListener criticalIOErrorListener;
 
-   public JDBCJournalImpl(DataSource dataSource,
+   public JDBCJournalImpl(JDBCConnectionProvider connectionProvider,
                           SQLProvider provider,
                           ScheduledExecutorService scheduledExecutorService,
                           Executor completeExecutor,
                           IOCriticalErrorListener criticalIOErrorListener,
                           long syncDelay) {
-      super(dataSource, provider);
-      records = new ArrayList<>();
-      this.scheduledExecutorService = scheduledExecutorService;
-      this.completeExecutor = completeExecutor;
-      this.criticalIOErrorListener = criticalIOErrorListener;
-      this.syncDelay = syncDelay;
-   }
-
-   public JDBCJournalImpl(String jdbcUrl,
-                          String user,
-                          String password,
-                          String jdbcDriverClass,
-                          SQLProvider sqlProvider,
-                          ScheduledExecutorService scheduledExecutorService,
-                          Executor completeExecutor,
-                          IOCriticalErrorListener criticalIOErrorListener,
-                          long syncDelay) {
-      super(sqlProvider, jdbcUrl, user, password, jdbcDriverClass);
+      super(connectionProvider, provider);
       records = new ArrayList<>();
       this.scheduledExecutorService = scheduledExecutorService;
       this.completeExecutor = completeExecutor;
@@ -153,13 +137,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
+   protected void prepareStatements() {
       logger.tracef("preparing statements");
-      insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
-      selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
-      countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL());
-      deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL());
-      deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL());
+      insertJournalRecords = sqlProvider.getInsertJournalRecordsSQL();
+      selectJournalRecords = sqlProvider.getSelectJournalRecordsSQL();
+      countJournalRecords = sqlProvider.getCountJournalRecordsSQL();
+      deleteJournalRecords = sqlProvider.getDeleteJournalRecordsSQL();
+      deleteJournalTxRecords = sqlProvider.getDeleteJournalTxRecordsSQL();
    }
 
    @Override
@@ -205,65 +189,70 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
       TransactionHolder holder;
 
-      try {
-         connection.setAutoCommit(false);
+      try (Connection connection = connectionProvider.getConnection()) {
 
-         for (JDBCJournalRecord record : recordRef) {
+         try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords);
+              PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords);
+              PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) {
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("sync::preparing JDBC statement for " + record);
-            }
+            connection.setAutoCommit(false);
+
+            for (JDBCJournalRecord record : recordRef) {
 
+               if (logger.isTraceEnabled()) {
+                  logger.trace("sync::preparing JDBC statement for " + record);
+               }
 
 
-            switch (record.getRecordType()) {
-               case JDBCJournalRecord.DELETE_RECORD:
-                  // Standard SQL Delete Record, Non transactional delete
-                  deletedRecords.add(record.getId());
-                  record.writeDeleteRecord(deleteJournalRecords);
-                  break;
-               case JDBCJournalRecord.ROLLBACK_RECORD:
-                  // Roll back we remove all records associated with this TX ID.  This query is always performed last.
-                  deleteJournalTxRecords.setLong(1, record.getTxId());
-                  deleteJournalTxRecords.addBatch();
-                  break;
-               case JDBCJournalRecord.COMMIT_RECORD:
-                  // We perform all the deletes and add the commit record in the same Database TX
-                  holder = transactions.get(record.getTxId());
-                  for (RecordInfo info : holder.recordsToDelete) {
+               switch (record.getRecordType()) {
+                  case JDBCJournalRecord.DELETE_RECORD:
+                     // Standard SQL Delete Record, Non transactional delete
                      deletedRecords.add(record.getId());
-                     deletedRecords.add(info.id);
-                     deleteJournalRecords.setLong(1, info.id);
-                     deleteJournalRecords.addBatch();
-                  }
-                  record.writeRecord(insertJournalRecords);
-                  committedTransactions.add(record.getTxId());
-                  break;
-               default:
-                  // Default we add a new record to the DB
-                  record.writeRecord(insertJournalRecords);
-                  break;
+                     record.writeDeleteRecord(deleteJournalRecords);
+                     break;
+                  case JDBCJournalRecord.ROLLBACK_RECORD:
+                     // Roll back we remove all records associated with this TX ID.  This query is always performed last.
+                     deleteJournalTxRecords.setLong(1, record.getTxId());
+                     deleteJournalTxRecords.addBatch();
+                     break;
+                  case JDBCJournalRecord.COMMIT_RECORD:
+                     // We perform all the deletes and add the commit record in the same Database TX
+                     holder = transactions.get(record.getTxId());
+                     for (RecordInfo info : holder.recordsToDelete) {
+                        deletedRecords.add(record.getId());
+                        deletedRecords.add(info.id);
+                        deleteJournalRecords.setLong(1, info.id);
+                        deleteJournalRecords.addBatch();
+                     }
+                     record.writeRecord(insertJournalRecords);
+                     committedTransactions.add(record.getTxId());
+                     break;
+                  default:
+                     // Default we add a new record to the DB
+                     record.writeRecord(insertJournalRecords);
+                     break;
+               }
             }
-         }
-
-         insertJournalRecords.executeBatch();
-         deleteJournalRecords.executeBatch();
-         deleteJournalTxRecords.executeBatch();
-
-         connection.commit();
-         if (logger.isTraceEnabled()) {
-            logger.trace("JDBC commit worked");
-         }
 
-         if (cleanupTxRecords(deletedRecords, committedTransactions)) {
+            insertJournalRecords.executeBatch();
+            deleteJournalRecords.executeBatch();
             deleteJournalTxRecords.executeBatch();
+
             connection.commit();
-            logger.trace("JDBC commit worked on cleanupTxRecords");
-         }
-         executeCallbacks(recordRef, true);
+            if (logger.isTraceEnabled()) {
+               logger.trace("JDBC commit worked");
+            }
+
+            if (cleanupTxRecords(deletedRecords, committedTransactions)) {
+               deleteJournalTxRecords.executeBatch();
+               connection.commit();
+               logger.trace("JDBC commit worked on cleanupTxRecords");
+            }
+            executeCallbacks(recordRef, true);
 
-         return recordRef.size();
+            return recordRef.size();
 
+         }
       } catch (Exception e) {
          handleException(recordRef, e);
          return 0;
@@ -280,18 +269,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
          logger.trace("Rolling back Transaction, just in case");
       }
 
-      try {
-         connection.rollback();
-      } catch (Throwable rollback) {
-         logger.warn(rollback);
-      }
-
-      try {
-         connection.close();
-      } catch (Throwable rollback) {
-         logger.warn(rollback);
-      }
-
       if (recordRef != null) {
          executeCallbacks(recordRef, false);
       }
@@ -308,23 +285,27 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
          transactions.get(txId).committed = true;
       }
       boolean hasDeletedJournalTxRecords = false;
-      // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
-      for (TransactionHolder h : iterableCopyTx) {
 
-         iterableCopy = new ArrayList<>();
-         iterableCopy.addAll(h.recordInfos);
+      try (Connection connection = connectionProvider.getConnection();
+           PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) {
+         // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
+         for (TransactionHolder h : iterableCopyTx) {
 
-         for (RecordInfo info : iterableCopy) {
-            if (deletedRecords.contains(info.id)) {
-               h.recordInfos.remove(info);
+            iterableCopy = new ArrayList<>();
+            iterableCopy.addAll(h.recordInfos);
+
+            for (RecordInfo info : iterableCopy) {
+               if (deletedRecords.contains(info.id)) {
+                  h.recordInfos.remove(info);
+               }
             }
-         }
 
-         if (h.recordInfos.isEmpty() && h.committed) {
-            deleteJournalTxRecords.setLong(1, h.transactionID);
-            deleteJournalTxRecords.addBatch();
-            hasDeletedJournalTxRecords = true;
-            transactions.remove(h.transactionID);
+            if (h.recordInfos.isEmpty() && h.committed) {
+               deleteJournalTxRecords.setLong(1, h.transactionID);
+               deleteJournalTxRecords.addBatch();
+               hasDeletedJournalTxRecords = true;
+               transactions.remove(h.transactionID);
+            }
          }
       }
       return hasDeletedJournalTxRecords;
@@ -868,51 +849,54 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
       JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
       JDBCJournalRecord r;
 
-      try (ResultSet rs = selectJournalRecords.executeQuery()) {
-         int noRecords = 0;
-         while (rs.next()) {
-            r = JDBCJournalRecord.readRecord(rs);
-            switch (r.getRecordType()) {
-               case JDBCJournalRecord.ADD_RECORD:
-                  jrc.onReadAddRecord(r.toRecordInfo());
-                  break;
-               case JDBCJournalRecord.UPDATE_RECORD:
-                  jrc.onReadUpdateRecord(r.toRecordInfo());
-                  break;
-               case JDBCJournalRecord.DELETE_RECORD:
-                  jrc.onReadDeleteRecord(r.getId());
-                  break;
-               case JDBCJournalRecord.ADD_RECORD_TX:
-                  jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
-                  break;
-               case JDBCJournalRecord.UPDATE_RECORD_TX:
-                  jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
-                  break;
-               case JDBCJournalRecord.DELETE_RECORD_TX:
-                  jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
-                  break;
-               case JDBCJournalRecord.PREPARE_RECORD:
-                  jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
-                  break;
-               case JDBCJournalRecord.COMMIT_RECORD:
-                  jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
-                  break;
-               case JDBCJournalRecord.ROLLBACK_RECORD:
-                  jrc.onReadRollbackRecord(r.getTxId());
-                  break;
-               default:
-                  throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
-            }
-            noRecords++;
-            if (r.getSeq() > seq.longValue()) {
-               seq.set(r.getSeq());
+      try (Connection connection = connectionProvider.getConnection();
+           PreparedStatement selectJournalRecords = connection.prepareStatement(this.selectJournalRecords)) {
+         try (ResultSet rs = selectJournalRecords.executeQuery()) {
+            int noRecords = 0;
+            while (rs.next()) {
+               r = JDBCJournalRecord.readRecord(rs);
+               switch (r.getRecordType()) {
+                  case JDBCJournalRecord.ADD_RECORD:
+                     jrc.onReadAddRecord(r.toRecordInfo());
+                     break;
+                  case JDBCJournalRecord.UPDATE_RECORD:
+                     jrc.onReadUpdateRecord(r.toRecordInfo());
+                     break;
+                  case JDBCJournalRecord.DELETE_RECORD:
+                     jrc.onReadDeleteRecord(r.getId());
+                     break;
+                  case JDBCJournalRecord.ADD_RECORD_TX:
+                     jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
+                     break;
+                  case JDBCJournalRecord.UPDATE_RECORD_TX:
+                     jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
+                     break;
+                  case JDBCJournalRecord.DELETE_RECORD_TX:
+                     jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
+                     break;
+                  case JDBCJournalRecord.PREPARE_RECORD:
+                     jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
+                     break;
+                  case JDBCJournalRecord.COMMIT_RECORD:
+                     jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
+                     break;
+                  case JDBCJournalRecord.ROLLBACK_RECORD:
+                     jrc.onReadRollbackRecord(r.getTxId());
+                     break;
+                  default:
+                     throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
+               }
+               noRecords++;
+               if (r.getSeq() > seq.longValue()) {
+                  seq.set(r.getSeq());
+               }
             }
-         }
-         jrc.checkPreparedTx();
+            jrc.checkPreparedTx();
 
-         jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
-         jli.setNumberOfRecords(noRecords);
-         transactions = jrc.getTransactions();
+            jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
+            jli.setNumberOfRecords(noRecords);
+            transactions = jrc.getTransactions();
+         }
       } catch (Throwable e) {
          handleException(null, e);
       }
@@ -962,9 +946,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    @Override
    public int getNumberOfRecords() {
       int count = 0;
-      try (ResultSet rs = countJournalRecords.executeQuery()) {
-         rs.next();
-         count = rs.getInt(1);
+      try (Connection connection = connectionProvider.getConnection();
+           PreparedStatement countJournalRecords = connection.prepareStatement(this.countJournalRecords)) {
+         try (ResultSet rs = countJournalRecords.executeQuery()) {
+            rs.next();
+            count = rs.getInt(1);
+         }
       } catch (SQLException e) {
          logger.warn(e.getMessage(), e);
          return -1;
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java
index b120ca2..d7399b6 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java
@@ -22,10 +22,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
 import org.jboss.logging.Logger;
 
@@ -363,7 +365,15 @@ public class PropertySQLProvider implements SQLProvider {
       }
 
       public Factory(DataSource dataSource) {
-         this(investigateDialect(dataSource));
+         this(new JDBCConnectionProvider(dataSource));
+      }
+
+      public Factory(Map<String, Object> dataSourceProperties) {
+         this(investigateDialect(dataSourceProperties));
+      }
+
+      public Factory(JDBCConnectionProvider connectionProvider) {
+         this(investigateDialect(connectionProvider));
       }
 
       public static SQLDialect investigateDialect(Connection connection) {
@@ -388,8 +398,21 @@ public class PropertySQLProvider implements SQLProvider {
          return dialect;
       }
 
-      private static SQLDialect investigateDialect(DataSource dataSource) {
-         try (Connection connection = dataSource.getConnection()) {
+      public static SQLDialect investigateDialect(Map<String, Object> dataSourceProperties) {
+         SQLDialect dialect = null;
+         for (Object entry : dataSourceProperties.values()) {
+            if (entry instanceof String) {
+               dialect = identifyDialect((String) entry);
+               if (dialect != null) {
+                  return dialect;
+               }
+            }
+         }
+         return dialect;
+      }
+
+      private static SQLDialect investigateDialect(JDBCConnectionProvider connectionProvider) {
+         try (Connection connection = connectionProvider.getConnection()) {
             return investigateDialect(connection);
          } catch (Exception e) {
             logger.debug("Unable to read JDBC metadata.", e);
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java
index fef665c..40f848b 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java
@@ -21,9 +21,11 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -31,11 +33,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@@ -79,15 +84,20 @@ public class JDBCSequentialFileFactoryTest {
    @Before
    public void setup() throws Exception {
       executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
+      Map<String, Object> dataSourceProperties = new HashMap<>();
       if (useAuthentication) {
          user = "testuser";
          password = "testpassword";
          System.setProperty("derby.connection.requireAuthentication", "true");
          System.setProperty("derby.user." + user, password);
+         dataSourceProperties.put("username", user);
+         dataSourceProperties.put("password", password);
       }
-      String connectionUrl = "jdbc:derby:target/data;create=true";
+      dataSourceProperties.put("url", "jdbc:derby:target/data;create=true");
+      dataSourceProperties.put("driverClassName", className);
       String tableName = "FILES";
-      factory = new JDBCSequentialFileFactory(connectionUrl, user, password, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
+      String jdbcDatasourceClass = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
+      factory = new JDBCSequentialFileFactory(new JDBCConnectionProvider(JDBCDataSourceUtils.getDataSource(jdbcDatasourceClass, dataSourceProperties)), JDBCUtils.getSQLProvider(dataSourceProperties, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
          @Override
          public void onIOException(Throwable code, String message, SequentialFile file) {
          }
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java
index 886ce10..e799e06 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java
@@ -44,21 +44,21 @@ public class PostgresLargeObjectManagerTest {
 
    @Test
    public void testShouldNotUseReflection() throws SQLException {
-      PostgresLargeObjectManager manager = new PostgresLargeObjectManager(new MockConnection());
+      PostgresLargeObjectManager manager = new PostgresLargeObjectManager();
       try {
-         manager.createLO();
+         manager.createLO(new MockConnection());
          fail("Shouldn't be using reflection");
       } catch (ClassCastException ex) {
       }
    }
 
    @Test
-   public void testShouldUseReflection() throws SQLException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
+   public void testShouldUseReflection() throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
       ClassLoader loader = new FunkyClassLoader();
       Class funkyClass = loader.loadClass("org.apache.activemq.artemis.jdbc.store.file.PostgresLargeObjectManager");
-      Object manager = funkyClass.getConstructor(Connection.class).newInstance(new MockConnection());
+      Object manager = funkyClass.getConstructor().newInstance();
       try {
-         funkyClass.getMethod("createLO").invoke(manager);
+         funkyClass.getMethod("createLO", Connection.class).invoke(manager, new MockConnection());
          fail("Shouldn't be using reflection");
       } catch (java.lang.reflect.InvocationTargetException ex) {
          assertEquals("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex.getCause().getMessage());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
index 4762181..3a059f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java
@@ -20,8 +20,13 @@ import javax.sql.DataSource;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class DatabaseStorageConfiguration implements StoreConfiguration {
 
    private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
@@ -44,6 +49,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
 
    private DataSource dataSource;
 
+   private String dataSourceClassName = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
+
+   private Map<String, Object> dataSourceProperties = new HashMap();
+
+   private JDBCConnectionProvider connectionProvider;
+
    private SQLProvider.Factory sqlProviderFactory;
 
    private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
@@ -138,7 +149,22 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
     *
     * @return the DataSource used to store Artemis data in the JDBC data store.
     */
-   public DataSource getDataSource() {
+   private DataSource getDataSource() {
+      if (dataSource == null) {
+         if (dataSourceProperties.isEmpty()) {
+            addDataSourceProperty("driverClassName", jdbcDriverClassName);
+            addDataSourceProperty("url", jdbcConnectionUrl);
+            if (jdbcUser != null) {
+               addDataSourceProperty("username", jdbcUser);
+            }
+            if (jdbcPassword != null) {
+               addDataSourceProperty("password", jdbcPassword);
+            }
+            // Let the pool have an unbounded number of connections by default to prevent connection starvation
+            addDataSourceProperty("maxTotal", "-1");
+         }
+         dataSource = JDBCDataSourceUtils.getDataSource(dataSourceClassName, dataSourceProperties);
+      }
       return dataSource;
    }
 
@@ -151,6 +177,33 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
       this.dataSource = dataSource;
    }
 
+   public JDBCConnectionProvider getConnectionProvider() {
+      if (connectionProvider == null) {
+         connectionProvider = new JDBCConnectionProvider(getDataSource());
+      }
+      return connectionProvider;
+   }
+   public void addDataSourceProperty(String key, String value) {
+      if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) {
+         dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase()));
+      } else {
+         try {
+            int i = Integer.parseInt(value);
+            dataSourceProperties.put(key, i);
+         } catch (NumberFormatException nfe) {
+            dataSourceProperties.put(key, value);
+         }
+      }
+   }
+
+   public String getDataSourceClassName() {
+      return dataSourceClassName;
+   }
+
+   public void setDataSourceClassName(String dataSourceClassName) {
+      this.dataSourceClassName = dataSourceClassName;
+   }
+
    /**
     * The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
     * It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6213d75..ad01a0e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1738,6 +1738,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec());
       }
       conf.setJdbcPassword(password);
+      conf.setDataSourceClassName(getString(storeNode, "data-source-class-name", conf.getDataSourceClassName(), Validators.NO_CHECK));
+      if (parameterExists(storeNode, "data-source-properties")) {
+         NodeList propertyNodeList = storeNode.getElementsByTagName("data-source-property");
+         for (int i = 0; i < propertyNodeList.getLength(); i++) {
+            Element propertyNode = (Element) propertyNodeList.item(i);
+            conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue());
+         }
+      }
+      //conf.initDataSource();
+
       return conf;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 2c6dbdd..4d164a5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -40,10 +40,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
-import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -72,16 +70,12 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
    protected final StorageManager storageManager;
 
-   private JDBCSequentialFileFactoryDriver dbDriver;
-
    private DatabaseStorageConfiguration dbConf;
 
    private ExecutorFactory executorFactory;
 
    private JDBCSequentialFileFactory pagingFactoryFileFactory;
 
-   private JDBCSequentialFile directoryList;
-
    private final boolean readWholePage;
 
    @Override
@@ -106,8 +100,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                      final ScheduledExecutorService scheduledExecutor,
                                      final ExecutorFactory executorFactory,
                                      final boolean syncNonTransactional,
-                                     final IOCriticalErrorListener critialErrorListener) throws Exception {
-      this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
+                                     final IOCriticalErrorListener criticalErrorListener) throws Exception {
+      this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, criticalErrorListener, false);
    }
 
    public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
@@ -116,7 +110,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                      final ScheduledExecutorService scheduledExecutor,
                                      final ExecutorFactory executorFactory,
                                      final boolean syncNonTransactional,
-                                     final IOCriticalErrorListener critialErrorListener,
+                                     final IOCriticalErrorListener criticalErrorListener,
                                      final boolean readWholePage) throws Exception {
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
@@ -124,7 +118,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
       this.scheduledExecutor = scheduledExecutor;
       this.syncTimeout = syncTimeout;
       this.dbConf = dbConf;
-      this.criticalErrorListener = critialErrorListener;
+      this.criticalErrorListener = criticalErrorListener;
       this.factoryToTableName = new HashMap<>();
       this.readWholePage = readWholePage;
       start();
@@ -137,20 +131,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
          if (pageStoreTableNamePrefix.length() > 10) {
             throw new IllegalStateException("The maximum name size for the page store table prefix is 10 characters: THE PAGING STORE CAN'T START");
          }
-         if (dbConf.getDataSource() != null) {
-            SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
-            if (sqlProviderFactory == null) {
-               sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
-            }
-            pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
-         } else {
-            String driverClassName = dbConf.getJdbcDriverClassName();
-            pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
-         }
-         final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
-         if (jdbcNetworkTimeout >= 0) {
-            pagingFactoryFileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
+         SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
+         if (sqlProviderFactory == null) {
+            sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
          }
+         pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
          pagingFactoryFileFactory.start();
          started = true;
       }
@@ -278,22 +263,14 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
       directoryList.close();
 
       final SQLProvider sqlProvider;
-      if (dbConf.getDataSource() != null) {
-         final SQLProvider.Factory sqlProviderFactory;
-         if (dbConf.getSqlProviderFactory() != null) {
-            sqlProviderFactory = dbConf.getSqlProviderFactory();
-         } else {
-            sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
-         }
-         sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
+      final SQLProvider.Factory sqlProviderFactory;
+      if (dbConf.getSqlProviderFactory() != null) {
+         sqlProviderFactory = dbConf.getSqlProviderFactory();
       } else {
-         sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
-      }
-      final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
-      final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
-      if (jdbcNetworkTimeout >= 0) {
-         fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
+         sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
       }
+      sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
+      final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
       factoryToTableName.put(fileFactory, directoryName);
       return fileFactory;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 629405b..fc9b3a2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.nio.ByteBuffer;
-import java.sql.Connection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -26,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
@@ -36,8 +35,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 
 public class JDBCJournalStorageManager extends JournalStorageManager {
 
-   private Connection connection;
-
    public JDBCJournalStorageManager(Configuration config,
                                     CriticalAnalyzer analyzer,
                                     ExecutorFactory executorFactory,
@@ -59,33 +56,35 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
    protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
       try {
          final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
-         final JDBCJournalImpl bindingsJournal;
-         final JDBCJournalImpl messageJournal;
-         final JDBCSequentialFileFactory largeMessagesFactory;
-         if (dbConf.getDataSource() != null) {
-            SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
-            if (sqlProviderFactory == null) {
-               sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
-            }
-            bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
-            messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
-            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
-         } else {
-            String driverClassName = dbConf.getJdbcDriverClassName();
-            bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
-            messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
-            largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
-         }
+         final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider();
          final int networkTimeout = dbConf.getJdbcNetworkTimeout();
          if (networkTimeout >= 0) {
-            bindingsJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
+            connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
          }
-         if (networkTimeout >= 0) {
-            messageJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
-         }
-         if (networkTimeout >= 0) {
-            largeMessagesFactory.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
+         final JDBCJournalImpl bindingsJournal;
+         final JDBCJournalImpl messageJournal;
+         final JDBCSequentialFileFactory largeMessagesFactory;
+         SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
+         if (sqlProviderFactory == null) {
+            sqlProviderFactory = new PropertySQLProvider.Factory(connectionProvider);
          }
+         bindingsJournal = new JDBCJournalImpl(
+                 connectionProvider,
+                 sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL),
+                 scheduledExecutorService,
+                 executorFactory.getExecutor(),
+                 criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
+         messageJournal = new JDBCJournalImpl(
+                 connectionProvider,
+                 sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL),
+                 scheduledExecutorService, executorFactory.getExecutor(),
+                 criticalErrorListener,
+                 dbConf.getJdbcJournalSyncPeriodMillis());
+         largeMessagesFactory = new JDBCSequentialFileFactory(
+                 connectionProvider,
+                 sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE),
+                 executorFactory.getExecutor(),
+                 criticalErrorListener);
          this.bindingsJournal = bindingsJournal;
          this.messageJournal = messageJournal;
          this.largeMessagesFactory = largeMessagesFactory;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
index 11c1aab..3b7124e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.jboss.logging.Logger;
 
 /**
@@ -35,13 +36,13 @@ final class JdbcLeaseLock implements LeaseLock {
 
    private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
    private static final int MAX_HOLDER_ID_LENGTH = 128;
-   private final Connection connection;
+   private final JDBCConnectionProvider connectionProvider;
    private final String holderId;
-   private final PreparedStatement tryAcquireLock;
-   private final PreparedStatement tryReleaseLock;
-   private final PreparedStatement renewLock;
-   private final PreparedStatement isLocked;
-   private final PreparedStatement currentDateTime;
+   private final String tryAcquireLock;
+   private final String tryReleaseLock;
+   private final String renewLock;
+   private final String isLocked;
+   private final String currentDateTime;
    private final long expirationMillis;
    private boolean maybeAcquired;
    private final String lockName;
@@ -51,12 +52,12 @@ final class JdbcLeaseLock implements LeaseLock {
     * whose life cycle will be managed externally.
     */
    JdbcLeaseLock(String holderId,
-                 Connection connection,
-                 PreparedStatement tryAcquireLock,
-                 PreparedStatement tryReleaseLock,
-                 PreparedStatement renewLock,
-                 PreparedStatement isLocked,
-                 PreparedStatement currentDateTime,
+                 JDBCConnectionProvider connectionProvider,
+                 String tryAcquireLock,
+                 String tryReleaseLock,
+                 String renewLock,
+                 String isLocked,
+                 String currentDateTime,
                  long expirationMIllis,
                  String lockName) {
       if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
@@ -70,7 +71,7 @@ final class JdbcLeaseLock implements LeaseLock {
       this.currentDateTime = currentDateTime;
       this.expirationMillis = expirationMIllis;
       this.maybeAcquired = false;
-      this.connection = connection;
+      this.connectionProvider = connectionProvider;
       this.lockName = lockName;
    }
 
@@ -84,13 +85,12 @@ final class JdbcLeaseLock implements LeaseLock {
    }
 
    private String readableLockStatus() {
-      try {
+      try (Connection connection = connectionProvider.getConnection()) {
          connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
          final boolean autoCommit = connection.getAutoCommit();
          connection.setAutoCommit(false);
-         try {
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
             final String lockStatus;
-            final PreparedStatement preparedStatement = this.isLocked;
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                if (!resultSet.next()) {
                   lockStatus = null;
@@ -114,100 +114,96 @@ final class JdbcLeaseLock implements LeaseLock {
       }
    }
 
-   private long dbCurrentTimeMillis() throws SQLException {
+   private long dbCurrentTimeMillis(Connection connection) throws SQLException {
       final long start = System.nanoTime();
-      try (ResultSet resultSet = currentDateTime.executeQuery()) {
-         resultSet.next();
-         final Timestamp currentTimestamp = resultSet.getTimestamp(1);
-         final long elapsedTime = System.nanoTime() - start;
-         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
-                          lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
+      try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
+         try (ResultSet resultSet = currentDateTime.executeQuery()) {
+            resultSet.next();
+            final Timestamp currentTimestamp = resultSet.getTimestamp(1);
+            final long elapsedTime = System.nanoTime() - start;
+            if (LOGGER.isDebugEnabled()) {
+               LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
+                       lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
+            }
+            return currentTimestamp.getTime();
          }
-         return currentTimestamp.getTime();
       }
    }
 
    @Override
    public boolean renew() {
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            try {
-               final PreparedStatement preparedStatement = this.renewLock;
-               final long now = dbCurrentTimeMillis();
-               final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
+            final long now = dbCurrentTimeMillis(connection);
+            final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+            if (LOGGER.isDebugEnabled()) {
+               LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
+                             lockName, holderId, expirationTime);
+            }
+            preparedStatement.setTimestamp(1, expirationTime);
+            preparedStatement.setString(2, holderId);
+            preparedStatement.setTimestamp(3, expirationTime);
+            preparedStatement.setTimestamp(4, expirationTime);
+            final int updatedRows = preparedStatement.executeUpdate();
+            final boolean renewed = updatedRows == 1;
+            connection.commit();
+            if (!renewed) {
                if (LOGGER.isDebugEnabled()) {
-                  LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
-                                lockName, holderId, expirationTime);
-               }
-               preparedStatement.setTimestamp(1, expirationTime);
-               preparedStatement.setString(2, holderId);
-               preparedStatement.setTimestamp(3, expirationTime);
-               preparedStatement.setTimestamp(4, expirationTime);
-               final int updatedRows = preparedStatement.executeUpdate();
-               final boolean renewed = updatedRows == 1;
-               connection.commit();
-               if (!renewed) {
-                  if (LOGGER.isDebugEnabled()) {
-                     LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
-                                   lockName, holderId, readableLockStatus());
-                  }
-               } else {
-                  LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
+                  LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
+                                lockName, holderId, readableLockStatus());
                }
-               return renewed;
-            } catch (SQLException ie) {
-               connection.rollback();
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
+            } else {
+               LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+            return renewed;
+         } catch (SQLException ie) {
+            connection.rollback();
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public boolean tryAcquire() {
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            try {
-               final PreparedStatement preparedStatement = tryAcquireLock;
-               final long now = dbCurrentTimeMillis();
-               preparedStatement.setString(1, holderId);
-               final Timestamp expirationTime = new Timestamp(now + expirationMillis);
-               preparedStatement.setTimestamp(2, expirationTime);
-               preparedStatement.setTimestamp(3, expirationTime);
-               LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
-                             lockName, holderId, expirationTime);
-               final boolean acquired = preparedStatement.executeUpdate() == 1;
-               connection.commit();
-               if (acquired) {
-                  this.maybeAcquired = true;
-                  LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
-               } else {
-                  if (LOGGER.isDebugEnabled()) {
-                     LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
-                                   lockName, holderId, readableLockStatus());
-                  }
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
+            final long now = dbCurrentTimeMillis(connection);
+            preparedStatement.setString(1, holderId);
+            final Timestamp expirationTime = new Timestamp(now + expirationMillis);
+            preparedStatement.setTimestamp(2, expirationTime);
+            preparedStatement.setTimestamp(3, expirationTime);
+            LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
+                          lockName, holderId, expirationTime);
+            final boolean acquired = preparedStatement.executeUpdate() == 1;
+            connection.commit();
+            if (acquired) {
+               this.maybeAcquired = true;
+               LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
+            } else {
+               if (LOGGER.isDebugEnabled()) {
+                  LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
+                                lockName, holderId, readableLockStatus());
                }
-               return acquired;
-            } catch (SQLException ie) {
-               connection.rollback();
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+            return acquired;
+         } catch (SQLException ie) {
+            connection.rollback();
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
@@ -222,104 +218,85 @@ final class JdbcLeaseLock implements LeaseLock {
    }
 
    private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            try {
-               boolean result;
-               final PreparedStatement preparedStatement = this.isLocked;
-               try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                  if (!resultSet.next()) {
-                     result = false;
-                  } else {
-                     final String currentHolderId = resultSet.getString(1);
-                     result = holderIdFilter.test(currentHolderId);
-                     final Timestamp expirationTime = resultSet.getTimestamp(2);
-                     final Timestamp currentTimestamp = resultSet.getTimestamp(3);
-                     final long currentTimestampMillis = currentTimestamp.getTime();
-                     boolean zombie = false;
-                     if (expirationTime != null) {
-                        final long lockExpirationTime = expirationTime.getTime();
-                        final long expiredBy = currentTimestampMillis - lockExpirationTime;
-                        if (expiredBy > 0) {
-                           result = false;
-                           zombie = true;
-                        }
-                     }
-                     if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
-                                      lockName, holderId, zombie ? "zombie lock" : "lock",
-                                      currentHolderId, expirationTime, currentTimestamp);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
+            boolean result;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+               if (!resultSet.next()) {
+                  result = false;
+               } else {
+                  final String currentHolderId = resultSet.getString(1);
+                  result = holderIdFilter.test(currentHolderId);
+                  final Timestamp expirationTime = resultSet.getTimestamp(2);
+                  final Timestamp currentTimestamp = resultSet.getTimestamp(3);
+                  final long currentTimestampMillis = currentTimestamp.getTime();
+                  boolean zombie = false;
+                  if (expirationTime != null) {
+                     final long lockExpirationTime = expirationTime.getTime();
+                     final long expiredBy = currentTimestampMillis - lockExpirationTime;
+                     if (expiredBy > 0) {
+                        result = false;
+                        zombie = true;
                      }
                   }
+                  if (LOGGER.isDebugEnabled()) {
+                     LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
+                                   lockName, holderId, zombie ? "zombie lock" : "lock",
+                                   currentHolderId, expirationTime, currentTimestamp);
+                  }
                }
-               connection.commit();
-               return result;
-            } catch (SQLException ie) {
-               connection.rollback();
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+            connection.commit();
+            return result;
+         } catch (SQLException ie) {
+            connection.rollback();
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public void release() {
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            try {
-               final PreparedStatement preparedStatement = this.tryReleaseLock;
-               preparedStatement.setString(1, holderId);
-               final boolean released = preparedStatement.executeUpdate() == 1;
-               //consider it as released to avoid on finalize to be reclaimed
-               this.maybeAcquired = false;
-               connection.commit();
-               if (!released) {
-                  if (LOGGER.isDebugEnabled()) {
-                     LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
-                                   lockName, holderId, readableLockStatus());
-                  }
-               } else {
-                  LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryReleaseLock)) {
+            preparedStatement.setString(1, holderId);
+            final boolean released = preparedStatement.executeUpdate() == 1;
+            //consider it as released to avoid on finalize to be reclaimed
+            this.maybeAcquired = false;
+            connection.commit();
+            if (!released) {
+               if (LOGGER.isDebugEnabled()) {
+                  LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
+                                lockName, holderId, readableLockStatus());
                }
-            } catch (SQLException ie) {
-               connection.rollback();
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
+            } else {
+               LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+         } catch (SQLException ie) {
+            connection.rollback();
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public void close() throws SQLException {
-      synchronized (connection) {
-         //to avoid being called if not needed
-         if (!this.tryReleaseLock.isClosed()) {
-            try {
-               if (this.maybeAcquired) {
-                  release();
-               }
-            } finally {
-               this.tryReleaseLock.close();
-               this.tryAcquireLock.close();
-               this.renewLock.close();
-               this.isLocked.close();
-               this.currentDateTime.close();
-            }
-         }
+      if (this.maybeAcquired) {
+         release();
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index 6fd4b86..212e6e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.activemq.artemis.core.server.impl.jdbc;
 
-import javax.sql.DataSource;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
-import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -61,59 +60,37 @@ public final class JdbcNodeManager extends NodeManager {
                                       ExecutorFactory executorFactory,
                                       IOCriticalErrorListener ioCriticalErrorListener) {
       validateTimeoutConfiguration(configuration);
-      if (configuration.getDataSource() != null) {
-         final SQLProvider.Factory sqlProviderFactory;
-         if (configuration.getSqlProviderFactory() != null) {
-            sqlProviderFactory = configuration.getSqlProviderFactory();
-         } else {
-            sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
-         }
-         final String brokerId = java.util.UUID.randomUUID().toString();
-         return usingDataSource(brokerId,
-                                configuration.getJdbcNetworkTimeout(),
-                                configuration.getJdbcLockExpirationMillis(),
-                                configuration.getJdbcLockRenewPeriodMillis(),
-                                configuration.getJdbcLockAcquisitionTimeoutMillis(),
-                                configuration.getDataSource(),
-                                sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
-                                scheduledExecutorService,
-                                executorFactory,
-                                ioCriticalErrorListener);
+      final SQLProvider.Factory sqlProviderFactory;
+      if (configuration.getSqlProviderFactory() != null) {
+         sqlProviderFactory = configuration.getSqlProviderFactory();
       } else {
-         final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
-         final String brokerId = java.util.UUID.randomUUID().toString();
-         return usingConnectionUrl(brokerId,
-                                   configuration.getJdbcNetworkTimeout(),
-                                   configuration.getJdbcLockExpirationMillis(),
-                                   configuration.getJdbcLockRenewPeriodMillis(),
-                                   configuration.getJdbcLockAcquisitionTimeoutMillis(),
-                                   configuration.getJdbcConnectionUrl(),
-                                   configuration.getJdbcUser(),
-                                   configuration.getJdbcPassword(),
-                                   configuration.getJdbcDriverClassName(),
-                                   sqlProvider,
-                                   scheduledExecutorService,
-                                   executorFactory,
-                                   ioCriticalErrorListener);
+         sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getConnectionProvider());
       }
+      final String brokerId = java.util.UUID.randomUUID().toString();
+      return usingConnectionProvider(brokerId,
+                             configuration.getJdbcLockExpirationMillis(),
+                             configuration.getJdbcLockRenewPeriodMillis(),
+                             configuration.getJdbcLockAcquisitionTimeoutMillis(),
+                             configuration.getConnectionProvider(),
+                             sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
+                             scheduledExecutorService,
+                             executorFactory,
+                             ioCriticalErrorListener);
    }
 
-   private static JdbcNodeManager usingDataSource(String brokerId,
-                                                  int networkTimeoutMillis,
+   private static JdbcNodeManager usingConnectionProvider(String brokerId,
                                                   long lockExpirationMillis,
                                                   long lockRenewPeriodMillis,
                                                   long lockAcquisitionTimeoutMillis,
-                                                  DataSource dataSource,
+                                                  JDBCConnectionProvider connectionProvider,
                                                   SQLProvider provider,
                                                   ScheduledExecutorService scheduledExecutorService,
                                                   ExecutorFactory executorFactory,
                                                   IOCriticalErrorListener ioCriticalErrorListener) {
       return new JdbcNodeManager(
-         () -> JdbcSharedStateManager.usingDataSource(brokerId,
-                                                      networkTimeoutMillis,
-                                                      executorFactory == null ? null : executorFactory.getExecutor(),
+         () -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
                                                       lockExpirationMillis,
-                                                      dataSource,
+                                                      connectionProvider,
                                                       provider),
          lockRenewPeriodMillis,
          lockAcquisitionTimeoutMillis,
@@ -122,36 +99,6 @@ public final class JdbcNodeManager extends NodeManager {
          ioCriticalErrorListener);
    }
 
-   private static JdbcNodeManager usingConnectionUrl(String brokerId,
-                                                     int networkTimeoutMillis,
-                                                     long lockExpirationMillis,
-                                                     long lockRenewPeriodMillis,
-                                                     long lockAcquisitionTimeoutMillis,
-                                                     String jdbcUrl,
-                                                     String user,
-                                                     String password,
-                                                     String driverClass,
-                                                     SQLProvider provider,
-                                                     ScheduledExecutorService scheduledExecutorService,
-                                                     ExecutorFactory executorFactory,
-                                                     IOCriticalErrorListener ioCriticalErrorListener) {
-      return new JdbcNodeManager(
-         () -> JdbcSharedStateManager.usingConnectionUrl(brokerId,
-                                                         networkTimeoutMillis,
-                                                         executorFactory == null ? null : executorFactory.getExecutor(),
-                                                         lockExpirationMillis,
-                                                         jdbcUrl,
-                                                         user,
-                                                         password,
-                                                         driverClass,
-                                                         provider),
-         lockRenewPeriodMillis,
-         lockAcquisitionTimeoutMillis,
-         scheduledExecutorService,
-         executorFactory,
-         ioCriticalErrorListener);
-   }
-
    private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
       final long lockExpiration = configuration.getJdbcLockExpirationMillis();
       if (lockExpiration <= 0) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
index 8ad6f1e..06f7e2a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -17,15 +17,14 @@
 
 package org.apache.activemq.artemis.core.server.impl.jdbc;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.UUID;
 import org.jboss.logging.Logger;
@@ -42,21 +41,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
    private final long lockExpirationMillis;
    private JdbcLeaseLock liveLock;
    private JdbcLeaseLock backupLock;
-   private PreparedStatement readNodeId;
-   private PreparedStatement writeNodeId;
-   private PreparedStatement initializeNodeId;
-   private PreparedStatement readState;
-   private PreparedStatement writeState;
+   private String readNodeId;
+   private String writeNodeId;
+   private String initializeNodeId;
+   private String readState;
+   private String writeState;
 
-   public static JdbcSharedStateManager usingDataSource(String holderId,
-                                                        int networkTimeout,
-                                                        Executor networkTimeoutExecutor,
+   public static JdbcSharedStateManager usingConnectionProvider(String holderId,
                                                         long locksExpirationMillis,
-                                                        DataSource dataSource,
+                                                        JDBCConnectionProvider connectionProvider,
                                                         SQLProvider provider) {
       final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
-      sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
-      sharedStateManager.setDataSource(dataSource);
+      sharedStateManager.setJdbcConnectionProvider(connectionProvider);
       sharedStateManager.setSqlProvider(provider);
       try {
          sharedStateManager.start();
@@ -66,64 +62,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       }
    }
 
-   public static JdbcSharedStateManager usingConnectionUrl(String holderId,
-                                                           long locksExpirationMillis,
-                                                           String jdbcConnectionUrl,
-                                                           String jdbcDriverClass,
-                                                           SQLProvider provider) {
-      return JdbcSharedStateManager.usingConnectionUrl(holderId,
-                                                       -1,
-                                                       null,
-                                                       locksExpirationMillis,
-                                                       jdbcConnectionUrl,
-                                                       null,
-                                                       null,
-                                                       jdbcDriverClass,
-                                                       provider);
-   }
-
-   public static JdbcSharedStateManager usingConnectionUrl(String holderId,
-                                                           long locksExpirationMillis,
-                                                           String jdbcConnectionUrl,
-                                                           String user,
-                                                           String password,
-                                                           String jdbcDriverClass,
-                                                           SQLProvider provider) {
-      return JdbcSharedStateManager.usingConnectionUrl(holderId,
-                                                       -1,
-                                                       null,
-                                                       locksExpirationMillis,
-                                                       jdbcConnectionUrl,
-                                                       user,
-                                                       password,
-                                                       jdbcDriverClass,
-                                                       provider);
-   }
-
-   public static JdbcSharedStateManager usingConnectionUrl(String holderId,
-                                                           int networkTimeout,
-                                                           Executor networkTimeoutExecutor,
-                                                           long locksExpirationMillis,
-                                                           String jdbcConnectionUrl,
-                                                           String user,
-                                                           String password,
-                                                           String jdbcDriverClass,
-                                                           SQLProvider provider) {
-      final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
-      sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
-      sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
-      sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
-      sharedStateManager.setSqlProvider(provider);
-      sharedStateManager.setUser(user);
-      sharedStateManager.setPassword(password);
-      try {
-         sharedStateManager.start();
-         return sharedStateManager;
-      } catch (SQLException e) {
-         throw new IllegalStateException(e);
-      }
-   }
-
    @Override
    protected void createSchema() {
       try {
@@ -135,28 +73,28 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
    }
 
    static JdbcLeaseLock createLiveLock(String holderId,
-                                       Connection connection,
+                                       JDBCConnectionProvider connectionProvider,
                                        SQLProvider sqlProvider,
-                                       long expirationMillis) throws SQLException {
-      return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE");
+                                       long expirationMillis) {
+      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
    }
 
    static JdbcLeaseLock createBackupLock(String holderId,
-                                         Connection connection,
+                                         JDBCConnectionProvider connectionProvider,
                                          SQLProvider sqlProvider,
-                                         long expirationMillis) throws SQLException {
-      return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP");
+                                         long expirationMillis) {
+      return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
-      this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
-      this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
-      this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
-      this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
-      this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
-      this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
-      this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
+   protected void prepareStatements() {
+      this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
+      this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
+      this.readNodeId = sqlProvider.readNodeIdSQL();
+      this.writeNodeId = sqlProvider.writeNodeIdSQL();
+      this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
+      this.writeState = sqlProvider.writeStateSQL();
+      this.readState = sqlProvider.readStateSQL();
    }
 
    private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
@@ -174,17 +112,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       return this.backupLock;
    }
 
-   private UUID rawReadNodeId() throws SQLException {
-      final PreparedStatement preparedStatement = this.readNodeId;
-      try (ResultSet resultSet = preparedStatement.executeQuery()) {
-         if (!resultSet.next()) {
-            return null;
-         } else {
-            final String nodeId = resultSet.getString(1);
-            if (nodeId != null) {
-               return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
-            } else {
+   private UUID rawReadNodeId(Connection connection) throws SQLException {
+      try (PreparedStatement preparedStatement = connection.prepareStatement(this.readNodeId)) {
+         try (ResultSet resultSet = preparedStatement.executeQuery()) {
+            if (!resultSet.next()) {
                return null;
+            } else {
+               final String nodeId = resultSet.getString(1);
+               if (nodeId != null) {
+                  return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
+               } else {
+                  return null;
+               }
             }
          }
       }
@@ -192,65 +131,71 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
 
    @Override
    public UUID readNodeId() {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
             connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
             final boolean autoCommit = connection.getAutoCommit();
             connection.setAutoCommit(true);
             try {
-               return rawReadNodeId();
+               return rawReadNodeId(connection);
             } finally {
                connection.setAutoCommit(autoCommit);
             }
          } catch (SQLException e) {
             throw new IllegalStateException(e);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public void writeNodeId(UUID nodeId) {
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          try {
             connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
             final boolean autoCommit = connection.getAutoCommit();
             connection.setAutoCommit(true);
             try {
-               rawWriteNodeId(nodeId);
+               rawWriteNodeId(connection, nodeId);
             } finally {
                connection.setAutoCommit(autoCommit);
             }
          } catch (SQLException e) {
             throw new IllegalStateException(e);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
-   private void rawWriteNodeId(UUID nodeId) throws SQLException {
-      final PreparedStatement preparedStatement = this.writeNodeId;
-      preparedStatement.setString(1, nodeId.toString());
-      if (preparedStatement.executeUpdate() != 1) {
-         throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
+   private void rawWriteNodeId(Connection connection, UUID nodeId) throws SQLException {
+      try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeNodeId)) {
+         preparedStatement.setString(1, nodeId.toString());
+         if (preparedStatement.executeUpdate() != 1) {
+            throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
+         }
       }
    }
 
-   private boolean rawInitializeNodeId(UUID nodeId) throws SQLException {
-      final PreparedStatement preparedStatement = this.initializeNodeId;
-      preparedStatement.setString(1, nodeId.toString());
-      final int rows = preparedStatement.executeUpdate();
-      assert rows <= 1;
-      return rows > 0;
+   private boolean rawInitializeNodeId(Connection connection, UUID nodeId) throws SQLException {
+      try (PreparedStatement preparedStatement = connection.prepareStatement(this.initializeNodeId)) {
+         preparedStatement.setString(1, nodeId.toString());
+         final int rows = preparedStatement.executeUpdate();
+         assert rows <= 1;
+         return rows > 0;
+      }
    }
 
    @Override
    public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
       SQLException lastError = null;
-      synchronized (connection) {
+      try (Connection connection = connectionProvider.getConnection()) {
          final UUID newNodeId = nodeIdFactory.get();
          for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
             lastError = null;
             try {
-               final UUID nodeId = initializeOrReadNodeId(newNodeId);
+               final UUID nodeId = initializeOrReadNodeId(connection, newNodeId);
                if (nodeId != null) {
                   return nodeId;
                }
@@ -259,6 +204,8 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
                lastError = e;
             }
          }
+      } catch (SQLException e) {
+         lastError = e;
       }
       if (lastError != null) {
          logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
@@ -268,7 +215,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
    }
 
-   private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
+   private UUID initializeOrReadNodeId(Connection connection, final UUID newNodeId) throws SQLException {
       synchronized (connection) {
          connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
          final boolean autoCommit = connection.getAutoCommit();
@@ -276,10 +223,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
          try {
             final UUID nodeId;
             //optimistic try to initialize nodeId
-            if (rawInitializeNodeId(newNodeId)) {
+            if (rawInitializeNodeId(connection, newNodeId)) {
                nodeId = newNodeId;
             } else {
-               nodeId = rawReadNodeId();
+               nodeId = rawReadNodeId(connection);
             }
             if (nodeId != null) {
                connection.commit();
@@ -335,76 +282,65 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
 
    @Override
    public State readState() {
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            final State state;
-            try {
-               final PreparedStatement preparedStatement = this.readState;
-               try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                  if (!resultSet.next()) {
-                     state = State.FIRST_TIME_START;
-                  } else {
-                     state = decodeState(resultSet.getString(1));
-                  }
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         final State state;
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.readState)) {
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+               if (!resultSet.next()) {
+                  state = State.FIRST_TIME_START;
+               } else {
+                  state = decodeState(resultSet.getString(1));
                }
-               connection.commit();
-               return state;
-            } catch (SQLException ie) {
-               connection.rollback();
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+            connection.commit();
+            return state;
+         } catch (SQLException ie) {
+            connection.rollback();
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public void writeState(State state) {
       final String encodedState = encodeState(state);
-      synchronized (connection) {
-         try {
-            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            final boolean autoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
-            try {
-               final PreparedStatement preparedStatement = this.writeState;
-               preparedStatement.setString(1, encodedState);
-               if (preparedStatement.executeUpdate() != 1) {
-                  throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
-               }
-               connection.commit();
-            } catch (SQLException ie) {
-               connection.rollback();
-               connection.setAutoCommit(true);
-               throw new IllegalStateException(ie);
-            } finally {
-               connection.setAutoCommit(autoCommit);
+      try (Connection connection = connectionProvider.getConnection()) {
+         connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+         final boolean autoCommit = connection.getAutoCommit();
+         connection.setAutoCommit(false);
+         try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeState)) {
+            preparedStatement.setString(1, encodedState);
+            if (preparedStatement.executeUpdate() != 1) {
+               throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
             }
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
+            connection.commit();
+         } catch (SQLException ie) {
+            connection.rollback();
+            connection.setAutoCommit(true);
+            throw new IllegalStateException(ie);
+         } finally {
+            connection.setAutoCommit(autoCommit);
          }
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    public void stop() throws SQLException {
       //release all the managed resources inside the connection lock
-      synchronized (connection) {
-         this.readNodeId.close();
-         this.writeNodeId.close();
-         this.initializeNodeId.close();
-         this.readState.close();
-         this.writeState.close();
-         this.liveLock.close();
-         this.backupLock.close();
-         super.stop();
-      }
+      //synchronized (connection) {
+      this.liveLock.close();
+      this.backupLock.close();
+      super.stop();
+      //}
    }
 
    @Override
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0e0ea15..d03b466 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2361,14 +2361,14 @@
 
    <xsd:complexType name="databaseStoreType">
       <xsd:all>
-         <xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="1" maxOccurs="1">
+         <xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
             <xsd:annotation>
                <xsd:documentation>
                   The JDBC Driver class name
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
-         <xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="1" maxOccurs="1">
+         <xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="0" maxOccurs="1">
             <xsd:annotation>
                <xsd:documentation>
                   The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/
@@ -2391,6 +2391,31 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="data-source-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The DataSource class name
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="data-source-properties" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  A list of options for the DataSource
+               </xsd:documentation>
+            </xsd:annotation>
+            <xsd:complexType>
+               <xsd:sequence>
+                  <xsd:element name="data-source-property" type="dataSourcePropertyType" minOccurs="1" maxOccurs="unbounded">
+                     <xsd:annotation>
+                        <xsd:documentation>
+                           A key-value pair option for the DataSource
+                        </xsd:documentation>
+                     </xsd:annotation>
+                  </xsd:element>
+               </xsd:sequence>
+            </xsd:complexType>
+         </xsd:element>
          <xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
             <xsd:annotation>
                <xsd:documentation>
@@ -2458,6 +2483,23 @@
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
+   <xsd:complexType name="dataSourcePropertyType">
+      <xsd:attribute name="key" type="xsd:string" use="required">
+         <xsd:annotation>
+            <xsd:documentation>
+               Configuration option key
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
+      <xsd:attribute name="value" type="xsd:string" use="required">
+         <xsd:annotation>
+            <xsd:documentation>
+               Configuration option value
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
+   </xsd:complexType>
+
    <xsd:complexType name="haPolicyType">
       <xsd:choice>
          <xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1">
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
index 2ecddbf..77f488e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.activemq.artemis.core.server.impl.jdbc;
 
-import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
@@ -67,10 +66,10 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
          return JdbcSharedStateManager
             .createLiveLock(
                UUID.randomUUID().toString(),
-               jdbcSharedStateManager.getConnection(),
+               jdbcSharedStateManager.getJdbcConnectionProvider(),
                sqlProvider,
                acquireMillis);
-      } catch (SQLException e) {
+      } catch (Exception e) {
          throw new IllegalStateException(e);
       }
    }
@@ -85,20 +84,18 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
 
       if (withExistingTable) {
          TestJDBCDriver testDriver = TestJDBCDriver
-            .usingConnectionUrl(
-                dbConf.getJdbcConnectionUrl(),
-                dbConf.getJdbcDriverClassName(),
+            .usingDbConf(
+                dbConf,
                 sqlProvider);
          testDriver.start();
          testDriver.stop();
       }
 
       jdbcSharedStateManager = JdbcSharedStateManager
-         .usingConnectionUrl(
+         .usingConnectionProvider(
             UUID.randomUUID().toString(),
             dbConf.getJdbcLockExpirationMillis(),
-            dbConf.getJdbcConnectionUrl(),
-            dbConf.getJdbcDriverClassName(),
+            dbConf.getConnectionProvider(),
             sqlProvider);
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
index 7340026..db146e1 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
@@ -41,19 +41,17 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
    }
 
    private TestJDBCDriver createFakeDriver(boolean initializeTable) {
-      return TestJDBCDriver.usingConnectionUrl(
-         dbConf.getJdbcConnectionUrl(),
-         dbConf.getJdbcDriverClassName(),
+      return TestJDBCDriver.usingDbConf(
+         dbConf,
          sqlProvider,
          initializeTable);
    }
 
    private JdbcSharedStateManager createSharedStateManager() {
-      return JdbcSharedStateManager.usingConnectionUrl(
+      return JdbcSharedStateManager.usingConnectionProvider(
          UUID.randomUUID().toString(),
          dbConf.getJdbcLockExpirationMillis(),
-         dbConf.getJdbcConnectionUrl(),
-         dbConf.getJdbcDriverClassName(),
+         dbConf.getConnectionProvider(),
          sqlProvider);
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
index 2df6274..061a5d5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
@@ -16,28 +16,28 @@
  */
 package org.apache.activemq.artemis.core.server.impl.jdbc;
 
+import java.sql.Connection;
 import java.sql.SQLException;
 
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.junit.Assert;
 
 public class TestJDBCDriver extends AbstractJDBCDriver {
 
-   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
-                                                   String jdbcDriverClass,
-                                                   SQLProvider provider) {
-      return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
+   public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
+                                            SQLProvider provider) {
+      return usingDbConf(dbConf, provider, false);
    }
 
-   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
-                                                   String jdbcDriverClass,
-                                                   SQLProvider provider,
-                                                   boolean initialize) {
+   public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
+                                            SQLProvider provider,
+                                            boolean initialize) {
+
       TestJDBCDriver driver = new TestJDBCDriver(initialize);
       driver.setSqlProvider(provider);
-      driver.setJdbcConnectionUrl(jdbcConnectionUrl);
-      driver.setJdbcDriverClass(jdbcDriverClass);
+      driver.setJdbcConnectionProvider(dbConf.getConnectionProvider());
       return driver;
    }
 
@@ -48,12 +48,11 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
    }
 
    @Override
-   protected void prepareStatements() throws SQLException {
-   }
+   protected void prepareStatements() { }
 
    @Override
-   protected void createSchema() throws SQLException {
-      try {
+   protected void createSchema() {
+      try (Connection connection = getJdbcConnectionProvider().getConnection()) {
          connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
          if (initialize) {
             connection.createStatement().execute(sqlProvider.createNodeIdSQL());
diff --git a/pom.xml b/pom.xml
index afeac19..f7eeec3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -747,6 +747,13 @@
             <version>2.7.2</version>
          </dependency>
 
+         <!-- needed by artemis-jdbc-store -->
+         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <version>2.1.1</version>
+         </dependency>
+
          <!-- Needed for Micrometer -->
          <dependency>
             <groupId>io.micrometer</groupId>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index 46a1027..0215953 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -123,7 +123,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
          SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
       scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
       executorService = Executors.newSingleThreadExecutor();
-      journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), getJdbcUser(), getJdbcPassword(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
+      journal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
          @Override
          public void onIOException(Throwable code, String message, SequentialFile file) {
 
@@ -145,10 +145,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
    public void testConcurrentEmptyJournal() throws SQLException {
       Assert.assertTrue(journal.isStarted());
       Assert.assertEquals(0, journal.getNumberOfRecords());
-      final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
-                                                                          getJdbcUser(),
-                                                                          getJdbcPassword(),
-                                                                          dbConf.getJdbcDriverClassName(),
+      final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getConnectionProvider(),
                                                                           sqlProvider, scheduledExecutorService,
                                                                           executorService, (code, message, file) -> {
          Assert.fail(message);