
Multithreading in C++11/14 – Part 10

In the previous article about lock-free programming, we saw how complex concurrency can be. For some embedded systems and constrained environments, lock-free code is the way to go. However, for mainstream development on classic platforms, there is a wealth of libraries and frameworks that provide high-level programming models. In this article, we will explore some alternatives to the C++11/14 standard library.

Concurrency libraries and frameworks

Native APIs

I would advise against using the native APIs directly, because the point of the C++11 standard library is to abstract them away. Yet the native APIs are here to stay: they are the fundamental building blocks used to implement the C++11 standard library.

POSIX native API: The following piece of code can run on most UNIX, GNU/Linux, Solaris, BSD and OS X systems.

int x{0};

auto f = [](void* args) -> void* {
    int* px = static_cast<int*>(args);
    ++(*px);

    return nullptr;
};
void* (*pf)(void*) = f;

pthread_t thread;
pthread_create(&thread, nullptr, pf, &x);

std::cout << "Hello, posix thread!\n";

pthread_join(thread, nullptr);

std::cout << x << std::endl;

Windows native API: The following piece of code is equivalent to the one above, and was tested with the Microsoft web compiler.

int x{0};

auto f = [](void* args) {
    int* px = static_cast<int*>(args);
    ++(*px);

    _endthread();
};
void (*pf)(void*) = f;

auto thread = reinterpret_cast<HANDLE>(_beginthread(pf, 0, &x));

std::cout << "Hello, win32 thread\n";

WaitForSingleObject(thread, INFINITE);

std::cout << x << std::endl;

Reactor toolkits

With all the hype around Node.js, it is unsurprising that its concurrency model has been replicated in other languages. The reactor design pattern brings concurrency but not parallelism. Indeed, the reactor pattern is based on an event loop that executes requests synchronously; it is essentially single-threaded. This pattern makes sense for I/O-intensive applications. When an I/O request comes in, it is queued and the program instantly resumes its execution. When the I/O task has completed, a callback is invoked. This is typically what happens in JavaScript for an asynchronous XMLHttpRequest.

Elle/Reactor: This toolkit is implemented by a French startup named Infinit. The code snippet below is extracted from a recent presentation of theirs. According to its author, the toolkit should be open-sourced in the coming months.

while (true)
{
    auto socket = tcp_server.accept();
    new reactor::Thread([socket] {
        try
        {
            while (true)
                socket->write(socket->read_until("\n"));
        }
        catch (reactor::network::Error const&)
        {}
    });
}

Multithreading toolkits

It seems that there are as many C++ multithreading toolkits as there are visible stars in the Paris sky… Many are open source, some are proprietary, some target C++11, and some are older. The following list is a snapshot of my current knowledge of the multithreading library landscape.

Boost Thread: Of course, I need to start with Boost. The first thing it provides is a C++11/14 polyfill for older compilers. It also contains features that are not part of the C++11/14 multithreading library, like scoped_thread, depicted below, or lock-free data structures (which were illustrated in a previous article).

boost::scoped_thread<> t([](int v) {
    std::vector<int> l(v);
    std::cout << "iota\n";
    std::iota(std::begin(l), std::end(l), 0);
    std::cout << "shuffle\n";
    auto r = std::mt19937{std::random_device{}()};
    std::shuffle(std::begin(l), std::end(l), r);
    std::cout << "done\n";
}, 1e8);

using namespace std::literals;
std::this_thread::sleep_for(250ms);

t.interrupt();
std::cout << "interrupted" << std::endl;

//t.join(); // not required

just::thread: This proprietary toolkit provides a polyfill of the C++11/14/17 multithreading standard library. It works with older versions of mainstream compilers that do not support C++11. Besides, it covers many C++17 Technical Specifications, such as continuations and atomic_shared_ptr.

auto future = jss::async([] {
    std::cout << "hello ";
})
.then([] {
    std::cout << "continuation" << std::endl;
});

OpenMP: Open Multi-Processing is a multithreading framework for C, C++ and Fortran. It is special in that it declares parallelism through #pragma preprocessor directives. OpenMP has the powerful advantage of bringing parallelism to an existing piece of code at a relatively low cost, simply by adding #pragmas. However, it requires compiler support: a C++14 compiler does not have to support OpenMP. The good news is that the major compilers do support it; in particular, gcc has very strong OpenMP support.

OpenMP is so broad that it would deserve its own series of articles. The following piece of code shows parallel versions of two common STL algorithms: iota and accumulate.

template <
    typename Container,
    typename T = typename Container::value_type
>
void parallel_iota(Container& c, T initial) {
    int thread_id; // debug
    std::atomic<T> val{initial};

    #pragma omp parallel for schedule(static) private(thread_id)
    for (std::size_t i = 0; i < c.size(); i++) {
        c[i] = val++;

        {
            thread_id = omp_get_thread_num();
            #pragma omp critical
            {
                std::cout << "affectation from thread " << thread_id << '\n';
            }
        }
    }
}

template <
    typename Container,
    typename T = typename Container::value_type
>
T parallel_accumulate(Container& c) {
    int thread_id; // debug
    T sum{};

    #pragma omp parallel for schedule(static,1) reduction(+:sum) private(thread_id)
    for (std::size_t i = 0; i < c.size(); i++) {
        sum += c[i];

        {
            thread_id = omp_get_thread_num();
            #pragma omp critical
            {
                std::cout << "sum from thread " << thread_id << '\n';
            }
        }
    }

    return sum;
}

int main() {
    using vec = std::vector<int>;
    using outit = std::ostream_iterator<int>;

    vec v(10, 0);
    parallel_iota(v, 0);

    std::cout << "values: ";
    std::copy(std::begin(v), std::end(v), outit{std::cout, ","});
    std::cout << '\n';

    int sum = parallel_accumulate(v);
    std::cout << "sum: " << sum << std::endl;
}

An execution of this program can produce the following output. Notice that the integers are assigned out of order, due to the multithreaded execution of parallel_iota.

affectation from thread 1
affectation from thread 6
affectation from thread 3
affectation from thread 4
affectation from thread 5
affectation from thread 7
affectation from thread 0
affectation from thread 2
affectation from thread 1
affectation from thread 0
values: 6,9,0,8,7,2,3,4,1,5,
sum from thread 6
sum from thread 3
sum from thread 4
sum from thread 7
sum from thread 1
sum from thread 5
sum from thread 2
sum from thread 0
sum from thread 1
sum from thread 0
sum: 45

Intel TBB: Intel Threading Building Blocks is an excellent open-source multithreading library. It provides parallel algorithms that execute implicitly on a thread pool. It also contains thread-safe containers, atomics, etc. The following example shows a simple use of parallel_for_each and parallel_reduce to respectively square all the elements of a container, and to sum them.

auto square = [](auto first, auto end) {
    tbb::parallel_for_each(first, end, [](auto& elem) {
        elem = std::pow(elem, 2);
    });
};

auto sum = [](auto first, auto end) {
    using value_type = typename std::iterator_traits<decltype(first)>::value_type;
    using blocked_range = tbb::blocked_range<decltype(first)>;

    auto range = blocked_range{first, end};
    return tbb::parallel_reduce(range, value_type{}, [](const auto& r, auto val) {
        return std::accumulate(std::begin(r), std::end(r), val);
    },
    std::plus<value_type>{});
};

std::vector<double> v(10, 0.);
std::iota(std::begin(v), std::end(v), 0.);

square(std::begin(v), std::end(v));

using outit = std::ostream_iterator<double>;
std::cout << "values: ";
std::copy(std::begin(v), std::end(v), outit{std::cout, " "});
std::cout << '\n';

std::cout << "sum: " << sum(std::begin(v), std::end(v)) << std::endl;

It produces the following output:

values: 0 1 4 9 16 25 36 49 64 81
sum: 285

Intel MKL: The Intel Math Kernel Library is a proprietary toolkit containing advanced mathematical functions with multithreaded implementations. It looks rather low-level, judging from the few examples I have stumbled upon.

Microsoft PPL: The Microsoft Parallel Patterns Library is also a very good multithreading library. It contains features similar to those of TBB, and there is a cross-platform version called PPLX. The interface is neat and also provides a task-based API with continuations. The following example is equivalent to the TBB one.

auto square = [](auto first, auto end) {
    concurrency::parallel_for_each(first, end, [](auto& elem) {
        elem = std::pow(elem, 2);
    });
};

auto sum = [](auto first, auto end) {
    using value_type = typename std::iterator_traits<decltype(first)>::value_type;

    return concurrency::parallel_reduce(first, end, value_type{});
};

std::vector<double> v(5, 0.);

auto result_sum = concurrency::create_task([&] {
    std::iota(std::begin(v), std::end(v), 0.);
})
.then([&] {
    square(std::begin(v), std::end(v));
})
.then([&] {
    using outit = std::ostream_iterator<double>;
    std::cout << "values: ";
    std::copy(std::begin(v), std::end(v), outit{std::cout, " "});
    std::cout << '\n';
})
.then([&] {
    return sum(std::begin(v), std::end(v));
})
.get();

std::cout << "sum: " << result_sum << std::endl;

POCO: This is a very good C++ framework for network communication, serialization and database management. It contains both multithreaded and multi-process utilities; in particular, a thread pool and a task manager. The following example is a deliberately complex multithreaded way to display 16 integers.

class functor_runnable : public Poco::Runnable {
public:
    using func = std::function<void()>;

    functor_runnable(func&& f) : _f(std::move(f)) {}

    void run() {
        _f();
    }

private:
    func _f;
};

Poco::Mutex mutex;

auto display_n = [&mutex](int n) {
    return [n, &mutex]() {
        Poco::Mutex::ScopedLock lock(mutex);
        std::cout << n << '\n';
    };
};

auto vec = std::vector<functor_runnable>{};
vec.reserve(16); // the pool keeps references to the elements: prevent reallocation

auto& pool = Poco::ThreadPool::defaultPool();

for (int i = 0; i < 16; i++) {
    vec.emplace_back(display_n(i));

    pool.start(*vec.rbegin());
}

pool.joinAll();

std::cout << std::endl;

Qt: This is the broadest and most complete cross-platform C++ GUI framework. It comes with its own IDE, a compiler layer (on top of gcc/clang/cl/etc.), reserved words (e.g. slot), and a huge set of facilities. Among them, we can find raw threads, synchronization utilities, a thread pool, task-level management, futures, etc. One facility I like is the map-reduce function, which executes a map function asynchronously and then a reduction function on the main thread. The following example illustrates it: the map function squares the contents of a vector, while the reduction function sums the resulting elements.

std::function<int(int)> map = [](int i) {
    return std::pow(i, 2);
};

auto reduce = [](auto& sum, const auto& i) {
    sum += i;
};

auto v = QVector<int>{1, 2, 3, 4, 5, 4, 3, 2, 1};

QFuture<int> future = QtConcurrent::mappedReduced<int>(v, map, reduce);

std::cout << future.result() << std::endl;

QtConcurrent::run([] {
    std::cout << "QtConcurrent::run is equivalent to std::async" << std::endl;
});

BSL : The Bloomberg BDE Standard Library is an open source pre-C++11 threading library. It contains, in the bslmt namespace, many cross-platform facilities such as thread groups, mutexes, semaphores, etc. It might be useful if you need to use an old compiler.

Junction: Junction gathers a set of scalable concurrent containers, notably hash maps. It is built on top of Turf, yet another C++ concurrency library. Turf is distinctive in that it has four interchangeable implementations (Win32, POSIX, Boost and C++11), meant to be compared against one another, and it provides some facilities that are not native in C++11. Jeff Preshing, the author of Junction and Turf, has a fantastic blog about concurrency in C++.
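Junction's maps achieve their scalability with lock-free algorithms. As a point of comparison, the classic locking alternative is a striped map, sketched below with only standard components (the class name and stripe count are arbitrary):

```cpp
#include <array>
#include <functional>
#include <mutex>
#include <unordered_map>

// A coarse alternative to a lock-free map: shard the key space over
// N independently locked maps, so threads that hit different stripes
// do not contend on the same mutex.
template <typename K, typename V, std::size_t N = 16>
class striped_map {
public:
    void insert(const K& key, const V& value) {
        auto& s = stripe(key);
        std::lock_guard<std::mutex> lock{s.mutex};
        s.map[key] = value;
    }

    bool find(const K& key, V& out) {
        auto& s = stripe(key);
        std::lock_guard<std::mutex> lock{s.mutex};
        auto it = s.map.find(key);
        if (it == s.map.end()) return false;
        out = it->second;
        return true;
    }

private:
    struct shard {
        std::mutex mutex;
        std::unordered_map<K, V> map;
    };

    shard& stripe(const K& key) {
        return shards_[std::hash<K>{}(key) % N];
    }

    std::array<shard, N> shards_;
};
```

Under contention this still serializes threads that land on the same stripe; Junction's lock-free maps avoid that cost entirely, at the price of far trickier code.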

Thread Pool: This thread pool was illustrated in a previous article.

This concludes this part about higher-level multithreading libraries. I am sure that there are many other sensible solutions out there in the wild. But I believe that the list above contains enough possibilities to suit most multithreaded needs, or at least to get started.

Initially, I wanted this article to also cover GPGPU, many-core architectures, multi-process programming, grid computing, and other related topics. However, those parts are far from complete, so I decided to split the article and release this first half. The discussion of MPI, OpenCL, MapReduce, etc. is therefore left for the next article.
