Exotic C++ : Async

An Overview of 'Asynchronous' features available to us in C++ 17 and 20.

In this article, we will learn the basics of threads, jthreads, async, mutexes and locks in standard C++. The examples covered have been shown at various CppCon conferences, so following the CppCon YouTube channel is highly advisable.

C++ Threads

In a Linux environment, C++ threads are implemented on top of POSIX threads, commonly known as pthreads. I recommend reading my article on pthreads for the basic understanding. Note that pthreads are not native to Linux either: under the hood, every thread is a separate task.

C++ threads are defined in terms of the C++ abstract machine, which is what we write our code for. There are many asynchronous features in the language, and we are going to explore introductory examples of each of the following:

  • std::thread
  • std::jthread (new in C++20)
  • std::async
  • std::mutex
  • std::atomic
  • std::lock_guard

RAII (Resource Acquisition Is Initialization) is a C++ idiom in which a resource is acquired in an object's constructor and released in its destructor, tying the lifetime of the resource to the scope of the object.

std::thread

std::thread is defined in the <thread> header. Let us create a separate thread to execute a function, void abc(int x), inside it.

#include<thread>
#include<iostream>
void abc(int x){
	//can even pass a variable
	std::cout<<"abc: "<<x<<std::endl;
}

int main(){
	//create the thread, RAII. Thread will start as soon as it is created.
	std::thread someName(&abc,20);

	//wait for it to join back!
	someName.join();

	// exit
	return 0;
}


To compile, type in a terminal: g++ -pthread code.cpp -o out.exe, then ./out.exe to run.

In the next example we create multiple thread objects in a loop and add them to a std::vector. This creates 10 different threads all running a lambda function.

#include<thread>
#include<iostream>
#include<vector>

auto lambda=[](int x){
	
	std::cout<<std::this_thread::get_id()<<" abc: "<<x<<std::endl;	
};

std::vector<std::thread> threads;

int main(){

	for(int i=0;i<10;i++){
		threads.push_back(std::thread(lambda,i));
	}

	for(int i=0;i<10;i++){
		threads[i].join();
	}

	std::cout<<"we have joined"<<std::endl;
	return 0;
}


The order in which the threads run is undefined. Waiting for every thread like this is fine.

join() has to be called in order to wait for a thread: every thread must be waited for by its parent (or by the init process if the parent is already dead). Forgetting that call is easy, and for that purpose std::jthread was introduced.

std::jthread

A jthread is a thread object that calls join in its destructor. That way, whenever we forget to join the thread, it will automatically be waited for when the jthread goes out of scope.

Usage is quite similar to std::thread, but the compiler needs an additional flag to enable C++20, -std=c++20. This time we also take a static shared variable and increment it in every thread.

#include<thread>
#include<iostream>
#include<vector>

static int shared_value=0;

auto lambda=[](int x){
	shared_value+=1;
	
};

std::vector<std::jthread> threads;

int main(){
	for(int i=0;i<1000;i++){
		threads.push_back(std::jthread(lambda,i));
	}

	std::cout<<"main thread about to return"<<std::endl;
	//note: the jthreads have not joined yet, so this read races with the increments
	std::cout<<shared_value<<std::endl;
	return 0;
} // <- this is where join happens as the vector gets destructed.

std::future & std::promise

If you are not familiar with these topics, please read this article first: Promises and Futures.

std::async

std::async is a high-level wrapper around threads and futures. Here is an example of running a function with std::async.

#include<iostream>
#include<future>


int square(int i){
	return i*i;
}


int main(){

	auto async_function = std::async(&square,10);
    
	//getting the return value from the future
	int result = async_function.get();
	std::cout << result << std::endl;
	return 0;

}

The call to std::async returns a std::future which, upon waiting, yields the return value of the function square.

Let's create a loop that waits for the processing to finish. Futures have functions to wait for a specific amount of time and report the status of the execution. We can check whether this status equals std::future_status::ready; otherwise we sleep and loop.

#include<iostream>
#include<future>
#include<chrono>
#include<thread>

bool buffer_something(){
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	return true;
}


int main(){

	std::future<bool> async_function = std::async(std::launch::async,&buffer_something);
	std::future_status status;
	while(true)
	{
		status = async_function.wait_for(std::chrono::milliseconds(1));
		
        	std::cout<<"w_";
		std::cout.flush();
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
        
		if(status == std::future_status::ready)
		{
			std::cout<<std::endl<< "we are done"<< std::endl;
			break;
		}
		
	}
	//int result = async_function.get();
	//std::cout << result << std::endl;
	return 0;

}


