Class SpannerIO
java.lang.Object
  org.apache.beam.sdk.io.gcp.spanner.SpannerIO
public class SpannerIO extends java.lang.Object
Reading from Cloud Spanner
Bulk reading of a single query or table
To perform a single read from Cloud Spanner, construct a SpannerIO.Read transform using SpannerIO.read(). It will return a PCollection of Structs, where each element represents an individual row returned from the read operation. Both Query and Read APIs are supported. See more information about reading from Cloud Spanner.

To execute a Query, specify a SpannerIO.Read.withQuery(Statement) or SpannerIO.Read.withQuery(String) during the construction of the transform.

 PCollection<Struct> rows = p.apply(
     SpannerIO.read()
         .withInstanceId(instanceId)
         .withDatabaseId(dbId)
         .withQuery("SELECT id, name, email FROM users"));
Reads by default use the PartitionQuery API, which enforces some limitations on the type of queries that can be used so that the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a non-partitioned read by setting withBatching(false). If the amount of data being read by a non-partitioned read is very large, it may be useful to add a Reshuffle.viaRandomKey() transform on the output so that the downstream transforms can run in parallel.
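For example, a non-partitioned read followed by a reshuffle might look like the following sketch (the query shown is only an illustration of a statement that the PartitionQuery API cannot split):

 PCollection<Struct> rows = p.apply(
     SpannerIO.read()
         .withInstanceId(instanceId)
         .withDatabaseId(dbId)
         // e.g. a top-level ORDER BY prevents the query from being partitioned
         .withQuery("SELECT id, name, email FROM users ORDER BY name")
         .withBatching(false));

 // Redistribute the rows so that downstream transforms can run in parallel.
 PCollection<Struct> shuffled = rows.apply(Reshuffle.viaRandomKey());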
To read an entire Table, use SpannerIO.Read.withTable(String) and optionally specify a list of columns.

 PCollection<Struct> rows = p.apply(
     SpannerIO.read()
         .withInstanceId(instanceId)
         .withDatabaseId(dbId)
         .withTable("users")
         .withColumns("id", "name", "email"));
To read using an Index, specify the index name using SpannerIO.Read.withIndex(String).

 PCollection<Struct> rows = p.apply(
     SpannerIO.read()
         .withInstanceId(instanceId)
         .withDatabaseId(dbId)
         .withTable("users")
         .withIndex("users_by_name")
         .withColumns("id", "name", "email"));
Read consistency
The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the power of read only transactions. Staleness of data can be controlled using the SpannerIO.Read.withTimestampBound(com.google.cloud.spanner.TimestampBound) or SpannerIO.Read.withTimestamp(Timestamp) methods. Read more about transactions in Cloud Spanner.

It is possible to read several PCollections within a single transaction. Apply the createTransaction() transform, which lazily creates a transaction. The result of this transformation can be passed to a read operation using SpannerIO.Read.withTransaction(PCollectionView).

 SpannerConfig spannerConfig = ...

 PCollectionView<Transaction> tx = p.apply(
     SpannerIO.createTransaction()
         .withSpannerConfig(spannerConfig)
         .withTimestampBound(TimestampBound.strong()));

 PCollection<Struct> users = p.apply(
     SpannerIO.read()
         .withSpannerConfig(spannerConfig)
         .withQuery("SELECT name, email FROM users")
         .withTransaction(tx));

 PCollection<Struct> tweets = p.apply(
     SpannerIO.read()
         .withSpannerConfig(spannerConfig)
         .withQuery("SELECT user, tweet, date FROM tweets")
         .withTransaction(tx));
Bulk reading of multiple queries or tables
You can perform multiple consistent reads on a set of tables or a set of queries by constructing a SpannerIO.ReadAll transform using SpannerIO.readAll(). This transform takes a PCollection of ReadOperation elements, and performs the partitioned read on each of them using the same Read Only Transaction for consistent results.

Note that this transform should not be used in Streaming pipelines. This is because the same Read Only Transaction, which is created once when the pipeline is first executed, will be used for all reads. The data being read will therefore become stale, and if no reads are made for more than 1 hour, the transaction will automatically time out and be closed by the Spanner server, meaning that any subsequent reads will fail.

 // Build a collection of ReadOperations.
 PCollection<ReadOperation> reads = ...

 PCollection<Struct> rows = reads.apply(
     SpannerIO.readAll()
         .withInstanceId(instanceId)
         .withDatabaseId(dbId));
Writing to Cloud Spanner
The Cloud Spanner SpannerIO.Write transform writes to Cloud Spanner by executing a collection of input row Mutations. The mutations are grouped into batches for efficiency.

To configure the write transform, create an instance using write() and then specify the destination Cloud Spanner instance (SpannerIO.Write.withInstanceId(String)) and destination database (SpannerIO.Write.withDatabaseId(String)). For example:

 // Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
 PCollection<Mutation> mutations = ...;
 // Write mutations.
 SpannerWriteResult result = mutations.apply(
     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
SpannerWriteResult
The SpannerWriteResult object contains the results of the transform, including a PCollection of MutationGroups that failed to write, and a PCollection that can be used in batch pipelines as a completion signal to Wait.OnSignal to indicate when all input has been written. Note that in streaming pipelines, this signal will never be triggered as the input is unbounded and this PCollection is using the GlobalWindow.
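As an illustration (a sketch only: it assumes the result exposes the completion signal via getOutput() and the failed groups via getFailedMutations(); the follow-up input and DoFn are hypothetical), the completion signal can be used with Wait.OnSignal in a batch pipeline:

 SpannerWriteResult result = mutations.apply(
     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));

 // MutationGroups that could not be written, e.g. for logging or re-processing.
 PCollection<MutationGroup> failed = result.getFailedMutations();

 // Delay a downstream step until all input has been written (batch pipelines only).
 followUpInput
     .apply(Wait.on(result.getOutput()))
     .apply("PostWriteStep", ParDo.of(new PostWriteFn())); // hypothetical follow-up DoFn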
Batching and Grouping
To reduce the number of transactions sent to Spanner, the Mutations are grouped into batches. The default maximum size of the batch is set to 1MB, or 5000 mutated cells, or 500 rows (whichever is reached first). To override this use withBatchSizeBytes(), withMaxNumMutations() or withMaxNumRows(). Setting any of these to a small value or zero disables batching.

Note that the maximum size of a single transaction is 20,000 mutated cells - including cells in indexes. If you have a large number of indexes and are getting exceptions with the message INVALID_ARGUMENT: The transaction contains too many mutations, you will need to specify a smaller value for MaxNumMutations.
The batches written are obtained by grouping enough Mutations from the Bundle provided by Beam to form several batches. This group of Mutations is then sorted by table and primary key, and the batches are created from the sorted group. Each batch will then have rows for the same table, with keys that are 'close' to each other, thus optimising write efficiency by each batch affecting as few table splits as possible.

This grouping factor (number of batches) is controlled by the parameter withGroupingFactor().

Note that each worker will need enough memory to hold GroupingFactor x MaxBatchSizeBytes Mutations, so if you have a large MaxBatchSize you may need to reduce GroupingFactor.
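As a rough sketch, these limits can be overridden when constructing the write transform (the values shown are purely illustrative):

 SpannerWriteResult result = mutations.apply(
     "Write", SpannerIO.write()
         .withInstanceId("instance")
         .withDatabaseId("database")
         .withBatchSizeBytes(512 * 1024) // maximum batch size in bytes
         .withMaxNumMutations(2000)      // maximum mutated cells per batch
         .withMaxNumRows(200)            // maximum rows per batch
         .withGroupingFactor(100));      // number of batches to sort and group at once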
While Grouping and Batching increases write efficiency, it dramatically increases the latency between when a Mutation is received by the transform, and when it is actually written to the database. This is because enough Mutations need to be received to fill the grouped batches. In Batch pipelines (bounded sources), this is not normally an issue, but in Streaming (unbounded) pipelines, this latency is often seen as unacceptable.
There are therefore 3 different ways that this transform can be configured:
- With Grouping and Batching.
  This is the default for Batch pipelines, where sorted batches of Mutations are created and written. This is the most efficient way to ingest large amounts of data, but has the highest latency before writing.
- With Batching but no Grouping.
  If withGroupingFactor(1) is set, grouping is disabled. This is the default for Streaming pipelines. Unsorted batches are created and written as soon as enough mutations to fill a batch are received. This reflects a compromise where a small amount of additional latency enables more efficient writes.
- Without any Batching.
  If withBatchSizeBytes(0) is set, no batching is performed and the Mutations are written to the database as soon as they are received, ensuring the lowest latency before Mutations are written.
Monitoring
Several counters are provided for monitoring purposes:

- batchable_mutation_groups
  Counts the mutations that are batched for writing to Spanner.
- unbatchable_mutation_groups
  Counts the mutations that can not be batched and are applied individually - either because they are too large to fit into a batch, or they are ranged deletes.
- mutation_group_batches_received, mutation_group_batches_write_success, mutation_group_batches_write_failed
  Count the number of batches that are processed. If Failure Mode is set to REPORT_FAILURES, then failed batches will be split up and the individual mutation groups retried separately.
- mutation_groups_received, mutation_groups_write_success, mutation_groups_write_fail
  Count the number of individual MutationGroups that are processed.
- spanner_write_success, spanner_write_fail
  The number of writes to Spanner that have occurred.
- spanner_write_retries
  The number of times a write is retried after a failure - either due to a timeout, or when batches fail and REPORT_FAILURES is set so that individual Mutation Groups are retried.
- spanner_write_timeouts
  The number of timeouts that occur when writing to Spanner. Writes that timed out are retried after a backoff. Large numbers of timeouts suggest an overloaded Spanner instance.
- spanner_write_total_latency_ms
  The total amount of time spent writing to Spanner, in milliseconds.
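For example (a sketch assuming the write transform's failure-mode option), batch failures can be reported rather than failing the pipeline, and the failed mutation groups inspected afterwards:

 SpannerWriteResult result = mutations.apply(
     "Write", SpannerIO.write()
         .withInstanceId("instance")
         .withDatabaseId("database")
         .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));

 // MutationGroups that still failed after being retried individually.
 PCollection<MutationGroup> failedGroups = result.getFailedMutations();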
Database Schema Preparation
The Write transform reads the database schema on pipeline start to know which columns are used as primary keys of the tables and indexes. This is so that the transform knows how to sort the grouped Mutations by table name and primary key as described above.
If the database schema, or any additional tables or indexes, are created in the same pipeline, then there will be a race condition: if the schema is read before a table is created, its primary key will not be known. This means that the sorting/batching will not be optimal and performance will be reduced (warnings will be logged for rows using unknown tables).
To prevent this race condition, use SpannerIO.Write.withSchemaReadySignal(PCollection) to pass a signal PCollection (for example the output of the transform that creates the table(s)) which will be used with Wait.OnSignal to prevent the schema from being read until it is ready. The Write transform will be paused until this signal PCollection is closed.
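A rough sketch of this pattern, where CreateTables is a hypothetical transform that issues the DDL statements and emits a signal element when done:

 // Hypothetical transform that creates the tables and emits a signal element.
 PCollection<Void> tablesCreated = p.apply("CreateTables", new CreateTables());

 SpannerWriteResult result = mutations.apply(
     "Write", SpannerIO.write()
         .withInstanceId("instance")
         .withDatabaseId("database")
         .withSchemaReadySignal(tablesCreated));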
Transactions
The transform does not provide the same transactional guarantees as Cloud Spanner. In particular,
- Individual Mutations are submitted atomically, but all Mutations are not submitted in the same transaction.
- A Mutation is applied at least once;
- If the pipeline was unexpectedly stopped, mutations that were already applied will not get rolled back.
Use MutationGroups with the SpannerIO.WriteGrouped transform to ensure that a small set of mutations is bundled together. It is guaranteed that mutations in a MutationGroup are submitted in the same transaction. Note that a MutationGroup must not exceed the Spanner transaction limits.

 // Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
 PCollection<MutationGroup> mutationGroups = ...;
 // Write mutation groups.
 SpannerWriteResult result = mutationGroups.apply(
     "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
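For example, a group containing a parent row and a dependent child row might be built as in the following sketch (table and column names are illustrative):

 Mutation parent = Mutation.newInsertBuilder("users")
     .set("id").to(1L)
     .set("name").to("alice")
     .build();
 Mutation child = Mutation.newInsertBuilder("user_emails")
     .set("user_id").to(1L)
     .set("email").to("alice@example.com")
     .build();

 // All mutations in the group are guaranteed to be submitted in the same transaction.
 MutationGroup group = MutationGroup.create(parent, child);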
Streaming Support
SpannerIO.Write can be used as a streaming sink; however, as with batch mode, note that the write order of individual Mutation/MutationGroup objects is not guaranteed.

SpannerIO.Read and SpannerIO.ReadAll can be used in Streaming pipelines to read a set of Facts on pipeline startup. SpannerIO.ReadAll should not be used on an unbounded PCollection<ReadOperation>, for the reasons stated above.

Updates to the I/O connector code

For any significant updates to this I/O connector, please consider involving corresponding code reviewers mentioned here.
Nested Class Summary
- static class SpannerIO.CreateTransaction
  A PTransform that creates a transaction.
- static class SpannerIO.FailureMode
  A failure handling strategy.
- static class SpannerIO.Read
  Implementation of read().
- static class SpannerIO.ReadAll
  Implementation of readAll().
- static class SpannerIO.ReadChangeStream
- static interface SpannerIO.SpannerChangeStreamOptions
  Interface to display the name of the metadata table on Dataflow UI.
- static class SpannerIO.Write
  A PTransform that writes Mutation objects to Google Cloud Spanner.
- static class SpannerIO.WriteGrouped
  Same as SpannerIO.Write but supports grouped mutations.
-
Method Summary
- static SpannerIO.CreateTransaction createTransaction()
  Returns a transform that creates a batch transaction.
- static SpannerIO.Read read()
  Creates an uninitialized instance of SpannerIO.Read.
- static SpannerIO.ReadAll readAll()
- static SpannerIO.ReadChangeStream readChangeStream()
  Creates an uninitialized instance of SpannerIO.ReadChangeStream.
- static SpannerIO.Read readWithSchema()
- static SpannerIO.Write write()
  Creates an uninitialized instance of SpannerIO.Write.
-
Method Detail
-
read
public static SpannerIO.Read read()
Creates an uninitialized instance of SpannerIO.Read. Before use, the SpannerIO.Read must be configured with a SpannerIO.Read.withInstanceId(java.lang.String) and SpannerIO.Read.withDatabaseId(java.lang.String) that identify the Cloud Spanner database.
-
readAll
public static SpannerIO.ReadAll readAll()
-
readWithSchema
public static SpannerIO.Read readWithSchema()
-
createTransaction
public static SpannerIO.CreateTransaction createTransaction()
Returns a transform that creates a batch transaction. By default, a TimestampBound.strong() transaction is created; to override this use SpannerIO.CreateTransaction.withTimestampBound(TimestampBound).
-
write
public static SpannerIO.Write write()
Creates an uninitialized instance of SpannerIO.Write. Before use, the SpannerIO.Write must be configured with a SpannerIO.Write.withInstanceId(java.lang.String) and SpannerIO.Write.withDatabaseId(java.lang.String) that identify the Cloud Spanner database being written.
-
readChangeStream
public static SpannerIO.ReadChangeStream readChangeStream()
Creates an uninitialized instance of SpannerIO.ReadChangeStream. Before use, the SpannerIO.ReadChangeStream must be configured with a SpannerIO.ReadChangeStream.withProjectId(java.lang.String), SpannerIO.ReadChangeStream.withInstanceId(java.lang.String), and SpannerIO.ReadChangeStream.withDatabaseId(java.lang.String) that identify the Cloud Spanner database being read. It must also be configured with the start time and the change stream name.
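A minimal sketch of such a configuration (withChangeStreamName and withInclusiveStartAt are assumed from the change stream API; the identifiers are placeholders):

 PCollection<DataChangeRecord> changeRecords = p.apply(
     SpannerIO.readChangeStream()
         .withProjectId("project")
         .withInstanceId("instance")
         .withDatabaseId("database")
         .withChangeStreamName("my_change_stream")
         .withInclusiveStartAt(Timestamp.now()));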