A std::promise
jest tworzony jako punkt końcowy dla pary obietnica / przyszłość, a std::future
(utworzony z std :: promise przy użyciuget_future()
metody) jest drugim punktem końcowym. Jest to prosta metoda umożliwiająca synchronizację dwóch wątków, ponieważ jeden wątek dostarcza dane do drugiego wątku za pośrednictwem wiadomości.
Możesz myśleć o tym, jak jeden wątek tworzy obietnicę dostarczenia danych, a drugi wątek zbiera obietnicę w przyszłości. Tego mechanizmu można użyć tylko raz.
Mechanizm obietnica / przyszłość jest tylko jednym kierunkiem, od wątku, który używa set_value()
metody a, std::promise
do wątku, który używa get()
a std::future
do otrzymania danych. Wyjątek jest generowany, jeśliget()
metoda przyszłości zostanie wywołana więcej niż jeden raz.
Jeśli wątek z std::promise
nie wykorzystał set_value()
spełnić swoją obietnicę wtedy gdy drugie połączenia gwintowe get()
z std::future
zebrać obietnicę, druga nitka przejdzie w stan oczekiwania, aż obietnica jest spełniona przez pierwszy wątek z std::promise
kiedy używa set_value()
metody wysłać dane.
Dzięki zaproponowanym coroutines specyfikacji technicznej N4663 Języki programowania - rozszerzenia C ++ dla Coroutines i obsłudze kompilatora Visual Studio 2017 C ++ co_await
, możliwe jest także używanie std::future
i std::async
pisanie funkcjonalności coroutine. Zobacz dyskusję i przykład w https://stackoverflow.com/a/50753040/1466970, który ma jedną sekcję omawiającą użycie std::future
z co_await
.
Poniższy przykładowy kod, prosta aplikacja konsoli Windows dla programu Visual Studio 2013, pokazuje użycie kilku klas / szablonów współbieżności C ++ 11 i innych funkcji. Ilustruje zastosowanie dobrze działającej obietnicy / przyszłości, autonomicznych wątków, które wykonają pewne zadania i zatrzymanie, oraz zastosowania, w którym wymagane jest bardziej synchroniczne zachowanie, a ze względu na potrzebę wielu powiadomień para obietnica / przyszła nie działa.
Jedna uwaga na temat tego przykładu to opóźnienia dodane w różnych miejscach. Opóźnienia te zostały dodane tylko po to, aby upewnić się, że różne komunikaty drukowane przy użyciu konsoli std::cout
będą wyraźne, a tekst z kilku wątków nie będzie mieszany.
Pierwsza część main()
polega na utworzeniu trzech dodatkowych wątków oraz wykorzystaniu std::promise
i std::future
wysłaniu danych między wątkami. Ciekawym punktem jest to, że główny wątek uruchamia wątek T2, który będzie czekał na dane z głównego wątku, zrobi coś, a następnie wyśle dane do trzeciego wątku, T3, który następnie coś zrobi i wyśle dane z powrotem do główny wątek.
Druga część main()
tworzy dwa wątki i zestaw kolejek, aby umożliwić wiele wiadomości z głównego wątku do każdego z dwóch utworzonych wątków. Nie możemy użyć std::promise
i std::future
do tego, ponieważ obietnica / przyszłość duet to jeden strzał i nie może być użyty wielokrotnie.
Źródłem klasy Sync_queue
jest Stroustrup's The C ++ Programming Language: 4th Edition.
// cpp_threads.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <thread> // std::thread is defined here
#include <future> // std::future and std::promise defined here
#include <list> // std::list which we use to build a message queue on.
static std::atomic<int> kount(1); // this variable is used to provide an identifier for each thread started.
//------------------------------------------------
// create a simple queue to let us send notifications to some of our threads.
// a future and promise are one shot type of notifications.
// we use Sync_queue<> to have a queue between a producer thread and a consumer thread.
// this code taken from chapter 42 section 42.3.4
// The C++ Programming Language, 4th Edition by Bjarne Stroustrup
// copyright 2014 by Pearson Education, Inc.
template<typename Ttype>
class Sync_queue {
public:
void put(const Ttype &val);
void get(Ttype &val);
private:
std::mutex mtx; // mutex used to synchronize queue access
std::condition_variable cond; // used for notifications when things are added to queue
std::list <Ttype> q; // list that is used as a message queue
};
template<typename Ttype>
void Sync_queue<Ttype>::put(const Ttype &val) {
std::lock_guard <std::mutex> lck(mtx);
q.push_back(val);
cond.notify_one();
}
template<typename Ttype>
void Sync_queue<Ttype>::get(Ttype &val) {
std::unique_lock<std::mutex> lck(mtx);
cond.wait(lck, [this]{return !q.empty(); });
val = q.front();
q.pop_front();
}
//------------------------------------------------
// thread function that starts up and gets its identifier and then
// waits for a promise to be filled by some other thread.
void func(std::promise<int> &jj) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
std::cout << " func " << myId << " future " << ll << std::endl;
}
// function takes a promise from one thread and creates a value to provide as a promise to another thread.
void func2(std::promise<int> &jj, std::promise<int>&pp) {
int myId = std::atomic_fetch_add(&kount, 1); // get my identifier
std::future<int> intFuture(jj.get_future());
auto ll = intFuture.get(); // wait for the promise attached to the future
auto promiseValue = ll * 100; // create the value to provide as promised to the next thread in the chain
pp.set_value(promiseValue);
std::cout << " func2 " << myId << " promised " << promiseValue << " ll was " << ll << std::endl;
}
// thread function that starts up and waits for a series of notifications for work to do.
void func3(Sync_queue<int> &q, int iBegin, int iEnd, int *pInts) {
int myId = std::atomic_fetch_add(&kount, 1);
int ll;
q.get(ll); // wait on a notification and when we get it, processes it.
while (ll > 0) {
std::cout << " func3 " << myId << " start loop base " << ll << " " << iBegin << " to " << iEnd << std::endl;
for (int i = iBegin; i < iEnd; i++) {
pInts[i] = ll + i;
}
q.get(ll); // we finished this job so now wait for the next one.
}
}
int _tmain(int argc, _TCHAR* argv[])
{
std::chrono::milliseconds myDur(1000);
// create our various promise and future objects which we are going to use to synchronise our threads
// create our three threads which are going to do some simple things.
std::cout << "MAIN #1 - create our threads." << std::endl;
// thread T1 is going to wait on a promised int
std::promise<int> intPromiseT1;
std::thread t1(func, std::ref(intPromiseT1));
// thread T2 is going to wait on a promised int and then provide a promised int to thread T3
std::promise<int> intPromiseT2;
std::promise<int> intPromiseT3;
std::thread t2(func2, std::ref(intPromiseT2), std::ref(intPromiseT3));
// thread T3 is going to wait on a promised int and then provide a promised int to thread Main
std::promise<int> intPromiseMain;
std::thread t3(func2, std::ref(intPromiseT3), std::ref(intPromiseMain));
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2 - provide the value for promise #1" << std::endl;
intPromiseT1.set_value(22);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.2 - provide the value for promise #2" << std::endl;
std::this_thread::sleep_for(myDur);
intPromiseT2.set_value(1001);
std::this_thread::sleep_for(myDur);
std::cout << "MAIN #2.4 - set_value 1001 completed." << std::endl;
std::future<int> intFutureMain(intPromiseMain.get_future());
auto t3Promised = intFutureMain.get();
std::cout << "MAIN #2.3 - intFutureMain.get() from T3. " << t3Promised << std::endl;
t1.join();
t2.join();
t3.join();
int iArray[100];
Sync_queue<int> q1; // notification queue for messages to thread t11
Sync_queue<int> q2; // notification queue for messages to thread t12
std::thread t11(func3, std::ref(q1), 0, 5, iArray); // start thread t11 with its queue and section of the array
std::this_thread::sleep_for(myDur);
std::thread t12(func3, std::ref(q2), 10, 15, iArray); // start thread t12 with its queue and section of the array
std::this_thread::sleep_for(myDur);
// send a series of jobs to our threads by sending notification to each thread's queue.
for (int i = 0; i < 5; i++) {
std::cout << "MAIN #11 Loop to do array " << i << std::endl;
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q1.put(i + 100);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
q2.put(i + 1000);
std::this_thread::sleep_for(myDur); // sleep a moment for I/O to complete
}
// close down the job threads so that we can quit.
q1.put(-1); // indicate we are done with agreed upon out of range data value
q2.put(-1); // indicate we are done with agreed upon out of range data value
t11.join();
t12.join();
return 0;
}
Ta prosta aplikacja tworzy następujące dane wyjściowe.
MAIN #1 - create our threads.
MAIN #2 - provide the value for promise #1
func 1 future 22
MAIN #2.2 - provide the value for promise #2
func2 2 promised 100100 ll was 1001
func2 3 promised 10010000 ll was 100100
MAIN #2.4 - set_value 1001 completed.
MAIN #2.3 - intFutureMain.get() from T3. 10010000
MAIN #11 Loop to do array 0
func3 4 start loop base 100 0 to 5
func3 5 start loop base 1000 10 to 15
MAIN #11 Loop to do array 1
func3 4 start loop base 101 0 to 5
func3 5 start loop base 1001 10 to 15
MAIN #11 Loop to do array 2
func3 4 start loop base 102 0 to 5
func3 5 start loop base 1002 10 to 15
MAIN #11 Loop to do array 3
func3 4 start loop base 103 0 to 5
func3 5 start loop base 1003 10 to 15
MAIN #11 Loop to do array 4
func3 4 start loop base 104 0 to 5
func3 5 start loop base 1004 10 to 15