Всем привет! Внезапно оказался без определённого места работы и в свободное время мучаю ИИ по поводу возможных задач и вопросов на собеседованиях по C++, а некоторые задачи делаю самостоятельно.
И недавно я попросил ИИ сделать пул тредов. Я его написал заранее, но захотелось проверить, где я мог ошибиться.
ИИ написал нерабочий код, но с интересной идеей: запускать вместо стандартного std::finction<void()>
функции с параметрами с помощью std::packaged_task
и varidic templates. И меня этот так вдохновило, что я решил таки сделать этот код рабочим. И сделал! Кстати, этот код не нравится ИИ и он его хочет переделать на std::condition_variable
и, возможно, он прав, а я написал ерунду, таков путь!
#include <vector>
#include <atomic>
#include <thread>
#include <mutex>
#include <functional>
#include <deque>
#include <optional>
#include <future>
#include <iostream>
class ThreadPool {
public:
/// По умолчанию количество потоков равно количеству аппаратных потоков
ThreadPool(size_t numThreads=std::jthread::hardware_concurrency()) {
m_threads.reserve(numThreads);
for (size_t i = 0; i < numThreads; ++i) {
m_threads.emplace_back([this](){ runLoop(); });
}
}
// Принимаем на входе функцию и её аргументы, на выходе автоматически
// оборачиваем результат работы функции в std::future
template<typename... Args>
auto runTaskAsync(auto f, Args&&...args) {
// Выясняем возращаемый тип у функции
using return_type = std::invoke_result_t<decltype(f), Args...>;
// Заворачиваем функцию с переменным количеством аргументов в
// std::packaged_task, у которого можно получить std::future<return_type>
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<decltype(f)>(f), std::forward<Args>(args)...)
);
// Получаем std::future
auto res = task->get_future();
if (m_Running) {
std::unique_lock lock(m_guard);
// Добавляем задачу в очередь на выполнение
m_tasks.push_back([task](){(*task)();});
// Уведомляем ожидающие потоки
m_hasTask = true;
// Может тут надо использовать notify_one (?)
m_hasTask.notify_all();
}
return res;
}
// Вызываем функцию синхронно и возвращаем результат
template<typename...Args>
auto runTaskSync(auto f, Args &&...args) {
return runTaskAsync(f, std::forward<Args>(args)...).get();
}
~ThreadPool()
{
// Устанавливаем флаг остановки потоков
m_Running = false;
// Очищаем очередь задач
{
std::unique_lock lock(m_guard);
m_tasks.clear();
}
// Обманом :-) выводим потоки из режима ожидания
m_hasJob = true;
m_hasJob.notify_all();
// Для каждого потока
for (auto &thread : m_threads) {
if (thread.joinable()) {
// Дожидаемся завершения работы
thread.join();
}
}
}
private:
using Task = std::function<void()>;
void runLoop() {
while (m_Running) {
// Ждём задачу
m_hasTask.wait(false);
std::optional<Task> task;
// Забираем задачу из начала списка
{
std::unique_lock lock(m_guard);
if (!m_tasks.empty()) {
task = m_tasks.front();
m_tasks.pop_front();
}
}
// Задача есть, исполняем
if (task) {
(*task)();
}
else {
// Отдаём процессорное время другим потокам
std::this_thread::yield();
}
}
}
// Вектор потоков
std::vector<std::thread> m_threads;
// Атомарный флаг остановки потоков
std::atomic_bool m_Running{ true };
// Атомарный флаг наличия задачи
std::atomic_bool m_hasTask{ false };
// Очередь задач
std::deque<Task> m_tasks;
// Мьютекс для защиты очереди задач
std::mutex m_guard;
};
int main()
{
// Создаём пул потоков
ThreadPool mgr;
// Первая задача на две секунды, которая возвращает строку
auto func = [](const std::string &s) {
std::cout << "Long task1 begin..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Long task1 end..." << std::endl;
return s;
};
// Вторая задача на пять секунд, которая ничего не возвращает
auto func2 = []() {
std::cout << "Long task2 begin..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Long task2 end..." << std::endl;
};
// Запускаем первую задачу с параметром
auto result1 = mgr.runTaskAsync(func, "task1 result");
// Запускаем вторую задачу без параметров
auto result2 = mgr.runTaskAsync(func2);
// Получаем результат выполнения первой задачи и выводим в консоль
std::cout << "Task1 result = " << result1.get() << std::endl;
// Ждём вторую задачу
std::cout << "Wait task2 result" << std::endl;
result2.wait();
}
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
class ThreadPool {
public:
// Конструктор, создающий указанное количество потоков
ThreadPool(size_t threads) : stop(false) {
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
// Добавление задачи в пул
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// Не добавляем новые задачи после остановки пула
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// Деструктор, останавливающий все потоки
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
private:
std::vector<std::thread> workers; // Рабочие потоки
std::queue<std::function<void()>> tasks;// Очередь задач
std::mutex queue_mutex; // Мьютекс для синхронизации доступа к очереди
std::condition_variable condition; // Условная переменная для уведомлений
bool stop; // Флаг остановки пула
};
Буду признателен, если вы укажете на возможные ошибки и недочёты в моём коде.