[G-API] Handle exceptions in streaming executor #21660
a6f530d to 086872a
#ifndef OPENCV_GAPI_GCPUKERNEL_HPP
#define OPENCV_GAPI_GCPUKERNEL_HPP

#ifdef _MSC_VER
This seems to be a false-positive warning, but I need to double-check.
@@ -889,6 +929,33 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
}
}
}

virtual void post(cv::gimpl::Exception&& error) override
Copy-pasted from post(EndOfStream&&), except for incrementing m_stops_sent.
IMHO: from my perspective, there is a lot of sorting and shifting from queue to queue here as legacy code.
But if this function is just copied and pasted, then I would recommend introducing a self-documenting private function here, like dispatch<Message>(), because the full body of the post function is easier to understand when you can see that post just makes a dispatch to subsequent
That's the problem: this function isn't fully copy-pasted (m_stops_sent++ is removed).
I assumed some customization here, for example:
post(Exception)
{
    (void)dispatch<Exception>();
}
post(Stop)
{
    stop_sent += dispatch<Stop>();
}
Please consider this comment optional.
GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
out_queue.push(Cmd{Result{std::move(this_result), flags}});
case QueueReader::V::index_of<Stop>():
if (handle_stop)
@AsyaPronina Does it make sense to put traces here?
@sivanov-work Could you have a look?
IMHO: there are a lot of switch-like code blocks that handle the different variant types, which makes the code hard to read and understand. Furthermore, adding a new message type in the future would mean sorting through a lot of similar code, so it does not look easy to extend.
I'd propose introducing a fully descriptive Message or ControlMessage class as part of the current std::variant, and then at least a message_handle(ControlMessage &) to encapsulate operations on it and make the code clearer.
Also, there might be a problem with queue capacity: an exception (or another command message) cannot be pushed when the queue is full. I'd propose considering a kind of smart queue that allows control messages to be pushed as high-priority messages even when it is full.
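One way the "smart queue" proposal could look, as a rough sketch only. ControlAwareQueue, Msg, try_push_data and push_control are hypothetical names, not part of G-API. In this sketch control messages bypass the capacity check but keep FIFO order; whether they should also jump ahead of queued data is a separate design decision.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical message: a payload plus a flag marking control messages.
struct Msg {
    std::string payload;
    bool is_control = false;
};

class ControlAwareQueue {
public:
    explicit ControlAwareQueue(std::size_t capacity) : m_capacity(capacity) {}

    // Data messages respect the capacity; returns false when the queue is full.
    bool try_push_data(std::string payload) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.size() >= m_capacity) return false;
        m_items.push_back(Msg{std::move(payload), false});
        return true;
    }

    // Control messages (stop, exception) are accepted even when the queue is full.
    void push_control(std::string payload) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push_back(Msg{std::move(payload), true});
    }

    // Returns the oldest message, or std::nullopt if the queue is empty.
    std::optional<Msg> try_pop() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty()) return std::nullopt;
        Msg m = std::move(m_items.front());
        m_items.pop_front();
        return m;
    }

private:
    std::size_t m_capacity;
    std::mutex m_mutex;
    std::deque<Msg> m_items;
};
```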
try {
    t.run(request);
} catch (...) {
    request.SetCompletionCallback([](){});
I'd put a LOG_WARNING/ERROR here, something like:
"request by id: " << id << " failed, error" << ex.what()
I don't think a log is needed here; the user will see the exception message when calling pull.
I'd propose making the exception message a bit clearer.
This is an optional comment, please feel free to ignore it.
In defense of the logger, I would say that it may provide more detailed information about what actually happened and what the current state is, which helps with further troubleshooting.
GAPI_ITT_STATIC_LOCAL_HANDLE(ie_cb_post_outputs_hndl, "IE_async_callback_PostOutputs");
GAPI_ITT_AUTO_TRACE_GUARD(ie_cb_post_outputs_hndl);

if (code != IE::StatusCode::OK) {
    ctx->eptr = std::make_exception_ptr(
        std::logic_error("IE::InferRequest finished with not OK status"));
From my perspective, it would be great to also print the status value.
Added
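A sketch of what including the status value in the message could look like. The StatusCode enum below is a hypothetical stand-in for IE::StatusCode and make_status_error is an invented helper name; the thread does not show the wording that was actually added.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for IE::StatusCode (assumption, not the real enum).
enum class StatusCode : int { OK = 0, GENERAL_ERROR = -1 };

// Builds an exception_ptr whose message carries the numeric status value.
std::exception_ptr make_status_error(StatusCode code) {
    return std::make_exception_ptr(std::logic_error(
        "IE::InferRequest finished with not OK status: " +
        std::to_string(static_cast<int>(code))));
}
```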
struct Exception {
    std::exception_ptr eptr;
};
IMHO: encapsulating these in some kind of ServiceMessage would simplify future extensibility:
struct ServiceMessage
{
    enum { EndOfStream, Exception };
    ....
};
using StreamMsg = cv::util::variant<ServiceMessage, cv::GRunArgs>;
Then
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg)) {
}
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) {
}
might be modified as
if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
    out.post(in_msg);
}
It really does make the code a bit clearer, but:
- Are there any other possible message types? (I can't imagine any.)
- EndOfStream and Exception should be handled in different ways (almost the same, but not quite).
> Are there any other possible message types?
There was only one Stop message before you introduced Exception, so hypothetically, if any new messages are required, we will need to repeat this effort to add new variant types.
> EndOfStream and Exception should be handled in different ways (almost the same, but not quite)
That is why we would separate "traffic" messages from "service" messages in processing. Something like this:
if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
    // encapsulate switch-case
    process_service_message(in_msg); // dispatch for other islands, stop, exception and so on...
}
else
{
    .. frame processing
}
P.S. This is an optional comment.
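The "traffic" versus "service" split can be sketched with std::variant in place of cv::util::variant. Everything here is an assumption for illustration: Frames stands in for cv::GRunArgs, ServiceMessage is modeled as a nested variant rather than the enum-based struct proposed above, and the handlers only return a label instead of doing real work.

```cpp
#include <exception>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-ins for the G-API types.
struct EndOfStream {};
struct Exception { std::exception_ptr eptr; };
using ServiceMessage = std::variant<EndOfStream, Exception>;
using Frames = std::vector<int>;  // stand-in for cv::GRunArgs
using StreamMsg = std::variant<ServiceMessage, Frames>;

// All per-type handling of service messages lives in one place.
std::string process_service_message(const ServiceMessage& msg) {
    struct Visitor {
        std::string operator()(const EndOfStream&) const { return "stop"; }
        std::string operator()(const Exception&) const { return "exception"; }
    };
    return std::visit(Visitor{}, msg);
}

// The island-level code only distinguishes service messages from frames.
std::string handle(const StreamMsg& in_msg) {
    if (std::holds_alternative<ServiceMessage>(in_msg)) {
        return process_service_message(std::get<ServiceMessage>(in_msg));
    }
    return "frames";
}
```

Adding a new service message type would then touch ServiceMessage and its visitor, but not the callers of handle.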
    break;
}
default:
    cv::util::throw_error(std::logic_error("Unsupported cmd type in getInputVector()"));
I think it would be better to put GAPI_DbgAssert here, considering that it acts as a C-like assert.
What do you think about GAPI_Assert instead, to keep consistency with the other code (see above)?
GAPI_Assert(stop.kind == Stop::Kind::HARD);
and so on.
Done
    oq->push(Cmd{cv::gimpl::Exception{eptr}});
}
// NB: Go to the next iteration.
continue;
Will we hang here forever?
No, why?
When the emitter constantly produces exceptions, we catch each one and go to 'continue' in the while (true) loop.
If the source constantly produces exceptions, these exceptions will be propagated through the graph and rethrown to the user in GStreamingExecutor::pull.
The emitter shouldn't be stopped if the source caused an exception; see the test where the source throws an exception on every second frame.
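The behavior described in this reply can be sketched in a single-threaded form. FlakySource, emit and pull are hypothetical names, an int stands in for a frame, and a plain std::queue replaces the real concurrent queues, so this only illustrates the control flow: the emitter wraps the exception into a message and keeps going, and the consumer rethrows it.

```cpp
#include <exception>
#include <queue>
#include <stdexcept>
#include <utility>
#include <variant>

struct Exception { std::exception_ptr eptr; };
using Cmd = std::variant<int, Exception>;  // int stands in for a frame

// Hypothetical source that throws on every second frame.
struct FlakySource {
    int counter = 0;
    int pull() {
        ++counter;
        if (counter % 2 == 0) throw std::runtime_error("bad frame");
        return counter;
    }
};

// Emitter loop: an exception becomes a message, and the loop continues.
void emit(FlakySource& src, std::queue<Cmd>& out, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        try {
            out.push(Cmd{src.pull()});
        } catch (...) {
            out.push(Cmd{Exception{std::current_exception()}});
            continue;  // go to the next iteration instead of stopping
        }
    }
}

// Consumer side: rethrows a stored exception to the caller.
int pull(std::queue<Cmd>& in) {
    Cmd cmd = std::move(in.front());
    in.pop();
    if (std::holds_alternative<Exception>(cmd)) {
        std::rethrow_exception(std::get<Exception>(cmd).eptr);
    }
    return std::get<int>(cmd);
}
```

Because each iteration produces exactly one message, a source that always throws yields a stream of exception messages rather than a busy loop that never reports anything.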
    break;
case Posting::V::index_of<cv::gimpl::EndOfStream>():
    cmd = Cmd{Stop{}};
    m_stops_sent++;
Would it make sense to introduce m_exceptions_caught++ for the exception case?
I didn't get the idea. Why is it needed?
Not sure; maybe for statistics/analytics, in case multiple exceptions are allowed in the pipeline without stopping it.
If this case is not supported, please ignore this comment.
Multiple exceptions aren't supported.
In this PR, I try to prevent exceptions from being thrown from any thread other than the main one.
LGTM if the old and new tests pass.
{
    if (eptr)
    {
        std::rethrow_exception(eptr);
Is this some kind of default/stub implementation?
It now has "state", which affects multithreaded behavior, at least between simultaneous post and verify calls.
GExecutor::IOutput methods might be called from different threads, but not concurrently.
verify() is supposed to be called after post().
That's a good point, but I don't know how to guarantee that these methods won't be called concurrently in the future, other than by adding a comment.
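For comparison, one possible way to make the stored exception safe under concurrent calls is to guard it with a mutex. This is only a sketch of an alternative, not what the PR does (the reply above relies on the call order instead); GuardedOutput is a hypothetical class, and its post takes a std::exception_ptr directly for brevity.

```cpp
#include <exception>
#include <mutex>
#include <stdexcept>
#include <utility>

class GuardedOutput {
public:
    // Stores the exception; safe to call concurrently with verify().
    void post(std::exception_ptr eptr) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_eptr = std::move(eptr);
    }

    // Rethrows the stored exception, if any. The pointer is copied under
    // the lock and rethrown after the lock is released.
    void verify() {
        std::exception_ptr eptr;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            eptr = m_eptr;
        }
        if (eptr) std::rethrow_exception(eptr);
    }

private:
    std::mutex m_mutex;
    std::exception_ptr m_eptr;
};
```

The lock adds a cost on every call, which may be why a documented call-order contract was preferred here.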
bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
                                 cv::GRunArgs &in_constants,
                                 cv::GRunArgs &isl_inputs)
cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector<Q*> &in_queues,
Can you clarify why the return value changed?
Previously, the bool meant != Stop{}. Now, besides Stop, there can also be an Exception message, so the return type was changed from bool to StreamMsg.
case Cmd::index_of<Exception>(): {
    std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
    return true;
}
Is there any warning about a missing 'default'?
Fixed
@dmatveev Could you have a look, please?
Updating every possible backend with this new exception handling is quite invasive.
Can you please offer some other way to do it?
Can those changes be only introduced at the executors level?
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg))
{
    // (4) If the Exception message is received, propagate it further.
    out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
    return;
}
Should this be done at this level? As well as every other island's level?
I mean, shouldn't this be handled uniformly for all possible islands? A level above.
Couldn't agree more. Handle the input exception in a general way.
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) {
    out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
    return;
}
Well, if we need to update every one of our backends now, this change is quite invasive.
What about our huge internal part?
Answered above
It is probably done right but I can't say it for sure until it is properly documented
try {
    t.run(request);
} catch (...) {
    request.SetCompletionCallback([](){});
A comment is required here. Previous single-liner was clear, now I have no idea why this should happen in catch. And I doubt if anybody knows.
Done
void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
                                          size_t id,
                                          IE::InferRequest request,
                                          IE::StatusCode code) {
    // FIXME: Any exception which has arisen here must not leave this callback,
    // because it won't be handled.
    try {
        if (code != IE::StatusCode::OK) {
            throw std::logic_error("IE::InferRequest finished with not OK status");
        }
        task.callback(request);
        // NB: IE::InferRequest keeps the callback until the new one is set.
        // Since user's callback might keep resources that should be released,
        // need to destroy it after execution.
        // Let's set the empty one to cause the destruction of a callback.
        request.SetCompletionCallback([](){});
        m_idle_ids.push(id);
    } catch (const std::exception& e) {
        GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what());
        // FIXME: Exception CAN'T be rethrown here, since this callback works
        // in separate IE thread and such scenarios aren't handled properly in
        // G-API so far.
    }
                                          IE::StatusCode code) noexcept {
    task.callback(request, code);
    request.SetCompletionCallback([](){});
    m_idle_ids.push(id);
What is this all about? Why? What is it for? A simple comment would help to understand the need for this method.
Done
@@ -172,6 +172,7 @@ void Copy::Actor::run(cv::gimpl::GIslandExecutable::IInput &in,
return;
}

GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
Should this be a DbgAssert? get<> will fail anyway if the message is of another type.
Makes sense, done.
* Put more comments * Fix alignment * Move test outside of HAVE_NGRAPH
👍
@alalek Could you merge it, please?
…on-in-streamingexecutor
[G-API] Handle exceptions in streaming executor
* Handle exceptions in streaming executor
* Rethrow exception in non-streaming executor
* Clean up
* Put more tests
* Handle exceptions in IE backend
* Handle exception in IE callbacks
* Handle exception in GExecutor
* Handle all exceptions in IE backend
* Not only (std::exception& e)
* Fix comments to review
* Handle input exception in generic way
* Fix comment
* Clean up
* Apply review comments
* Put more comments
* Fix alignment
* Move test outside of HAVE_NGRAPH
* Fix compilation
Pull Request Readiness Checklist
See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request
Patch to opencv_extra has the same branch name.
Build configuration