CARVIEW |
Navigation Menu
-
-
Notifications
You must be signed in to change notification settings - Fork 56.2k
G-API: Introduce Streaming API #15216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
G-API: Introduce Streaming API #15216
Conversation
89e9370
to
3268b86
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some self-review. There is a work to be done...
, m_gim.metadata(nh).get<IslandExec>().object}); | ||
|
||
// Initialize queues for every operation's input | ||
ade::TypedGraph<DataQueue> qgr(*m_island_graph); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can create this above, to not create twice (NodeKind::SINK
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the lifetime of a variable should be as short as possible. Creating a TypedGraph is cheap. Having it declared only here means it is accessed only here.
// FIXME: probably we can maintain a pool of (then) pre-allocated | ||
// buffers to avoid runtime allocations. | ||
// Pool size can be determined given the internal queue size. | ||
cv::Mat nextFrame; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mat
is allocated once ? Otherwise may be to move this to class fields ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, but no. Once captured, the frame is pushed down to queues via move()
. The ownership goes away from this point in any case. How to manage this memory properly is an open question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
~30 more comments.
, m_gim.metadata(nh).get<IslandExec>().object}); | ||
|
||
// Initialize queues for every operation's input | ||
ade::TypedGraph<DataQueue> qgr(*m_island_graph); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the lifetime of a variable should be as short as possible. Creating a TypedGraph is cheap. Having it declared only here means it is accessed only here.
4268bb8
to
2f30b2a
Compare
@AsyaPronina @TolyaTalamanov please review the updates and close the conversations for issues which are fixed already. |
b6b998b
to
4235d05
Compare
3aa6b43
to
9736955
Compare
Now a GComputation can be compiled in a special "streaming" way and then "played" on a video stream. Currently only VideoCapture is supported as an input source.
- Added very simple pipeline tests, not all data types are covered yet (in fact, only GMat is tested now); - Started testing non-OCV backends in the streaming mode; - Added required fixes to Fluid backend, likely it works OK now; - Added required fixes to OCL backend, and now it is likely broken - Also added a UMat-based (OCL) version of Copy kernel
- Used only if TBB is not available
- Added missing header to CMakeLists.txt - Fixed various CI issues and warnings
- GStreamingExecutor blindly created island's input queues for compile-time (value-initialized) GScalars which didn't have any producers, making island actor threads wait there forever
One was added into master already
- Added tests on mov() - Removed unnecessary changes in garray.hpp
Also fixed some other comments in the code
- Now every island is triggered with own:: (instead of cv::) data objects as inputs; - Changes in Fluid backend required to support cv::Mat/Scalar were reverted;
… test - Also fixed regression test comments - Also added metadata check comments for GStreamingCompiled
- Fixed various possible deadlocks - Unified the shutdown code - Added more tests covering different corner cases on start/stop
In fact the problem hasn't been Windows-only. Island thread popped data from queues without preserving the Cmd objects and without taking the ownership over data acquired so when islands started to process the data, this data may be already freed. Linux version worked only by occasion.
- Also added some more explanation on Streaming/OpenCL status
- Various start()/stop()/setSource() call flow combinations
- Vector/Scalar passed as input; - Vector/Scalar passed in-between islands; - Some more assertions; - Also fixed a deadlock problem when inputs are mixed (1 constant, 1 stream)
- Vector - Scalar
- Now the core G-API doesn't use a cv::VideoCapture directly, it comes in via an abstract interface; - Polished a little bit the setSource()/start()/stop() semantics, now setSource() is mandatory before ANY call to start().
9736955
to
43017be
Compare
@alalek thanks! Just pushed a quick fix |
@alalek 🥳 |
…ing-api * G-API-NG/Streaming: Introduced a Streaming API Now a GComputation can be compiled in a special "streaming" way and then "played" on a video stream. Currently only VideoCapture is supported as an input source. * G-API-NG/Streaming: added threading & real streaming * G-API-NG/Streaming: Added tests & docs on Copy kernel - Added very simple pipeline tests, not all data types are covered yet (in fact, only GMat is tested now); - Started testing non-OCV backends in the streaming mode; - Added required fixes to Fluid backend, likely it works OK now; - Added required fixes to OCL backend, and now it is likely broken - Also added a UMat-based (OCL) version of Copy kernel * G-API-NG/Streaming: Added own concurrent queue class - Used only if TBB is not available * G-API-NG/Streaming: Fixing various issues - Added missing header to CMakeLists.txt - Fixed various CI issues and warnings * G-API-NG/Streaming: Fixed a compile-time GScalar queue deadlock - GStreamingExecutor blindly created island's input queues for compile-time (value-initialized) GScalars which didn't have any producers, making island actor threads wait there forever * G-API-NG/Streaming: Dropped own version of Copy kernel One was added into master already * G-API-NG/Streaming: Addressed GArray<T> review comments - Added tests on mov() - Removed unnecessary changes in garray.hpp * G-API-NG/Streaming: Added Doxygen comments to new public APIs Also fixed some other comments in the code * G-API-NG/Streaming: Removed debug info, added some comments & renamed vars * G-API-NG/Streaming: Fixed own-vs-cv abstraction leak - Now every island is triggered with own:: (instead of cv::) data objects as inputs; - Changes in Fluid backend required to support cv::Mat/Scalar were reverted; * G-API-NG/Streaming: use holds_alternative<> instead of index/index_of test - Also fixed regression test comments - Also added metadata check comments for GStreamingCompiled * G-API-NG/Streaming: Made start()/stop() more robust - Fixed various possible deadlocks - Unified the shutdown code - Added more tests covering different corner cases on start/stop * G-API-NG/Streaming: Finally fixed Windows crashes In fact the problem hasn't been Windows-only. Island thread popped data from queues without preserving the Cmd objects and without taking the ownership over data acquired so when islands started to process the data, this data may be already freed. Linux version worked only by occasion. * G-API-NG/Streaming: Fixed (I hope so) Windows warnings * G-API-NG/Streaming: fixed typos in internal comments - Also added some more explanation on Streaming/OpenCL status * G-API-NG/Streaming: Added more unit tests on streaming - Various start()/stop()/setSource() call flow combinations * G-API-NG/Streaming: Added tests on own concurrent bounded queue * G-API-NG/Streaming: Added more tests on various data types, + more - Vector/Scalar passed as input; - Vector/Scalar passed in-between islands; - Some more assertions; - Also fixed a deadlock problem when inputs are mixed (1 constant, 1 stream) * G-API-NG/Streaming: Added tests on output data types handling - Vector - Scalar * G-API-NG/Streaming: Fixed test issues with IE + Windows warnings * G-API-NG/Streaming: Decoupled G-API from videoio - Now the core G-API doesn't use a cv::VideoCapture directly, it comes in via an abstract interface; - Polished a little bit the setSource()/start()/stop() semantics, now setSource() is mandatory before ANY call to start(). * G-API-NG/Streaming: Fix STANDALONE build (errors brought by render)
Streaming in G-API
This PR introduces basic streaming support in G-API. Now G-API can
handle video streams and pipeline the execution (especially
heterogeneous & inference) efficiently, thus maximizing the overall
throughput.
API Overview
Now in addition to a regular
GComputation::compile()
a user cancompile the same (exactly the same!) graph for streaming -- via
GComputation::compileStreaming()
. The returned object isGStreamingCompiled
-- a stream-oriented version of a compiled graph.Use
setSource()
,start()
, andstop()
methods to specify theinput data (stream or constant), start the processing, and stop it,
respectively. Use methods
pull()
ortry_pull()
to obtain a /nextprocessed frame/ data from the pipeline in synchronous/asynchronous
mode, respectively.
Creating and running a pipeline is now as easy as:
Note that a generic version of
setSource()
takes a vector of inputdata sources via
cv::gin()
, and the output data vector is written bypull()
/try_pull()
into objects passed viacv::gout()
; it is verysimilar how regular
GComputation::apply()
orGCompiled::operator()
work.
compileStreaming
also takes G-API compilation arguments similar to aregular
compile()
-- so custom kernels, backend selection, anddebugging options are welcome!
Implementation overview
The streaming implementation relies heavily on the existing G-API's
heterogeneous infrastructure. If G-API runs a mixture of kernels from
different backends (e.g., OpenCV, Fluid, Inference), the appropriate
graph nodes for these backends are organized into clusters called
"Islands". Kernels in every "Island" belong to the same
device/backend. Islands are regions in the Directed Acyclic Graph (the
G-API's underlying representation model).
Current streaming implementation makes these Islands run in parallel.
Currently it is done naively by assigning every Island to its own
dedicated thread. Since Islands are connected earch other at the graph
level, the execution threads are connected appropriately too (there is a
direct 1:1 mapping).
Island graph model is a bipartite graph with Islands ("super-nodes" or
"super-operations") connected each other via Data objects.
A Data object can be written by only one Island (exclusively) and can
be read from multiple other Islands.
In Streaming mode, every READ edge for Data node is associated with
its own queue (
tbb::concurrent_bounded_queue
or our own equivalentif TBB is not available). Reader Islands poll their input queues and
fetch elements to a vector to form a call frame.
Writer Islands produce data and write it to ALL queues associated with
every output Data object at the graph level.
When a graph is compiled for Streaming, the Island Graph model is
extended with some extra nodes to maintain the above invariant &
reduce complexity: Emitters and Sinks.
Emitters push the input data (whether it is a video stream's next
frame or a constant ("frozen" in GStreamer terms) Mat or scalar) its
queues, which are then consumed by first Islands in the graph.
Sinks collect data from the last (output) Islands in the graph and
synchronize the data (if a pipeline has multiple outputs) to one
single vector to make the implementation of
try_pull()
/pull()
easier.
When there's no data anymore or
stop()
was called explicitly, aspecial
STOP
sign is pushed by emitters/to internal queues and thenbroadcasted to the rest of the pipeline. Some complex logic is
involved to avoid deadlocks in this situation.
Current limitations
GArray<>
andutil::variant
--all were reported internally, workarounded gracefully, and to be
addressed within a release or two.
objects are declared under
cv::gapi::wip
namespace.