CARVIEW |
Navigation Menu
-
-
Notifications
You must be signed in to change notification settings - Fork 56.2k
Fixed handling of new stream, especially for stateful OCV kernels #21731
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
4e40049
to
a31dfe8
Compare
a31dfe8
to
3c2c3ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, Ruslan!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issues:
- No meaningful description to the problem and the way you solve it. I can't approve it without that; I won't reconstruct the understating from the code either. In the future, MRs with no proper description won't be accepted. Review can't start if there's no description, especially if it is a bug you're fixing. Which bug?? Why it is a bug? What is the solution?
- Tests on regular mode execution are missing. Are you sure it is working OK?
@@ -382,16 +383,19 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args) | |||
magazine::resetInternalData(m_res, data); | |||
} | |||
|
|||
// Run the script | |||
// Ask every IslandExecutable to prepate its internal states for new coming stream (4) | |||
std::call_once(m_prep_flag, [this](){ this->prepareForNewStream(); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this happens only once. Is this flag reset anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this flag can't be reset as I've got it
By the way, this flag is different for any different object of GExecutor
.
So, I might say that prepareForNewStream()
is called once per each GExecutor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is wrong.
It may be called at any moment for a GCompiled and NO internal recompilation may happen.
https://docs.opencv.org/4.x/d2/d2c/classcv_1_1GCompiled.html#ab3f7a54ed698a86ca6c63b4351eddb20
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also. Should this happen in run or during the gexecutor construction? When it is constructed, valid meta
should be there already/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I mean, first prepareForNewStream()
is called once per each GExecutor
, which is not called by user.
Yes, it should be called upon the GExecutor
construction, std::call_once
is used to emulate similar behaviour.
But, in the GExecutor
execution model it seemed for me that it just fits better to be as 4th step:
// Naive execution model is similar to current CPU (OpenCV) plugin
// execution model:
// 1. Allocate all internal resources first (NB - CPU plugin doesn't do it)
// 2. Put input/output GComputation arguments to the storage
// 3. For every Island, prepare vectors of input/output parameter descs
// 4. Ask every GIslandExecutable to prepate its internal states for a new stream
// 5. Iterate over a list of operations (sorted in the topological order)
// 6. For every operation, form a list of input/output data objects
// 7. Run GIslandExecutable
// 8. writeBack
And I think. that 4th step should be implemented in run()
. But I was wrong, it shouldn't be exactly in run()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note I am not talking here about the initialization procedure.
prepareForNewStream
can be called by user manually for the existing GExecutor
.
- Fixed explanation comments - Expanded test for stateful OCV kernels in Regular mode
Hello! Thanks a lot! Added description and test for regular mode! |
@@ -382,16 +383,19 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args) | |||
magazine::resetInternalData(m_res, data); | |||
} | |||
|
|||
// Run the script | |||
// Ask every IslandExecutable to prepate its internal states for new coming stream (4) | |||
std::call_once(m_prep_flag, [this](){ this->prepareForNewStream(); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also. Should this happen in run or during the gexecutor construction? When it is constructed, valid meta
should be there already/
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params)); | ||
EXPECT_TRUE(params.pSetupsCount != nullptr); | ||
EXPECT_EQ(1, *params.pSetupsCount); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there is still not test on regular mode entering a new stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, there is no test which tests right new stream and state after that, but there is a test which does such testing in the context of another test case:
callsCounter.prepareForNewStream(); |
By the way, it seems needed to add such separate test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but there is a test which does such testing in the context of another test case:
That test should fail with your previous implementation. Obviously it tests some other aspect.
- Moved notification about new stream to the constructor
- Moved notification about new stream to the constructor - Added test on state reset for Regular mode
c5a9c4b
to
d940969
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OPTIONAL:
added several hint to turn logger up
@@ -82,6 +83,9 @@ cv::gimpl::GExecutor::GExecutor(std::unique_ptr<ade::Graph> &&g_model) | |||
break; | |||
} // switch(kind) | |||
} // for(gim nodes) | |||
|
|||
// (4) | |||
prepareForNewStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest you add logger somewhere in
void ***:prepareForNewStream()
{
GAPI_LOG_INFO(nullptr, "inputs nodes: " << m_ops.size()) <----
for (auto &op : m_ops)
{
op.isl_exec->handleNewStream();
}
}
// 5. For every operation, form a list of input/output data objects | ||
// 6. Run GIslandExecutable | ||
// 7. writeBack | ||
// 4. Ask every GIslandExecutable to prepate its internal states for a new stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for me - it's good idea to put logger with INFO in such places 1,2,3...
@@ -382,16 +386,16 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args) | |||
magazine::resetInternalData(m_res, data); | |||
} | |||
|
|||
// Run the script | |||
// Run the script (5) | |||
for (auto &op : m_ops) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GAPI_LOG_DEBUG(nullptr, "Run the script for inputs nodes: " << m_ops.size())
Input i{m_res, op.in_objects}; | ||
Output o{m_res, op.out_objects}; | ||
op.isl_exec->run(i, o); | ||
} | ||
|
||
// (7) | ||
// (8) | ||
for (auto it : ade::util::zip(ade::util::toRange(proto.outputs), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GAPI_LOG_DEBUG(nullptr, "WriteBack ".... )
{ | ||
op.isl_exec->handleNewStream(); | ||
} | ||
// Notify island executable about a new stream to let it update its internal variables. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG_INFO/DEBUG(nullptr, "Notify island executable about a new stream to let it update its internal variables.")
op.isl_exec->handleNewStream(); | ||
} | ||
// Notify island executable about a new stream to let it update its internal variables. | ||
op.isl_exec->handleNewStream(); | ||
|
||
m_threads.emplace_back(islandActorThread, | ||
op.in_objects, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If islandActorThread
is infinitive actor thread (while(true)
) then it is good idea to improve it
islandActorThread ()
...
GAPI_LOG_INFO(..., <name of island executor> " Started")
while(true) {
}
GAPI_LOG_INFO(..., Finished")
|
||
struct CountStateSetupsParams | ||
{ | ||
int* pSetupsCount = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weak_ptr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for the catch!!
I 've chosen to not use exactly weak_ptr
, because think that CountStateSetupsParams
is good enough to be owner of passed memory
Fixed with std::shared_ptr
, not std::unique_ptr
, just to allow copying
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but if you indented to use copy then how's about mltithreading? do wee need to make operation ++ atomic?
static void setup(const cv::GMatDesc &, std::shared_ptr<int> &, | ||
const cv::GCompileArgs &compileArgs) | ||
{ | ||
auto params = cv::gapi::getCompileArg<CountStateSetupsParams>(compileArgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it once-time triggered then i suggest:
auto params =...
if () {
}
GAPI_LOG_INFO(nullptr, "done with setup count: " << params.pSetupsCount ? *params.pSetupsCount : nullptr)
// 5. For every operation, form a list of input/output data objects | ||
// 6. Run GIslandExecutable | ||
// 7. writeBack | ||
// 4. Ask every GIslandExecutable to prepate its internal states for a new stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prepate -> prepare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, (4) is misleading. It doesn't happen on any run
.
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params)); | ||
EXPECT_TRUE(params.pSetupsCount != nullptr); | ||
EXPECT_EQ(1, *params.pSetupsCount); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but there is a test which does such testing in the context of another test case:
That test should fail with your previous implementation. Obviously it tests some other aspect.
// States re-initialization will be called upon their kernels executions only | ||
EXPECT_EQ(1, *params.pSetupsCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But why?
Now this handleNewStream()
method is called for every island executable immediately on prepareForNewStream()
, isn't it? In this case, I'd expect to see 2
here.
Hello, Sergey! Could I create for now a ticket for me to update code according to the comments? |
Hi Asya, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better but still have some questions.
Can be addressed later, though.
GAPI_LOG_WARNING(NULL, | ||
"\nGCPUExecutable::reshape was called. Resetting states of stateful kernels."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be printed every time or there are reasons to say it only once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only not to spam this message every reshape() if it happens often
@@ -387,6 +530,45 @@ TEST(StatefulKernel, StateIsChangedViaCompArgsOnReshape) | |||
run("cv/video/768x576.avi", "knn"); | |||
run("cv/video/1920x1080.avi", "mog2"); | |||
} | |||
|
|||
TEST(StatefulKernel, StateIsResetOnceOnReshapeInStreaming) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we care about reshape in streaming?
If source changed, reset should happen - isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is here to highlight an issue that we can have 2 setup() calls in such cases
@alalek can we proceed with the merge please? |
…am_event Fixed handling of new stream, especially for stateful OCV kernels * Fixed handling of new stream, especially for stateful OCV kernels * Removed duplication from StateInitOnce tests * Addressed review comments for PR opencv#21731 - Fixed explanation comments - Expanded test for stateful OCV kernels in Regular mode * Addressed review comments for PR opencv#21731 - Moved notification about new stream to the constructor - Added test on state reset for Regular mode * Addresed review comments * Addressed review comments Co-authored-by: Ruslan Garnov <ruslan.garnov@intel.com>
Pull Request Readiness Checklist
See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request
Patch to opencv_extra has the same branch name.
There is a stateful kernel in OCV which can preserve the state to work with it. Once new stream happens we should re-initialize the state from the scratch. For the Streaming G-API mode, re-initialization is called automatically once new stream happened, for Regular G-API mode user should take care of it.
To update OCV stateful kernels for a new stream,
handleNewStream()
should be called on theGCPUExecutable
instance.In the Streaming mode,
GStreamingExecutor::setSource()
method handles that, as it is indeed responsible for configuration of the executor for a new stream and notifying other peers about it.In the Regular mode, user should call
prepareForNewStream()
onGCompiled
instance.BUG description
Problem was in that, that in streaming G-API mode, on the first graph run, kernels' states initialization is called twice for the
GComputation
compiled with metadata.Main root cause here is that kernel states initialization isn't ran just in response to
GCPUExecutable::handleNewStream()
. It is also called upon theGCPUExecutable
instantiation -- in theGCPUExecutable
constructor.So, firstly, kernel states are initialized in
GCPUExecutable
constructor called at graph compilation stage if the metadatas are passed. And, secondly, during the first run, they will also be re-initialized byGStreamingExecutor::setSource()
call, notifying about new happened stream and calledGCPUExecutable::handleNewStream()
.It wasn't the case for graphs compiled without meta, because the
GCompiler::compileStreaming()
call doesn't launchGCPUExecutable
constructor in case if metadata is not provided.But,
GStreamingExectutor::setSource()
callsGCPUExecutable
constructor.In
GStreamingExecutor::setSource()
execution goes to islands recompilation or graph reshape in case if there is no original metadata. Reshape was not the case for OCV backend until recently and execution always falls to the islands recompilation. Islands recompilation will call theGCPUExecutable
constructor and setup kernel states.But after that, there is a check in
GStreamingExecutor::setSource()
if re-compilation was called or not and if called, then no notification about new stream will be sent to the peers and no second call to kernel states re-initialization will happen.There are 2 problems here:
GStreamingExecutor
in different scenarios. What will happened if multiple backends will handlehandleNewStream()
call and not only OCV?GStreamingExecutor
knows something about independent backends(that OCV backend calls states initialization in constructor) and acts based on this knowledge leading to tight coupling between backend component and itself.Solution
Solution was:
handleNewStream()
unconditionally inGStreamingExecutor::setSource()
.handleNewStream()
call.apply()
was for previous stream and new call toapply()
is already for new one). But, G-API needs somehow say about new stream for the first time, to prepare all the stuff before execution. Because there were no previous calls toapply()
and everything is clear, G-API can notify internal components itself about new stream.So, call to the
prepareForNewStream()
was added to theGExecutor::run()
method under condition that it will be called only once.