#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Minimal executor interface: accepts a task and runs it at some later point.
class IThreadPool {
public:
    // Virtual destructor so an implementation can be destroyed through an
    // IThreadPool pointer without undefined behaviour.
    virtual ~IThreadPool() = default;

    // Schedules func for execution. When and on which thread it runs is up to
    // the implementation.
    virtual void enqueue(std::function<void()> func) = 0;
};

// A write-once value holder that can be waited on and chained with
// continuations.
//
// T must be default-constructible and copyable: the value is stored in a
// default-initialised member and handed to every continuation by copy.
// All member functions may be called concurrently from different threads.
template<typename T>
class Future {
public:
    // Blocks until set() has been called, then returns a copy of the value.
    T get() {
        wait();
        return value;
    }

    // Blocks until set() has been called.
    void wait() {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [this](){ return isCompleted; });
    }

    // Chains func onto this future and returns a future for func's result.
    //
    // func executes on the thread calling this function (if the future is
    // already completed) or on the thread calling set() (if the future is not
    // completed yet). In both cases func runs without this future's mutex
    // held, so it may safely call back into this future.
    template<typename R>
    std::shared_ptr<Future<R> > continueWithOnSameThread(std::function<R(T)> func) {
        std::shared_ptr<Future<R> > ret = std::make_shared<Future<R> >();

        std::unique_lock<std::mutex> lck(mtx);
        if (!isCompleted) {
            continuations.push_back([ret, func](T v){ ret->set(func(v)); });
            return ret;
        }

        // Already completed: snapshot the value under the lock, then run the
        // user callback unlocked so it cannot deadlock against this future.
        T current = value;
        lck.unlock();
        ret->set(func(std::move(current)));
        return ret;
    }

    // Chains func onto this future and returns a future for func's result.
    //
    // func is always submitted to threadPool: immediately if this future is
    // already completed, otherwise from within set(). threadPool must be
    // non-null and must stay alive until the submitted task has been handed
    // over to it.
    template<typename R>
    std::shared_ptr<Future<R> > continueWith(std::function<R(T)> func, IThreadPool* threadPool) {
        assert(threadPool != nullptr);
        std::shared_ptr<Future<R> > ret = std::make_shared<Future<R> >();

        std::unique_lock<std::mutex> lck(mtx);
        if (!isCompleted) {
            continuations.push_back([ret, func, threadPool](T v){
                threadPool->enqueue([ret, func, v](){ ret->set(func(v)); });
            });
            return ret;
        }

        // Capture a copy of the value rather than `this`: the task may run
        // after this future has been destroyed. Enqueue unlocked so the
        // future's mutex is never held while taking the pool's lock (or while
        // running the task, for pools that execute inline).
        T current = value;
        lck.unlock();
        threadPool->enqueue([ret, func, current](){ ret->set(func(current)); });
        return ret;
    }

    // Completes the future with v, wakes all waiters and fires every
    // continuation registered so far on the calling thread.
    // Must be called at most once (checked with assert).
    void set(T v) {
        std::vector<std::function<void(T)> > pending;
        {
            std::unique_lock<std::mutex> lck(mtx);
            assert(!isCompleted);
            isCompleted = true;
            value = v;
            cv.notify_all();
            // Take ownership of the callbacks so they can run unlocked and
            // are released once they have fired.
            pending.swap(continuations);
        }
        // From here on only locals are touched; continuations may freely call
        // get()/continueWith() on this future without deadlocking.
        for (std::function<void(T)>& f : pending) {
            f(v);
        }
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    bool isCompleted = false;   // guarded by mtx
    T value{};                  // guarded by mtx; written once in set()
    std::vector<std::function<void(T)> > continuations;  // guarded by mtx
};

// Fixed-size pool of worker threads that execute queued tasks in FIFO order.
class ThreadPool : public IThreadPool {
public:
    // Starts nrThreads workers, each running the private run() loop.
    explicit ThreadPool(std::size_t nrThreads)
        : m_end(false),
          m_liveThreads(nrThreads)
    {
        m_threads.reserve(nrThreads);
        for (std::size_t i = 0; i < nrThreads; ++i) {
            m_threads.emplace_back([this](){ this->run(); });
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Drains the queue, stops the workers and joins them.
    ~ThreadPool() {
        close();
        for (std::thread& t : m_threads) {
            t.join();
        }
    }

    // Signals shutdown and blocks until every worker has left its loop.
    // Workers only exit once the queue is empty, so tasks queued before the
    // call are still executed.
    void close() {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_end = true;
        m_cond.notify_all();
        while (m_liveThreads > 0) {
            m_condEnd.wait(lck);
        }
    }

    // Appends func to the queue and wakes one worker.
    // NOTE: a task enqueued after all workers have exited is never run.
    void enqueue(std::function<void()> func) override {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_queue.push_back(std::move(func));
        m_cond.notify_one();
    }

    // Disabled variadic convenience overload, kept for reference:
    // template<typename Func, typename... Args>
    // void enqueue(Func func, Args&&... args) {
    //     std::function<void()> f = [=](){func(args...);};
    //     enqueue(std::move(f));
    // }

private:
    // Worker loop: pop a task and run it outside the lock; exit once the
    // queue is empty and shutdown has been requested.
    void run() {
        while (true) {
            std::function<void()> toExec;
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                while (m_queue.empty() && !m_end) {
                    m_cond.wait(lck);
                }
                if (m_queue.empty()) {
                    // Shutdown with nothing left to do; the last worker out
                    // releases whoever is blocked in close().
                    --m_liveThreads;
                    if (0 == m_liveThreads) {
                        m_condEnd.notify_all();
                    }
                    return;
                }
                toExec = std::move(m_queue.front());
                m_queue.pop_front();
            }
            toExec();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;      // signalled on new work / shutdown
    std::condition_variable m_condEnd;   // signalled when the last worker exits
    std::list<std::function<void()> > m_queue;
    bool m_end;
    std::size_t m_liveThreads;
    std::vector<std::thread> m_threads;  // declared last: workers use the members above
};

// Demo entry point. Define FUTURE_DEMO_NO_MAIN to compile this file into a
// program (or test harness) that provides its own main().
#ifndef FUTURE_DEMO_NO_MAIN
int main() {
    ThreadPool tp(10);
    std::shared_ptr<Future<int> > f1 = std::make_shared<Future<int> >();
    std::shared_ptr<Future<int> > f2 = f1->continueWith<int>([](int x){return x+1;}, &tp);
    f1->set(42);
    std::shared_ptr<Future<int> > f3 = f1->continueWith<int>([](int x){return x*2;}, &tp);
    std::shared_ptr<Future<int> > f4 = f3->continueWith<int>([](int x){return x*3;}, &tp);
    std::cout<<f1->get()<<", "<<f2->get()<<", "<<f3->get()<<", "<<f4->get()<<"\n";
}
#endif  // FUTURE_DEMO_NO_MAIN