Skip to content

Commit

Permalink
openusd autoupdater: patch Work to use modern tbb versions.
Browse files Browse the repository at this point in the history
  • Loading branch information
furby-tm committed Aug 5, 2024
1 parent a211c5d commit 43f5e98
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 113 deletions.
69 changes: 42 additions & 27 deletions Sources/OpenUSD/Resources/Work/dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,61 +22,76 @@
// language governing permissions and limitations under the Apache License.
//

#include "pxr/pxrns.h"
#include "Work/dispatcher.h"
#include <pxr/pxrns.h>

PXR_NAMESPACE_OPEN_SCOPE

#if !WITH_TBB_LEGACY
WorkDispatcher::WorkDispatcher() { _waitCleanupFlag.clear(); }

WorkDispatcher::~WorkDispatcher() { Wait(); }

void WorkDispatcher::Wait() {
_tg.wait();

// If we take the flag from false -> true, we do the cleanup.
if (_waitCleanupFlag.test_and_set() == false) {
// Post all diagnostics to this thread's list.
for (auto &et : _errors) {
et.Post();
}
_errors.clear();
_waitCleanupFlag.clear();
}
}

void WorkDispatcher::Cancel() { _tg.cancel(); }
#else /* WITH_TBB_LEGACY */
WorkDispatcher::WorkDispatcher()
: _context(
tbb::task_group_context::isolated,
tbb::task_group_context::concurrent_wait |
tbb::task_group_context::default_traits)
{
: _context(tbb::task_group_context::isolated,
tbb::task_group_context::concurrent_wait |
tbb::task_group_context::default_traits) {
_waitCleanupFlag.clear();

_taskGroup = new tbb::task_group(_context);

// The concurrent_wait flag used with the task_group_context ensures
// the ref count will remain at 1 after all predecessor tasks are
// completed, so we don't need to keep resetting it in Wait().
_rootTask = new (tbb::task::allocate_root(_context)) tbb::empty_task;
_rootTask->set_ref_count(1);
}

WorkDispatcher::~WorkDispatcher()
{
WorkDispatcher::~WorkDispatcher() {
Wait();
delete _taskGroup;
tbb::task::destroy(*_rootTask);
}

void
WorkDispatcher::Wait()
{
void WorkDispatcher::Wait() {
// Wait for tasks to complete.
_taskGroup->wait();
_rootTask->wait_for_all();

// If we take the flag from false -> true, we do the cleanup.
if (_waitCleanupFlag.test_and_set() == false) {
// Reset the context if canceled.
if (_context.is_group_execution_cancelled()) {
_context.reset();
}

// Post all diagnostics to this thread's list.
for (auto &et: _errors) {
for (auto &et : _errors) {
et.Post();
}
_errors.clear();
_waitCleanupFlag.clear();
}
}

void
WorkDispatcher::Cancel()
{
_context.cancel_group_execution();
}
void WorkDispatcher::Cancel() { _context.cancel_group_execution(); }
#endif /* WITH_TBB_LEGACY */

/* static */
void
WorkDispatcher::_TransportErrors(const TfErrorMark &mark,
_ErrorTransports *errors)
{
void WorkDispatcher::_TransportErrors(const TfErrorMark &mark,
_ErrorTransports *errors) {
TfErrorTransport transport = mark.Transport();
errors->grow_by(1)->swap(transport);
}
Expand Down
194 changes: 163 additions & 31 deletions Sources/OpenUSD/Resources/Work/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,126 @@

/// \file work/dispatcher.h

#include "pxr/pxrns.h"
#include "Work/threadLimits.h"
#include <pxr/pxrns.h>
#include "Work/api.h"
#include "Work/threadLimits.h"

#include "Tf/errorMark.h"
#include "Tf/errorTransport.h"

#include <OneTBB/tbb/concurrent_vector.h>
#include <OneTBB/tbb/task.h>
#if !WITH_TBB_LEGACY
#include <OneTBB/tbb/global_control.h>
#include <OneTBB/tbb/task_group.h>
#include <OneTBB/tbb/task_scheduler_observer.h>
#endif /* WITH_TBB_LEGACY */

#include <functional>
#include <type_traits>
#include <utility>

PXR_NAMESPACE_OPEN_SCOPE

#if !WITH_TBB_LEGACY
class WorkDispatcher {
public:
/// Construct a new dispatcher.
WORK_API WorkDispatcher();

/// Wait() for any pending tasks to complete, then destroy the dispatcher.
WORK_API ~WorkDispatcher();

WorkDispatcher(WorkDispatcher const &) = delete;
WorkDispatcher &operator=(WorkDispatcher const &) = delete;

#ifdef doxygen

/// Add work for the dispatcher to run.
///
/// Before a call to Wait() is made it is safe for any client to invoke
/// Run(). Once Wait() is invoked, it is \b only safe to invoke Run() from
/// within the execution of tasks already added via Run().
///
/// This function does not block, in general. It may block if concurrency
/// is limited to 1. The added work may be not yet started, may be started
/// but not completed, or may be completed upon return. No guarantee is
/// made.
template <class Callable, class A1, class A2, ... class AN>
void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);

#else // doxygen

template <class Callable> inline void Run(Callable &&c) {
_tg.run(_InvokerTask<typename std::remove_reference<Callable>::type>(
std::forward<Callable>(c), &_errors));
}

template <class Callable, class A0, class... Args>
inline void Run(Callable &&c, A0 &&a0, Args &&...args) {
Run(std::bind(std::forward<Callable>(c), std::forward<A0>(a0),
std::forward<Args>(args)...));
}

#endif // doxygen

/// Block until the work started by Run() completes.
WORK_API void Wait();

/// Cancel remaining work and return immediately.
///
/// Calling this function affects task that are being run directly
/// by this dispatcher. If any of these tasks are using their own
/// dispatchers to run tasks, these dispatchers will not be affected
/// and these tasks will run to completion, unless they are also
/// explicitly cancelled.
///
/// This call does not block. Call Wait() after Cancel() to wait for
/// pending tasks to complete.
WORK_API void Cancel();

private:
typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;

// Function invoker helper that wraps the invocation with an ErrorMark so we
// can transmit errors that occur back to the thread that Wait() s for tasks
// to complete.
template <class Fn> struct _InvokerTask {
explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
: _fn(std::move(fn)), _errors(err) {}

explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
: _fn(fn), _errors(err) {}

void operator()() const {
TfErrorMark m;
_fn();
if (!m.IsClean())
WorkDispatcher::_TransportErrors(m, _errors);
}

private:
Fn _fn;
_ErrorTransports *_errors;
};

// Helper function that removes errors from \p m and stores them in a new
// entry in \p errors.
WORK_API static void _TransportErrors(const TfErrorMark &m,
_ErrorTransports *errors);

// Task group.
tbb::task_group _tg;

// The error transports we use to transmit errors in other threads back to
// this thread.
_ErrorTransports _errors;

// Concurrent calls to Wait() have to serialize certain cleanup operations.
std::atomic_flag _waitCleanupFlag;
};
#else /* WITH_TBB_LEGACY */

/// \class WorkDispatcher
///
/// A work dispatcher runs concurrent tasks. The dispatcher supports adding
Expand Down Expand Up @@ -73,20 +176,19 @@ PXR_NAMESPACE_OPEN_SCOPE
/// Additionally, Wait() must never be called by a task added by Run(), since
/// that task could never complete.
///
class WorkDispatcher
{
class WorkDispatcher {
public:
/// Construct a new dispatcher.
WORK_API WorkDispatcher();

/// Wait() for any pending tasks to complete, then destroy the dispatcher.
WORK_API ~WorkDispatcher();

WorkDispatcher(WorkDispatcher const &) = delete;
WorkDispatcher &operator=(WorkDispatcher const &) = delete;

#ifdef doxygen

/// Add work for the dispatcher to run.
///
/// Before a call to Wait() is made it is safe for any client to invoke
Expand All @@ -99,26 +201,24 @@ class WorkDispatcher
/// made.
template <class Callable, class A1, class A2, ... class AN>
void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);

#else // doxygen

template <class Callable>
inline void Run(Callable &&c) {
_taskGroup->run(std::forward<Callable>(c));

template <class Callable> inline void Run(Callable &&c) {
_rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c)));
}

template <class Callable, class A0, class ... Args>
inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
Run(std::bind(std::forward<Callable>(c),
std::forward<A0>(a0),

template <class Callable, class A0, class... Args>
inline void Run(Callable &&c, A0 &&a0, Args &&...args) {
Run(std::bind(std::forward<Callable>(c), std::forward<A0>(a0),
std::forward<Args>(args)...));
}

#endif // doxygen

/// Block until the work started by Run() completes.
WORK_API void Wait();

/// Cancel remaining work and return immediately.
///
/// Calling this function affects task that are being run directly
Expand All @@ -130,28 +230,60 @@ class WorkDispatcher
/// This call does not block. Call Wait() after Cancel() to wait for
/// pending tasks to complete.
WORK_API void Cancel();

private:
typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;


// Function invoker helper that wraps the invocation with an ErrorMark so we
// can transmit errors that occur back to the thread that Wait() s for tasks
// to complete.
template <class Fn> struct _InvokerTask : public tbb::task {
explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
: _fn(std::move(fn)), _errors(err) {}

explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
: _fn(fn), _errors(err) {}

virtual tbb::task *execute() {
TfErrorMark m;
_fn();
if (!m.IsClean())
WorkDispatcher::_TransportErrors(m, _errors);
return NULL;
}

private:
Fn _fn;
_ErrorTransports *_errors;
};

// Make an _InvokerTask instance, letting the function template deduce Fn.
template <class Fn>
_InvokerTask<typename std::remove_reference<Fn>::type> &
_MakeInvokerTask(Fn &&fn) {
return *new (_rootTask->allocate_additional_child_of(*_rootTask))
_InvokerTask<typename std::remove_reference<Fn>::type>(
std::forward<Fn>(fn), &_errors);
}

// Helper function that removes errors from \p m and stores them in a new
// entry in \p errors.
WORK_API static void
_TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
WORK_API static void _TransportErrors(const TfErrorMark &m,
_ErrorTransports *errors);

// Task group context and associated root task that allows us to cancel
// tasks invoked directly by this dispatcher.
tbb::detail::d1::task_group_context _context;
tbb::detail::d1::task_group *_taskGroup;
tbb::task_group_context _context;
tbb::empty_task *_rootTask;

// The error transports we use to transmit errors in other threads back to
// this thread.
_ErrorTransports _errors;

// Concurrent calls to Wait() have to serialize certain cleanup operations.
std::atomic_flag _waitCleanupFlag;
};

#endif /* !WITH_TBB_LEGACY */
///////////////////////////////////////////////////////////////////////////////

PXR_NAMESPACE_CLOSE_SCOPE
Expand Down
Loading

0 comments on commit 43f5e98

Please sign in to comment.