Description
What happened?
The following error might occur in some pipelines, possibly non-deterministically:
Exception serializing message!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/grpc/_common.py", line 89, in _transform
    return transformer(message)
ValueError: Message org.apache.beam.model.fn_execution.v1.Elements exceeds maximum protobuf size of 2GB: 2887086320
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 700, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 542, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 968, in _next
    raise self
This issue is caused by large elements in a Beam pipeline. If you see this error, upgrade to Apache Beam Python SDK 2.57.0 or later. Apache Beam 2.57.0 improves a code path that could suboptimally combine multiple large elements together. It also adds logging when large elements are detected.
If you run the pipeline on 2.57.0 or later and failures persist, look for warnings like:
Data output stream buffer size ... exceeds 536870912 bytes. This is likely due to a large element in a PCollection.
or errors like:
Buffer size ... exceeds GRPC limit 2147483548. This is likely due to a single element that is too large.
If you see these warnings, inspect the logs to see which pipeline step emits these messages, and try to reduce the size of the elements in that step.
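The two thresholds quoted in these messages correspond to 512 MiB (536870912 bytes) for the warning and just under 2 GiB (2147483548 bytes, that is 2**31 - 100) for the error. As a rough illustration only, and not Beam's actual implementation, the sketch below classifies a byte count against those two numbers. The function name `classify_buffer_size` and its return labels are hypothetical.

```python
# Thresholds copied from the log messages quoted above.
WARN_BYTES = 536870912          # 512 MiB, from the "Data output stream buffer size" warning
GRPC_LIMIT_BYTES = 2147483548   # 2**31 - 100, from the "exceeds GRPC limit" error


def classify_buffer_size(num_bytes: int) -> str:
    """Return a hypothetical label for a buffer of num_bytes bytes."""
    if num_bytes > GRPC_LIMIT_BYTES:
        return "error"
    if num_bytes > WARN_BYTES:
        return "warning"
    return "ok"


# The size reported in the traceback at the top of this issue.
print(classify_buffer_size(2887086320))  # prints "error"
```

A check like this can be applied to sizes you measure yourself (for example, the length of an encoded element) to estimate how close a step is to the limits before the worker fails.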
When constructing your pipeline, follow these best practices:
- In PCollections, use multiple small elements instead of a single large element.
- Store large blobs in external storage systems. Either use PCollections to pass their metadata, or use a custom coder that reduces the size of the element.
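To make the first practice concrete, here is a minimal sketch of splitting one large byte string into several keyed chunks and putting them back together later. It uses only the standard library; the helper names `split_into_chunks` and `reassemble`, and the `(key, index, chunk)` tuple layout, are assumptions for illustration and not part of the Beam API.

```python
from typing import Iterable, Iterator, Tuple

Chunk = Tuple[str, int, bytes]  # (key, chunk index, chunk payload)


def split_into_chunks(key: str, blob: bytes, chunk_size: int) -> Iterator[Chunk]:
    """Yield (key, index, chunk) tuples covering blob in order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for index, start in enumerate(range(0, len(blob), chunk_size)):
        yield key, index, blob[start:start + chunk_size]


def reassemble(chunks: Iterable[Chunk]) -> bytes:
    """Rebuild the original blob from chunks that may arrive in any order."""
    ordered = sorted(chunks, key=lambda chunk: chunk[1])
    return b"".join(payload for _, _, payload in ordered)


blob = bytes(range(10))
chunks = list(split_into_chunks("doc-1", blob, chunk_size=4))
restored = reassemble(reversed(chunks))
```

In a pipeline, a function like `split_into_chunks` would typically be applied in a step that emits many outputs per input, so that no single element carries the whole blob. Whether reassembly is needed at all depends on what the downstream steps do with the data.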
As a last resort, you could try to use a custom coder that transmits large elements via a file system side channel.
A Beam Coder that transmits large values via a filesystem side channel (Python)
import uuid

import apache_beam as beam
from apache_beam.coders import Coder


class FileBackedElementCoder(Coder):
  """Wraps another coder and spills large encoded values to files under root."""

  def __init__(self, underlying_coder, root, threshold=1 << 20):
    self._underlying_coder = underlying_coder
    self._root = root
    self._threshold = threshold

  def encode(self, value):
    encoded = self._underlying_coder.encode(value)
    if len(encoded) < self._threshold:
      # Small value: send it inline, prefixed with a 0 flag byte.
      return b'\x00' + encoded
    else:
      # Large value: write it to a file and send only the path, prefixed with 1.
      path = beam.io.filesystems.FileSystems.join(self._root, uuid.uuid4().hex)
      with beam.io.filesystems.FileSystems.create(path) as fout:
        fout.write(encoded)
      return b'\x01' + path.encode('utf-8')

  def decode(self, encoded_value):
    if encoded_value[0] == 0:
      data = encoded_value[1:]
    else:
      path = encoded_value[1:].decode('utf-8')
      with beam.io.filesystems.FileSystems.open(path) as fin:
        data = fin.read(-1)
    return self._underlying_coder.decode(data)
Please leave a note if this custom coder helped resolve your issue and other approaches didn't work.
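The framing used by the Python coder (one flag byte, followed by either the payload or a file path) can be tried out without Beam. The sketch below is a standard-library stand-in that works on raw bytes and the local file system; the function names `encode` and `decode` and the constants `INLINE` and `SPILLED` are hypothetical, and it is meant only to show the wire format, not to replace the coder.

```python
import os
import tempfile
import uuid

INLINE = b"\x00"   # payload follows the flag byte directly
SPILLED = b"\x01"  # a UTF-8 file path follows the flag byte


def encode(payload: bytes, root: str, threshold: int = 1 << 20) -> bytes:
    """Return payload inline if it is small, otherwise write it to a file under root."""
    if len(payload) < threshold:
        return INLINE + payload
    path = os.path.join(root, uuid.uuid4().hex)
    with open(path, "wb") as fout:
        fout.write(payload)
    return SPILLED + path.encode("utf-8")


def decode(encoded: bytes) -> bytes:
    """Invert encode(): read the payload inline or from the referenced file."""
    if encoded[:1] == INLINE:
        return encoded[1:]
    path = encoded[1:].decode("utf-8")
    with open(path, "rb") as fin:
        return fin.read()


root = tempfile.mkdtemp()
small = encode(b"abc", root, threshold=8)      # stays inline
large = encode(b"x" * 100, root, threshold=8)  # goes to a file under root
```

Two details are worth keeping in mind when adapting this. The Java version in this issue uses the opposite flag convention (1 for inline, 0 for a file), so the two coders are not wire-compatible. Also, neither version deletes the files it writes, so the directory passed as the root needs to be cleaned up separately.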
A Beam Coder that transmits large values via a filesystem side channel (Java)
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.UUID;
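import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.io.FileSystems;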
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
public class HugeElementCoder<T> extends CustomCoder<T> {
  private final Coder<T> underlying;
  private final long threshold;
  private final ResourceId tempDir;

  public HugeElementCoder(Coder<T> underlying, long threshold, String tempDir) {
    this(underlying, threshold, FileSystems.matchNewResource(tempDir, true));
  }

  public HugeElementCoder(Coder<T> underlying, long threshold, ResourceId tempDir) {
    this.underlying = underlying;
    this.threshold = threshold;
    assert tempDir.isDirectory();
    this.tempDir = tempDir;
  }

  @Override
  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
    MultiplexingOutputStream out = new MultiplexingOutputStream();
    try {
      underlying.encode(value, out);
    } finally {
      out.close(outStream);
    }
  }

  @Override
  public T decode(InputStream inStream) throws IOException {
    int isInline = inStream.read();
    if (isInline == 1) {
      return underlying.decode(inStream);
    } else {
      String path = new DataInputStream(inStream).readUTF();
      ReadableByteChannel fileChannel =
          FileSystems.open(
              tempDir.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
      try {
        return underlying.decode(Channels.newInputStream(fileChannel));
      } finally {
        fileChannel.close();
      }
    }
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    this.underlying.verifyDeterministic();
  }

  private class MultiplexingOutputStream extends OutputStream {
    private boolean inlined;
    private ByteArrayOutputStream buffer;
    private OutputStream fileOut;
    private String path;

    public MultiplexingOutputStream() {
      this.inlined = true;
      this.buffer = new ByteArrayOutputStream();
    }

    @Override
    public void write(int b) throws IOException {
      write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (inlined) {
        if (buffer.size() + len < threshold) {
          buffer.write(b, off, len);
        } else {
          inlined = false;
          path = UUID.randomUUID().toString();
          WritableByteChannel fileChannel =
              FileSystems.create(
                  tempDir.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
                  "application/octet-stream");
          fileOut = Channels.newOutputStream(fileChannel);
          fileOut.write(buffer.toByteArray());
          buffer = null;
          fileOut.write(b, off, len);
        }
      } else {
        fileOut.write(b, off, len);
      }
    }

    public void close(OutputStream out) throws IOException {
      if (inlined) {
        out.write(1);
        out.write(buffer.toByteArray());
      } else {
        fileOut.close();
        out.write(0);
        new DataOutputStream(out).writeUTF(path);
      }
    }
  }
}
For more information on custom coders, see:
- https://beam.apache.org/releases/pydoc/current/apache_beam.coders.typecoders.html
- https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/coders.py
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)