Exotic C++: Async
An overview of the asynchronous features available to us in C++17 and C++20.
In this article, we will learn the basics of threads, jthreads, async, mutexes and locks in standard C++. The examples covered have been shown at various CppCon conferences, so following their YouTube channel is highly advisable.
C++ Threads
In a Linux environment, C++ threads are implemented using POSIX threads, commonly known as pthreads. I would recommend reading my article on pthreads for a basic understanding. Now, pthreads are not native to Linux, and under the hood every thread is a separate task.
C++ threads are concepts defined for the abstract machine; in C++, this abstract machine is what we write our code for. There are many asynchronous features in the language, and we are going to explore level-0 examples of each of the following:
- std::thread
- std::jthread(New)
- std::async
- std::mutex
- std::atomic
- std::lock_guard
RAII (Resource Acquisition Is Initialization) is a C++ idiom in which acquiring a resource is tied to initializing an object, so the resource is reserved when the object is created and released when the object is destroyed.
std::thread
`std::thread` is defined in the `thread` header. Let us create a separate thread to execute a function, `void abc(int x)`, inside it.
```cpp
#include <thread>
#include <iostream>

void abc(int x) { // can even pass a variable
    std::cout << "abc: " << x << std::endl;
}

int main() {
    // create the thread, RAII. The thread starts as soon as it is created.
    std::thread someName(&abc, 20);
    // wait for it to join back!
    someName.join();
    // exit
    return 0;
}
```
To compile, type in the terminal: `g++ -pthread code.cpp -o out.exe`, then run with `./out.exe`.
In the next example we create multiple thread objects in a loop and add them to a `std::vector`. This creates 10 different threads, all running a lambda function.
```cpp
#include <thread>
#include <iostream>
#include <vector>

auto lambda = [](int x) {
    std::cout << std::this_thread::get_id() << " abc: " << x << std::endl;
};

std::vector<std::thread> threads;

int main() {
    for (int i = 0; i < 10; i++) {
        threads.push_back(std::thread(lambda, i));
    }
    for (int i = 0; i < 10; i++) {
        threads[i].join();
    }
    std::cout << "we have joined" << std::endl;
    return 0;
}
```
The order in which the threads run is undefined. Waiting for every thread like this is fine, but `join()` has to be called on each one in order to wait for it. Every thread has to be waited for by its parent (or by the init process, in case the parent is already dead), and to make that harder to forget, `std::jthread` was introduced.
std::jthread
A `jthread` is an object which calls `join()` inside its destructor. That way, whenever we forget to join the thread, it will automatically be waited for when the `jthread`'s scope ends.
Usage is quite similar to `std::thread`, but the compiler needs an additional flag to enable C++20: `--std=c++20`. In this example we also take a static shared variable and increment it in every thread.
```cpp
#include <thread>
#include <iostream>
#include <vector>

static int shared_value = 0;

auto lambda = [](int x) {
    shared_value += 1;
};

std::vector<std::jthread> threads;

int main() {
    for (int i = 0; i < 1000; i++) {
        threads.push_back(std::jthread(lambda, i));
    }
    std::cout << "main thread about to return ";
    std::cout << shared_value << std::endl;
    return 0;
} // <- this is where join happens, as the vector gets destructed.
```
std::future & std::promise
If you are not familiar with the topics please read in this article: Promises and Futures.
std::async
`std::async` is a high-level wrapper around threads and futures. An example of running a function through `std::async`:
```cpp
#include <iostream>
#include <future>

int square(int i) {
    return i * i;
}

int main() {
    auto async_function = std::async(&square, 10);
    // getting the return value from the future
    int result = async_function.get();
    std::cout << result << std::endl;
    return 0;
}
```
The call to `std::async` returns a `std::future` which, upon waiting, reveals the return value of the function `square`.
Let's create a loop that waits for the processing to finish. Futures have member functions to wait for a specific amount of time and report the status of the execution. We can check whether this status equals `std::future_status::ready`; otherwise we sleep and loop again.
```cpp
#include <iostream>
#include <future>
#include <chrono>
#include <thread>

bool buffer_something() {
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    return true;
}

int main() {
    std::future<bool> async_function = std::async(std::launch::async, &buffer_something);
    std::future_status status;
    while (true) {
        status = async_function.wait_for(std::chrono::milliseconds(1));
        std::cout << "w_";
        std::cout.flush();
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        if (status == std::future_status::ready) {
            std::cout << std::endl << "we are done" << std::endl;
            break;
        }
    }
    //int result = async_function.get();
    //std::cout << result << std::endl;
    return 0;
}
```
Restricting access
Atomics
Anything that can be copied using `memcpy` can be made atomic, e.g. all the primitive types and trivially copyable user-defined objects.
Note, however, that multiplication, division and shifting of integers are not provided as atomic operations!
Let's go back to the threads example and see what happens when we try to access a shared variable: there will be race conditions if no one regulates the access. To avoid that, we shall use `std::atomic`.
```cpp
#include <thread>
#include <iostream>
#include <atomic>
#include <vector>

static std::atomic<int> shared_value{0};
int N = 1000;
std::vector<std::thread> threads;

void increment() {
    shared_value += 1; // atomic increment, no race condition
}

int main() {
    for (int i = 0; i < N; i++) {
        threads.push_back(std::thread(&increment));
    }
    for (int i = 0; i < N; i++) {
        threads[i].join();
    }
    std::cout << shared_value;
    return 0;
}
```
Mutexes
Mutexes are built on top of atomic processor instructions such as `test_and_set`.
They provide the ability to create critical sections by locking and unlocking a section of the code. A locked section cannot be executed by two threads at the same time!
```cpp
#include <thread>
#include <iostream>
#include <vector>
#include <mutex>

static int shared_value = 0;
static std::mutex mtx;

auto lambda = [](int x) {
    mtx.lock();   // critical section starts
    shared_value += 1;
    mtx.unlock(); // critical section ends
    std::cout << shared_value << std::endl;
};

std::vector<std::thread> threads;
int N = 1000;

int main() {
    for (int i = 0; i < N; i++) {
        threads.push_back(std::thread(lambda, i));
    }
    for (int i = 0; i < N; i++) {
        threads[i].join();
    }
    std::cout << "we have joined" << std::endl;
    std::cout << "value of shared value is: " << shared_value << std::endl;
    return 0;
}
```
The problem with this way of locking and unlocking mutexes is that an exception can occur before the mutex is unlocked, or a developer may `return` from inside the critical section, which leaves the other threads waiting for an unlock that will never happen. For this, a new element which follows the RAII principle was introduced: `std::lock_guard`.
Lock Guards
Very much like with the mutexes, you create critical sections using a lock guard.
```cpp
#include <thread>
#include <iostream>
#include <vector>
#include <mutex> // std::lock_guard is defined here as well

static int shared_value = 0;
static std::mutex mtx;

auto lambda = [](int x) {
    std::lock_guard<std::mutex> lg(mtx); // lock acquired
    shared_value += 1;
    std::cout << shared_value << std::endl;
}; // <- unlock happens here, in lg's destructor

std::vector<std::thread> threads;
int N = 1000;

int main() {
    for (int i = 0; i < N; i++) {
        threads.push_back(std::thread(lambda, i));
    }
    for (int i = 0; i < N; i++) {
        threads[i].join();
    }
    std::cout << "we have joined" << std::endl;
    std::cout << "value of shared value is: " << shared_value << std::endl;
    return 0;
}
```
Upon initialization, a lock guard locks the mutex, creating the critical section, and its destructor calls `unlock()` on the mutex. So a developer is free to leave the scope without explicitly unlocking it.
Scoped Locks
Hold multiple mutexes inside a critical section, the RAII way!
```cpp
#include <thread>
#include <iostream>
#include <vector>
#include <mutex>

// scoped_lock is for holding multiple mutexes inside your function!
static int shared_value = 0;
static std::mutex mtx, mtx2;

auto lambda = [](int x) {
    // locks both mutexes; the template arguments are deduced (C++17)
    std::scoped_lock any_name(mtx, mtx2);
    shared_value += 1;
    std::cout << shared_value << std::endl;
};

std::vector<std::thread> threads;
int N = 1000;

int main() {
    for (int i = 0; i < N; i++) {
        threads.push_back(std::thread(lambda, i));
    }
    for (int i = 0; i < N; i++) {
        threads[i].join();
    }
    std::cout << "we have joined" << std::endl;
    std::cout << "value of shared value is: " << shared_value << std::endl;
    return 0;
}
```
Scoped locks do not specify the order in which the mutexes are locked, but they guarantee an order in which deadlock is avoided.
Bonus Section
std::packaged_task
The class template `std::packaged_task` wraps any Callable target (function, lambda expression, bind expression, or another function object) so that it can be invoked asynchronously!
```cpp
#include <iostream>
#include <future>
#include <cmath>

void task_example() {
    std::packaged_task<int(int, int)> task([](int a, int b) {
        return std::pow(a, b);
    });
    std::future<int> result = task.get_future();
    task(2, 9); // runs the lambda here, synchronously, on the calling thread
    std::cout << "task example:\t" << result.get() << '\n';
}

int main() {
    task_example();
}
```