Пул потоков с запуском функций с переменным количеством параметров

Всем привет! Внезапно оказался без определённого места работы и в свободное время мучаю ИИ по поводу возможных задач и вопросов на собеседованиях по C++, а некоторые задачи делаю самостоятельно.

И недавно я попросил ИИ сделать пул тредов. Я его написал заранее, но захотелось проверить, где я мог ошибиться.

ИИ написал нерабочий код, но с интересной идеей: запускать вместо стандартного std::finction<void()> функции с параметрами с помощью std::packaged_task и varidic templates. И меня этот так вдохновило, что я решил таки сделать этот код рабочим. И сделал! Кстати, этот код не нравится ИИ и он его хочет переделать на std::condition_variable и, возможно, он прав, а я написал ерунду, таков путь!

Мой вариант на C++:

#include <vector>
#include <atomic>
#include <thread>
#include <mutex>
#include <functional>
#include <deque>
#include <optional>
#include <future>
#include <iostream>

class ThreadPool {
public:
    /// По умолчанию количество потоков равно количеству аппаратных потоков
    ThreadPool(size_t numThreads=std::jthread::hardware_concurrency()) {
        m_threads.reserve(numThreads);
        for (size_t i = 0; i < numThreads; ++i) {
            m_threads.emplace_back([this](){ runLoop(); });
        }
    }

    // Принимаем на входе функцию и её аргументы, на выходе автоматически
    // оборачиваем результат работы функции в std::future
    template<typename... Args>
    auto runTaskAsync(auto f, Args&&...args) {
        // Выясняем возращаемый тип у функции
        using return_type = std::invoke_result_t<decltype(f), Args...>;

        // Заворачиваем функцию с переменным количеством аргументов в
        // std::packaged_task, у которого можно получить std::future<return_type>
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<decltype(f)>(f), std::forward<Args>(args)...)
            );

        // Получаем std::future
        auto res = task->get_future();

        if (m_Running) {
            std::unique_lock lock(m_guard);
            // Добавляем задачу в очередь на выполнение
            m_tasks.push_back([task](){(*task)();});

            // Уведомляем ожидающие потоки
            m_hasTask = true;
            // Может тут надо использовать notify_one (?)
            m_hasTask.notify_all();
        }

        return res;
    }

    // Вызываем функцию синхронно и возвращаем результат
    template<typename...Args>
    auto runTaskSync(auto f, Args &&...args) {
        return runTaskAsync(f, std::forward<Args>(args)...).get();
    }

    ~ThreadPool()
    {
        // Устанавливаем флаг остановки потоков
        m_Running = false;
        // Очищаем очередь задач
        {
            std::unique_lock lock(m_guard);
            m_tasks.clear();
        }
        // Обманом :-) выводим потоки из режима ожидания
        m_hasJob = true;
        m_hasJob.notify_all();

        // Для каждого потока
        for (auto &thread : m_threads) {
            if (thread.joinable()) {
                // Дожидаемся завершения работы
                thread.join();
            }
        }
    }
private:
    using Task = std::function<void()>;

    void runLoop() {
        while (m_Running) {
            // Ждём задачу
            m_hasTask.wait(false);

            std::optional<Task> task;

            // Забираем задачу из начала списка
            {
                std::unique_lock lock(m_guard);
                if (!m_tasks.empty()) {
                    task = m_tasks.front();
                    m_tasks.pop_front();
                }
            }

            // Задача есть, исполняем
            if (task) {
                (*task)();
            }
            else {
                // Отдаём процессорное время другим потокам
                std::this_thread::yield();
            }
        }
    }

    // Вектор потоков
    std::vector<std::thread> m_threads;
    // Атомарный флаг остановки потоков
    std::atomic_bool m_Running{ true };
    // Атомарный флаг наличия задачи
    std::atomic_bool m_hasTask{ false };
    // Очередь задач
    std::deque<Task> m_tasks;
    // Мьютекс для защиты очереди задач
    std::mutex m_guard;
};


int main()
{
    // Создаём пул потоков
    ThreadPool mgr;

    // Первая задача на две секунды, которая возвращает строку
    auto func = [](const std::string &s) {
        std::cout << "Long task1 begin..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "Long task1 end..." << std::endl;
        return s;
    };

    // Вторая задача на пять секунд, которая ничего не возвращает
    auto func2 = []() {
        std::cout << "Long task2 begin..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::cout << "Long task2 end..." << std::endl;
    };

    // Запускаем первую задачу с параметром
    auto result1 = mgr.runTaskAsync(func, "task1 result");
    // Запускаем вторую задачу без параметров
    auto result2 = mgr.runTaskAsync(func2);

    // Получаем результат выполнения первой задачи и выводим в консоль
    std::cout << "Task1 result = " << result1.get() << std::endl;
    // Ждём вторую задачу
    std::cout << "Wait task2 result" << std::endl;
    result2.wait();
}

Исходник от ИИ для сравнения:

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>

class ThreadPool {
public:
    // Конструктор, создающий указанное количество потоков
    ThreadPool(size_t threads) : stop(false) {
        for(size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                for(;;) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, 
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    // Добавление задачи в пул
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
            
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            
            // Не добавляем новые задачи после остановки пула
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
                
            tasks.emplace([task](){ (*task)(); });
        }
        condition.notify_one();
        return res;
    }
    
    // Деструктор, останавливающий все потоки
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for(std::thread &worker: workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;       // Рабочие потоки
    std::queue<std::function<void()>> tasks;// Очередь задач
    
    std::mutex queue_mutex;                 // Мьютекс для синхронизации доступа к очереди
    std::condition_variable condition;      // Условная переменная для уведомлений
    bool stop;                              // Флаг остановки пула
};

Буду признателен, если вы укажете на возможные ошибки и недочёты в моём коде.

Комментариев нет. Будьте первым!
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Блог Евгения Жирнова