Restricting access

Atomics

Anything that can be copied using memcpy (i.e. any trivially copyable type) can be made atomic, e.g. all the primitive types and suitable user-defined objects.

Multiplication, division and shifting of integers are not provided as atomic operations!

Let's go back to the threads example and see what happens when we try to access a shared variable. There will be race conditions if no one regulates the access. To avoid that we shall use std::atomic.

#include<thread>
#include<iostream>
#include<atomic>
#include<vector>

static std::atomic<int> shared_value{0};

int N=1000;
std::vector<std::thread> threads;

void increment(){
	shared_value+=1;
}

int main(){

	for(int i=0;i<N;i++){
		threads.push_back(std::thread(&increment));
	}

	for(int i=0;i<N;i++){
		threads[i].join();
	}
    
	std::cout<<shared_value;
	return 0;
}

Mutexes

Mutexes are implemented in the processor using test_and_set and similar atomic instructions.

Mutexes provide the ability to create critical sections by locking and unlocking a section of the code. A locked section cannot be executed by two threads at the same time!

#include<thread>
#include<iostream>
#include<vector>
#include<mutex>

static int shared_value=0;
static std::mutex mtx;
auto lambda=[](int x){
	
	mtx.lock();//critical section starts
	shared_value+=1;
	std::cout<<shared_value<<std::endl;
	mtx.unlock();//critical section ends
};

std::vector<std::thread> threads;
int N=1000;

int main(){
	for(int i=0;i<N;i++){
		threads.push_back(std::thread(lambda,i));
	}

	for(int i=0;i<N;i++){
		threads[i].join();
	}

	std::cout<<"we have joined"<<std::endl;
	std::cout<<"value of shared value is: "<<shared_value<<std::endl;
	return 0;
}

The problem with locking and unlocking mutexes manually is that an exception may occur before the mutex is unlocked, or a developer may return from inside the critical section; either way, the other threads are left waiting for an unlock that will never happen.

For this, a new facility was added that follows the RAII principle: std::lock_guard.

Lock Guards

Very much like with the mutexes, you create critical sections using a lock guard.

#include<thread>
#include<iostream>
#include<vector>
#include<mutex>
static int shared_value=0;

static std::mutex mtx;

auto lambda=[](int x){

	std::lock_guard<std::mutex> lg(mtx);//lock acquired

	shared_value+=1;
	
	std::cout<<shared_value<<std::endl;
};//unlock

std::vector<std::thread> threads;
int N=1000;

int main(){
	for(int i=0;i<N;i++){
		threads.push_back(std::thread(lambda,i));
	}

	for(int i=0;i<N;i++){
		threads[i].join();
	}

	std::cout<<"we have joined"<<std::endl;
	std::cout<<"value of shared value is: "<<shared_value<<std::endl;
	return 0;

}

Upon initialization, a lock guard locks the mutex, creating the critical section, and its destructor unlocks the mutex. So a developer is free to leave the scope without explicitly unlocking.

Scoped Locks

Scoped locks let you hold multiple mutexes inside a critical section, the RAII way!

#include<thread>
#include<iostream>
#include<vector>
#include<mutex>
//scoped lock is for multiple mutexes inside your function!
static int shared_value=0;
static std::mutex mtx,mtx2;
auto lambda=[](int x){

	std::scoped_lock any_name(mtx,mtx2);//deduced as std::scoped_lock<std::mutex,std::mutex>
	shared_value+=1;
	std::cout<<shared_value<<std::endl;
};

std::vector<std::thread> threads;
int N=1000;
int main(){

	for(int i=0;i<N;i++){
		threads.push_back(std::thread(lambda,i));
	}

	for(int i=0;i<N;i++){
		threads[i].join();
	}

	std::cout<<"we have joined"<<std::endl;
	std::cout<<"value of shared value is: "<<shared_value<<std::endl;
	return 0;

}

Scoped locks do not specify the order in which the mutexes are locked, but they guarantee an order in which deadlock is avoided.

Bonus Section

std::packaged_task

The class template std::packaged_task wraps any Callable target (function, lambda expression, bind expression, or another function object) so that it can be invoked asynchronously!

#include <iostream>
#include <future>
#include <cmath>

void task_example()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b); 
    });
    std::future<int> result = task.get_future();
 
    task(2, 9);
 
    std::cout << "task example:\t" << result.get() << '\n';
}

int main(){
    task_example();// here the task runs on the calling thread; it could also be handed to a std::thread
}