#include #include #include #include #include #include #include using namespace std; class IThreadPool { public: virtual void enqueue(std::function func) = 0; }; template class Future { public: void set(T v) { vector > tmpContinuations; { unique_lock lck(mtx); value = v; hasValue = true; cv.notify_all(); swap(continuations, tmpContinuations); } for(function& cont : tmpContinuations) { cont(v); } } void wait() { unique_lock lck(mtx); cv.wait(lck, [this](){return this->hasValue;}); } T get() { wait(); return value; } template shared_ptr > continueWithNoThread(Func f) { shared_ptr > ret = make_shared > (); unique_lock lck(mtx); if(hasValue) { ret-> set(f(value)); } else { continuations.push_back([ret,f](T srcVal){ ret->set(f(srcVal)); }); } return ret; } template shared_ptr > continueWith(Func f, IThreadPool* tp) { shared_ptr > ret = make_shared > (); unique_lock lck(mtx); if(hasValue) { tp->enqueue([ret, f, this](){ret-> set(f(this->value));}); } else { continuations.push_back([ret,f,tp](T srcVal){ tp->enqueue([ret,f,srcVal](){ret->set(f(srcVal));}); }); } return ret; } private: mutex mtx; condition_variable cv; bool hasValue; T value; vector > continuations; }; template shared_ptr > continueWhenAll(Func f, IThreadPool* tp, Future*... input) { f(input->get()...); } class ThreadPool : public IThreadPool { public: explicit ThreadPool(size_t nrThreads) :m_end(false), m_liveThreads(nrThreads) { m_threads.reserve(nrThreads); for(size_t i=0 ; irun();}); } } ~ThreadPool() { close(); for(std::thread& t : m_threads) { t.join(); } } void close() { std::unique_lock lck(m_mutex); m_end = true; m_cond.notify_all(); while(m_liveThreads > 0) { m_condEnd.wait(lck); } } void enqueue(std::function func) { std::unique_lock lck(m_mutex); m_queue.push_back(std::move(func)); m_cond.notify_one(); } // template // void enqueue(Func func, Args&&... args) { // std::function f = [=](){func(args...);}; // enqueue(std::move(f)); // } private: void run() { while(true) { std::function toExec; { std::unique_lock lck(m_mutex); while(m_queue.empty() && !m_end) { m_cond.wait(lck); } if(m_queue.empty()) { --m_liveThreads; if(0 == m_liveThreads) { m_condEnd.notify_all(); } return; } toExec = std::move(m_queue.front()); m_queue.pop_front(); } toExec(); } } std::mutex m_mutex; std::condition_variable m_cond; std::condition_variable m_condEnd; std::list > m_queue; bool m_end; size_t m_liveThreads; std::vector m_threads; }; int main() { shared_ptr > f1 = make_shared > (); ThreadPool tp(10); shared_ptr > f2 = f1->continueWith([](int x){return x+1;}, &tp); tp.enqueue([f1](){f1->set(1);}); //this_thread::sleep_for(chrono::milliseconds(1000)); shared_ptr > f3 = continueWhenAll([](int x, int y){return x+y;}, &tp, f1.get(), f2.get()); cout << "Val=" << f2->get() << endl; return 0; }