G-API: Implement concurrent executor #24845
Conversation
@@ -513,4 +515,23 @@ TEST(GAPI_Pipeline, 1DMatWithinSingleIsland)
    EXPECT_EQ(0, cv::norm(out_mat, ref_mat));
}

TEST(GAPI_Pipeline, NewExecutorTest)

This was a temporary test; it needs to be removed.

Done
#include "thread_pool.hpp"

#include <iostream>

Remove this include.

Done
void cv::gapi::own::WaitGroup::wait() {
    while (task_counter.load() != 0u) {
        std::unique_lock<std::mutex> lk{m};

The lock definitely shouldn't be acquired inside the loop...

Removed
Force-pushed 2b3fd6f to dbd7570
Force-pushed dbd7570 to 7ccd71a
Force-pushed 7ccd71a to 38b2c43
namespace magazine {
namespace {

void bindInArgExec(Mag& mag, const RcDesc &rc, const GRunArg &arg)

This part is a pure copy-paste from gexecutor.cpp; perhaps it should live somewhere common (e.g. gcommon.hpp).
}
}

void assignMetaStubExec(Mag& mag, const RcDesc &rc, const cv::GRunArg::Meta &meta) {

@dmatveev This is the place where meta is stored into the Mag. I assume there might be a data race, which is why the Mag is currently protected by a mutex.

There won't be a race if we statically preallocate the magazine for all relevant [rc.id]s in all relevant types, but the mutex is fine and we can do that later.

Left the mutex so far.
virtual StreamMsg get() override
{
    std::lock_guard<std::mutex> lock{m_mutex};

Removing the lock causes a data race, even though magazine::getArg is supposed to be a read operation.
void meta(const GRunArgP &out, const GRunArg::Meta &m) override
{
    const auto idx = out_idx.at(cv::gimpl::proto::ptr(out));
    std::lock_guard<std::mutex> lock{m_mutex};

I assume it's impossible for two Islands to write to the same memory in parallel, but it caused a data race when the mutex wasn't used.
void run();
void verify();
std::shared_ptr<GIslandExecutable> exec() { return m_isl_exec; }

exec() is redundant for IslandActor; in fact, GIslandExecutable::Ptr can be stored and accessed in GThreadedExecutor and passed to IslandActor by pointer/reference.

Please leave a comment in the code.
#include <opencv2/gapi/util/throw.hpp>

void cv::gapi::own::WaitGroup::add() {
    std::lock_guard<std::mutex> lk{m};

In fact it's not necessary to acquire the mutex in the add() method; task_counter could be an atomic. But since we still need the mutex for wait() (as it uses a condvar), atomic + mutex seems a weird combination and didn't give any significant performance improvement. Decided to keep it simple so far.
In fact, WaitGroup can be implemented without a condvar and mutex by using two atomics, but that requires C++20 for this: https://en.cppreference.com/w/cpp/atomic/atomic/wait

WaitGroup has been removed.
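For reference, a minimal Go-style WaitGroup along the lines discussed in this thread might look like the sketch below: a plain counter guarded by the condvar's mutex, with wait() locking once and using the predicate overload (fixing the lock-inside-the-loop issue raised above). This is an illustrative sketch, not the PR's actual class.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Illustrative WaitGroup sketch: add()/done() adjust the task counter,
// wait() blocks until it drops to zero.
class WaitGroup {
public:
    void add(uint64_t n = 1u) {
        std::lock_guard<std::mutex> lk{m};
        task_counter += n;
    }
    void done() {
        std::lock_guard<std::mutex> lk{m};
        if (--task_counter == 0u) {
            all_done.notify_all();
        }
    }
    void wait() {
        // Lock once, outside any loop; the predicate overload re-checks
        // the counter under the mutex on every wakeup.
        std::unique_lock<std::mutex> lk{m};
        all_done.wait(lk, [this]{ return task_counter == 0u; });
    }
private:
    std::mutex m;
    std::condition_variable all_done;
    uint64_t task_counter = 0u;
};
```

Because the counter is only ever touched under the mutex, no atomic is needed; the atomic + mutex mix the author mentions would indeed be redundant here.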
}
}

void cv::gapi::own::ThreadPool::schedule(cv::gapi::own::ThreadPool::Task task) {

ThreadSanitizer also signals problems when schedule is called from different threads, but it seems to be fine since both WaitGroup::add() and Queue::push are supposed to be thread-safe.
void cv::gapi::own::ThreadPool::stop() {
    wait();
    for (uint32_t i = 0; i < m_num_workers; ++i) {
        m_queue.push({});

The empty task is the termination criterion for the worker; that is at least worth a comment...

Done
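The empty-task ("poison pill") shutdown idiom discussed here can be sketched as follows. This is a simplified stand-in (a mutex/condvar queue instead of the PR's own::concurrent_bounded_queue), with one default-constructed task pushed per worker so every thread wakes up and exits.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Illustrative sketch of the "empty task as termination criterion" idea.
class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(uint32_t num_workers) {
        for (uint32_t i = 0; i < num_workers; ++i) {
            m_workers.emplace_back([this]{ worker(); });
        }
    }

    void schedule(Task task) { push(std::move(task)); }

    ~ThreadPool() {
        // An empty (default-constructed) Task is the shutdown signal:
        // push one per worker so each thread leaves its loop exactly once.
        for (size_t i = 0; i < m_workers.size(); ++i) { push({}); }
        for (auto& t : m_workers) { t.join(); }
    }

private:
    void worker() {
        for (;;) {
            Task task = pop();
            if (!task) { break; }  // empty task => terminate this worker
            task();
        }
    }

    void push(Task t) {
        { std::lock_guard<std::mutex> lk{m}; q.push(std::move(t)); }
        cv.notify_one();
    }
    Task pop() {
        std::unique_lock<std::mutex> lk{m};
        cv.wait(lk, [this]{ return !q.empty(); });
        Task t = std::move(q.front());
        q.pop();
        return t;
    }

    std::mutex m;
    std::condition_variable cv;
    std::queue<Task> q;
    std::vector<std::thread> m_workers;
};
```

Since the queue is FIFO, the poison pills pushed by the destructor are only popped after all previously scheduled tasks, so no work is dropped on shutdown.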
#include <opencv2/gapi/own/exports.hpp> // GAPI_EXPORTS

#if defined(HAVE_TBB)

Yet another copy-paste of this thing.

Force-pushed 38b2c43 to bfd0ac8
 *
 * Specifies the number of threads that should be used by the executor.
 */
struct use_threaded_executor

Should be exposed to Python later.
 */
struct use_threaded_executor
{
    explicit use_threaded_executor(uint32_t nthreads = std::thread::hardware_concurrency())

Probably here we should stick to the OpenCV thread settings: https://docs.opencv.org/4.9.0/db/de0/group__core__utils.html#ga2db334ec41d98da3129ef4a2342fc4d4

...but normally it is needed to respond to / handle setNumThreads(), which sets the number of threads for OpenCV globally (as it is still a set of standalone functions).

Good point, I think we can use cv::getNumThreads() there instead. Done
src/executor/gstreamingexecutor.cpp
src/executor/gasync.cpp
src/executor/thread_pool.cpp

Is this the third thread pool in this module alone? :)

I guess you refer to the RequestPools from the ie and ov backends.
switch (arg.index())
{
case GRunArg::index_of<Mat>() :
    mag_rmat = make_rmat<RMatOnMat>(util::get<Mat>(arg)); break;

Normally we have the break on the following line.

(Or the whole "case ...: ...; break" on a single line.)

It's just a copy-paste from gexecutor.cpp, but no problem to address it :)

Done
cv::gimpl::Mag &mag;
std::mutex &m_mutex;

I believe it is an abstraction leak to expose 1) a data structure and 2) its protection mutex by reference to some external context like this.
If the goal of this whole stub is to populate an input vector for an island, you can query the object that owns those two (the magazine and the mutex) to do that for you. Then this thread-safety concern falls to that context. So what you could do instead is to have a GraphState structure, representing a running graph, and host the magazine in it.
The pro of this approach is that later you may have a single executor serving multiple executable graphs, so their state will be clearly separated from one another.

Done
}
}

static thread_local cv::gapi::own::ThreadPool* current;

Doesn't look good, tbh :)

Quite handy, though: the user doesn't need to keep the ThreadPool around for scheduling nested tasks.

static void foo() {
    // do something
    ThreadPool::get()->schedule(bar);
};

static void bar() {
    // do something
    ThreadPool::get()->schedule(baz);
}
...
ThreadPool tp;
tp.schedule(foo);

But if we plan to make the ThreadPool available for all GCompiled's, it can be removed for now.

Removed
}

void cv::gapi::own::ThreadPool::worker() {
    current = this;

Why do you need current at all?

Answered above.

Removed
}

void cv::gapi::own::ThreadPool::schedule(cv::gapi::own::ThreadPool::Task task) {
    m_wg.add();

So you increase your wait count once you schedule a task, and decrease it once you're done with it. Why?

The entire WaitGroup is used to implement ThreadPool::wait(), which is supposed to block until all scheduled tasks, including nested ones, are completed.

Not relevant any more; WaitGroup has been removed.
cv::gapi::own::ThreadPool::Task task;
m_queue.pop(task);
if (!task) {
    break;

Does this mean you terminate your worker threads once the graph is done?

No, workers are terminated in ThreadPool::stop, which is called during GCompiled destruction.

Now it's done in the shutdown method, which is private and called only in ~ThreadPool().
}
}

void cv::gapi::own::ThreadPool::schedule(cv::gapi::own::ThreadPool::Task task) {

Shouldn't this be &&task?

Done
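A side note on this thread: a common alternative to adding a Task&& overload is to keep the by-value "sink" parameter and std::move it into place, so rvalue callers pay one move and lvalue callers exactly one copy. A small illustrative demo (Probe is a hypothetical type introduced here just to count copies and moves; none of this is the PR's code):

```cpp
#include <cassert>
#include <utility>

// Hypothetical instrumented type: counts how many copies/moves it has seen.
struct Probe {
    int copies = 0;
    int moves  = 0;
    Probe() = default;
    Probe(const Probe& o) : copies(o.copies + 1), moves(o.moves) {}
    Probe(Probe&& o) noexcept : copies(o.copies), moves(o.moves + 1) {}
    Probe& operator=(Probe&& o) noexcept {
        copies = o.copies;
        moves  = o.moves + 1;
        return *this;
    }
};

Probe stored;  // stand-in for the queue slot the task ends up in

// By-value sink parameter: the caller's value category decides the cost of
// binding the parameter; we then move it onward unconditionally.
void schedule(Probe task) {
    stored = std::move(task);
}
```

With this shape, `schedule(Probe{})` performs no copies at all, while `schedule(p)` for an lvalue `p` performs exactly one copy into the parameter.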
}

void cv::gimpl::IslandActor::verify() {
    if (m_e) {

Can we get a simultaneous read + write situation for the exception? Shouldn't it be protected (e.g. atomic)?

IslandActor itself doesn't give any thread-safety guarantees; besides, this method is called only on the main thread when all tasks are completed.
    break;
}
task();
m_wg.done();

Do we care that this line can be unreachable due to a hang/exception in the task above?

It's assumed that the task can't throw an exception at this point.
It would make sense to check that with something like static_assert(noexcept(task())), but this is available only since C++17.
void cv::gimpl::Task::run() {
    m_f();
    for (auto* dep : m_dependents) {
        if (dep->m_ready_deps.fetch_add(1u) == dep->m_num_deps - 1) {

It surely doesn't need to be memory_order::seq_cst.

Done
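The dependency-counting pattern in this hunk is the core of the scheduler: a finished task bumps each dependent's ready-counter, and whichever producer completes the last dependency schedules the dependent. A self-contained sketch of that pattern (a plain ready-queue stands in for the thread pool; the struct mirrors the names from the hunk but is not the actual PR code):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Sketch of the pattern from cv::gimpl::Task::run(): the producer that
// makes a dependent's counter reach m_num_deps is the one to schedule it.
struct Task {
    std::function<void()>  m_f;
    std::vector<Task*>     m_dependents;
    uint32_t               m_num_deps = 0u;
    std::atomic<uint32_t>  m_ready_deps{0u};

    void run(std::vector<Task*>& ready_queue) {
        m_f();
        for (auto* dep : m_dependents) {
            // fetch_add returns the previous value, so exactly one producer
            // (the last one to finish) observes m_num_deps - 1 here.
            if (dep->m_ready_deps.fetch_add(1u) == dep->m_num_deps - 1u) {
                ready_queue.push_back(dep);  // a real pool would schedule it
            }
        }
    }
};
```

Because the counter is atomic, this check is race-free even when several producers of the same dependent finish concurrently: each fetch_add returns a distinct previous value, so the "schedule" branch fires exactly once.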
Force-pushed c73a0cc to 65e2897
@@ -11,6 +11,7 @@
#include <functional>  // std::hash
#include <vector>      // std::vector
#include <type_traits> // decay
#include <thread>      // hardware_concurrency

Not needed any more.

Done
@dmatveev Could you have a look one more time, please?
explicit use_threaded_executor(
    uint32_t nthreads = static_cast<uint32_t>(cv::getNumThreads()))

It's probably not OK to have cv::getNumThreads() called in the header.
Instead, you could introduce two constructors: a default one, with nthreads set to cv::getNumThreads() in its initializer list, and a value-based one.

Can't we just declare it here and move the implementation to a cpp file?

In that case it's needed to call cv::getNumThreads() in the header anyway.
void bindInArgExec(Mag& mag, const RcDesc &rc, const GRunArg &arg) {
    if (rc.shape != GShape::GMAT)
    {

A strange mix of { styles; can you please stick to the BSD-style one?

Fixed
}

void cv::gimpl::GThreadedExecutor::Output::post(cv::GRunArgP&&, const std::exception_ptr& e) {
    if (e) { m_eptr = e; }

This is the third indentation style I see in this file.

Fixed
// Notify every consumer about the completion of one of its dependencies
for (auto* consumer : m_consumers) {
    const auto num_ready =
        consumer->m_ready_producers.fetch_add(1, std::memory_order_relaxed) + 1;

Why don't you use pre-increment here, again?

Only because I specify the memory_order there.

@TolyaTalamanov did you find out which is the best order here?

It's a good question. I'd say we need to guarantee that the newly scheduled task will see all changes in the magazine that have been made by its producers, which means memory_order_acq_rel.
But since all magazine changes are currently synchronized by the mutex, that is already guaranteed, so memory_order_relaxed can be used until we get rid of the mutex on the magazine :)

I'd still vote for ++(int) as the more concise construct. BUT that's up to you.

The thing is, operator++(int) doesn't accept a std::memory_order, and the order here isn't the default, so I can't use it :)
No major problems, but I'd expect reviews from others first. If you align the indentation, we can merge this. Thanks!
// Count the number of last tasks
auto isLast = [](const std::shared_ptr<Task>& task) { return task->isLast(); };
const auto kNumLastsTasks =

Does it handle recursive task creation?

Not sure what "recursive" task creation means, could you elaborate, please?

When tasks may spawn new (previously unknown) tasks, I believe. May be the case with loops, but let's put that aside for now.

The Task itself is an abstraction which handles dependencies, so if it spawns new tasks recursively, that's already taken into account...
};

void run(ExecutionState& state);
bool isLast() const { return m_consumers.empty(); }

Could there be a data race on the vector of consumers here?

Once all tasks are created, m_consumers isn't changed, so different threads only perform read operations. Not obvious, though.

Looks good.

OK to go if it isn't breaking anything right now. Let it live in parallel with the primary one. We'll swap them someday.
@asmorkalov Could you merge it, please?
Clang (macOS) reports a warning:

Android build failed:

@asmorkalov, thanks! The build has passed.
Overview

This PR introduces a new G-API executor called GThreadedExecutor, which can be selected when the GComputation is compiled in serial mode (a.k.a. GComputation::compile(...)).

ThreadPool

cv::gapi::own::ThreadPool has been introduced in order to abstract the usage of threads in GThreadedExecutor.
- ThreadPool is implemented by using own::concurrent_bounded_queue.
- ThreadPool has only a single method, schedule, which pushes a task into the queue for further execution.

The important notice is that if a Task executed in the ThreadPool throws an exception, this is UB.

GThreadedExecutor

The GThreadedExecutor is mostly a copy-paste of GExecutor; should we extend GExecutor instead?

Implementation details

- Build the dependency graph for the Island nodes.
- Put the tasks that don't have dependencies into a vector in order to run them first.
- GThreadedExecutor::run() schedules the tasks that don't have dependencies; those then schedule their dependents, and it waits for the completion.

Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

Patch to opencv_extra has the same branch name.