Class JdbcIO
java.lang.Object
  org.apache.beam.sdk.io.jdbc.JdbcIO
public class JdbcIO extends java.lang.Object
IO to read and write data on JDBC.

Reading from JDBC datasource

JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided JdbcIO.RowMapper.

To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using
1. JdbcIO.DataSourceConfiguration.create(DataSource) (which must be Serializable);
2. or JdbcIO.DataSourceConfiguration.create(String, String) (driver class name and url).

Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allow you to define username and password.

For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person")
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
Note you should check with your database provider for the JDBC driver and connection URL used to create the DataSourceConfiguration. For example, if you use Cloud SQL with PostgreSQL, the JDBC connection URL has this pattern with SocketFactory: "jdbc:postgresql://google/mydb?cloudSqlInstance=project:region:myinstance&socketFactory=com.google.cloud.sql.postgres.SocketFactory". Check your database provider's documentation for more details.
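A minimal sketch of such a configuration, following the URL pattern above (org.postgresql.Driver is the standard PostgreSQL driver class; the project:region:myinstance path and credentials are placeholders):

// Sketch: Cloud SQL (PostgreSQL) configuration. "project:region:myinstance"
// and the credentials are placeholders for your own instance.
JdbcIO.DataSourceConfiguration config =
    JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver",
            "jdbc:postgresql://google/mydb?cloudSqlInstance=project:region:myinstance"
                + "&socketFactory=com.google.cloud.sql.postgres.SocketFactory")
        .withUsername("username")
        .withPassword("password");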
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For example:

pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
        "username", "password"))
    .withQuery("select id,name from Person where name = ?")
    .withStatementPreparator(new JdbcIO.StatementPreparator() {
      public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setString(1, "Darwin");
      }
    })
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
To customize the building of the DataSource we can provide a SerializableFunction. For example, if you need to provide a PoolingDataSource from an existing JdbcIO.DataSourceConfiguration, you can use a JdbcIO.PoolableDataSourceProvider:

pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
        JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
            "username", "password")))
    // ...
);
By default, the provided function requests a DataSource per execution thread. In some circumstances this can quickly overwhelm the database by requesting too many connections. In that case you should look into sharing a single instance of a PoolingDataSource across all the execution threads. For example:

private static class MyDataSourceProviderFn
    implements SerializableFunction<Void, DataSource> {
  private static transient DataSource dataSource;

  @Override
  public synchronized DataSource apply(Void input) {
    if (dataSource == null) {
      dataSource = ... build data source ...
    }
    return dataSource;
  }
}

pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceProviderFn(new MyDataSourceProviderFn())
    // ...
);
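As one possible way to fill in the elided build step, a sketch using Apache Commons DBCP2's BasicDataSource (an assumption; any pooled DataSource implementation works) for the body of the if block:

// Assumption: Apache Commons DBCP2 is on the classpath; connection
// details reuse the placeholders from the earlier examples.
org.apache.commons.dbcp2.BasicDataSource pooled =
    new org.apache.commons.dbcp2.BasicDataSource();
pooled.setDriverClassName("com.mysql.jdbc.Driver");
pooled.setUrl("jdbc:mysql://hostname:3306/mydb");
pooled.setUsername("username");
pooled.setPassword("password");
pooled.setMaxTotal(8); // cap the number of connections created by this worker
dataSource = pooled;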
Parallel reading from a JDBC datasource
Beam supports partitioned reading of all data from a table. Automatic partitioning is supported for a few data types: Long and DateTime. To enable this, use readWithPartitions(TypeDescriptor). For other types, use readWithPartitions(JdbcReadWithPartitionsHelper) with a custom JdbcReadWithPartitionsHelper.

The partitioning scheme depends on these parameters, which can be user-provided or automatically inferred by Beam (for the supported types):
- Upper bound
- Lower bound
- Number of partitions - when auto-inferred, the number of partitions defaults to the square root of the number of rows divided by 5 (i.e., Math.floor(Math.sqrt(numRows) / 5)).
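For example, with numRows = 1,000,000, auto-inference yields Math.floor(Math.sqrt(1000000) / 5) = Math.floor(1000 / 5) = 200 partitions.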
To trigger auto-inference of these parameters, the user simply does not provide them. To infer them automatically, Beam runs either of these statements:

- SELECT min(column), max(column), COUNT(*) from table when none of the parameters is passed to the transform.
- SELECT min(column), max(column) from table when only the number of partitions is provided, but not the upper or lower bounds.
Should I use this transform? Consider using this transform in the following situations:
- The partitioning column is indexed. This will help speed up the range queries.
- Use auto-inference if the queries for bound and partition inference are efficient to execute in your DBMS.
- The distribution of data over the partitioning column is roughly uniform. Uniformity is not mandatory, but this transform will work best in that situation.
The following example shows usage of auto-inferred ranges, number of partitions, and schema:

pipeline.apply(JdbcIO.<Row>readWithPartitions()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withTable("Person")
    .withPartitionColumn("id")
    .withRowOutput()
);
Instead of a full table you could also use a subquery in parentheses. The subquery can be specified using the Table option, and partition columns can be qualified using the subquery alias provided as part of Table. Note that a subquery may not perform as well with auto-inferred ranges and partitions, because it may not rely on indices to speed up the partitioning.
pipeline.apply(JdbcIO.<KV<Integer, String>>readWithPartitions()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withTable("(select id, name from Person) as subq")
    .withPartitionColumn("id")
    .withLowerBound(0)
    .withUpperBound(1000)
    .withNumPartitions(5)
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
Writing to JDBC datasource
JDBC sink supports writing records into a database. It writes a PCollection<T> to the database by converting each T into a PreparedStatement via a user-provided JdbcIO.PreparedStatementSetter.

Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration.
pipeline
    .apply(...)
    .apply(JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
            .withUsername("username")
            .withPassword("password"))
        .withStatement("insert into Person values(?, ?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
          public void setParameters(KV<Integer, String> element, PreparedStatement query)
              throws SQLException {
            query.setInt(1, element.getKey());
            query.setString(2, element.getValue());
          }
        })
    );
NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple times for fault tolerance. Because of that, you should avoid using INSERT statements, since that risks duplicating records in the database, or failing due to primary key conflicts. Consider using MERGE ("upsert") statements supported by your database instead.
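As a sketch of that advice, assuming MySQL (matching the examples above) and a primary key on Person.id, the write example's statement could become an upsert so that retried bundles update rather than duplicate:

pipeline
    .apply(...)
    .apply(JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
            .withUsername("username")
            .withPassword("password"))
        // MySQL upsert: a re-executed bundle updates the existing row
        // instead of duplicating it or failing on a primary key conflict.
        .withStatement("insert into Person (id, name) values(?, ?) "
            + "on duplicate key update name = values(name)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
          public void setParameters(KV<Integer, String> element, PreparedStatement query)
              throws SQLException {
            query.setInt(1, element.getKey());
            query.setString(2, element.getValue());
          }
        })
    );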
Nested Class Summary
static class JdbcIO.DataSourceConfiguration
    A POJO describing a DataSource, either providing a DataSource directly or all properties allowing to create a DataSource.

static class JdbcIO.DataSourceProviderFromDataSourceConfiguration
    Wraps a JdbcIO.DataSourceConfiguration to provide a DataSource.

static class JdbcIO.DefaultRetryStrategy
    This is the default Predicate we use to detect deadlocks.

static class JdbcIO.PoolableDataSourceProvider
    Wraps a JdbcIO.DataSourceConfiguration to provide a PoolingDataSource.

static interface JdbcIO.PreparedStatementSetter<T>
    An interface used by the JdbcIO Write to set the parameters of the PreparedStatement used to write to the database.

static class JdbcIO.Read<T>
    Implementation of read().

static class JdbcIO.ReadAll<ParameterT,OutputT>
    Implementation of readAll().

static class JdbcIO.ReadRows
    Implementation of readRows().

static class JdbcIO.ReadWithPartitions<T,PartitionColumnT>

static class JdbcIO.RetryConfiguration
    Builder used to help with retry configuration for JdbcIO.

static interface JdbcIO.RetryStrategy
    An interface used to control if we retry the statements when a SQLException occurs.

static interface JdbcIO.RowMapper<T>
    An interface used by JdbcIO.Read for converting each row of the ResultSet into an element of the resulting PCollection.

static interface JdbcIO.StatementPreparator
    An interface used by the JdbcIO Read to set the parameters of the PreparedStatement used for the query.

static class JdbcIO.Write<T>
    This class is used as the default return value of write().

static class JdbcIO.WriteVoid<T>
    A PTransform to write to a JDBC datasource.

static class JdbcIO.WriteWithResults<T,V extends JdbcWriteResult>
    A PTransform to write to a JDBC datasource.
-
Method Summary
static <T> JdbcIO.Read<T> read()
    Read data from a JDBC datasource.

static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
    Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.

static JdbcIO.ReadRows readRows()
    Read Beam Rows from a JDBC data source.

static <T> JdbcIO.ReadWithPartitions<T,java.lang.Long> readWithPartitions()

static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper)
    Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.

static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(TypeDescriptor<PartitionColumnT> partitioningColumnType)
    Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.

static <T> JdbcIO.Write<T> write()
    Write data to a JDBC datasource.

static <T> JdbcIO.WriteVoid<T> writeVoid()
Method Detail
-
read
public static <T> JdbcIO.Read<T> read()
Read data from a JDBC datasource.
Type Parameters:
    T - Type of the data to be read.
-
readRows
public static JdbcIO.ReadRows readRows()
Read Beam Rows from a JDBC data source.
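A minimal usage sketch, reusing the connection placeholders from the class-level examples (the output schema is inferred from the query's result set):

// Sketch: each result row becomes a Beam Row with an inferred schema.
PCollection<Row> rows = pipeline.apply(
    JdbcIO.readRows()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
            .withUsername("username")
            .withPassword("password"))
        .withQuery("select id, name from Person"));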
-
readAll
public static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.
Type Parameters:
    ParameterT - Type of the data representing query parameters.
    OutputT - Type of the data to be read.
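A sketch of a parameterized fan-out read, assuming a PCollection<String> of names and the Person table from the class-level examples (the withParameterSetter and withCoder calls shown here are how ReadAll is typically configured; depending on the Beam version the explicit coder may be optional):

// Sketch: runs the query once per input element, binding each name to the
// statement's parameter via withParameterSetter.
PCollection<String> names = ...;
names.apply(JdbcIO.<String, KV<Integer, String>>readAll()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person where name = ?")
    .withParameterSetter((element, preparedStatement) ->
        preparedStatement.setString(1, element))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
    .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())));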
-
readWithPartitions
public static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(TypeDescriptor<PartitionColumnT> partitioningColumnType)
Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
Type Parameters:
    T - Type of the data to be read.
Parameters:
    partitioningColumnType - Type descriptor for the partition column.
-
readWithPartitions
public static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper)
Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
Type Parameters:
    T - Type of the data to be read.
Parameters:
    partitionsHelper - Custom helper for defining partitions.
-
readWithPartitions
public static <T> JdbcIO.ReadWithPartitions<T,java.lang.Long> readWithPartitions()
-
write
public static <T> JdbcIO.Write<T> write()
Write data to a JDBC datasource.
Type Parameters:
    T - Type of the data to be written.
-
writeVoid
public static <T> JdbcIO.WriteVoid<T> writeVoid()