From a1eb0eecbb5a28a98a1974adf0d2d4f59aeffb4c Mon Sep 17 00:00:00 2001 From: VaclavT Date: Sun, 16 Jan 2022 02:39:14 +0100 Subject: [PATCH] parallel wip --- csvreader.cpp | 56 +++++++-------- csvreader.h | 12 ++-- table.cpp | 15 ++-- table.h | 5 +- threadpoool.h | 188 +++++++++++++++++++++----------------------------- 5 files changed, 121 insertions(+), 155 deletions(-) diff --git a/csvreader.cpp b/csvreader.cpp index c0b3b11..2a091bb 100644 --- a/csvreader.cpp +++ b/csvreader.cpp @@ -20,9 +20,7 @@ CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep size_t CsvReader::parseCSVFile(const std::string &filename, std::vector &cols_def, Table &table) { - size_t lines_cnt = 0; size_t row_cnt = 0; - bool inQuote(false); errno = 0; FILE* fp = fopen(filename.c_str(), "r"); @@ -34,7 +32,8 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector 1; thread_pool tp{hw_concurrency}; @@ -50,28 +49,24 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector - ([this, csv_string, &cols_def, &table, &row_cnt, &row_cnt_mutex]() { - int parsed = parseCSVString(csv_string, cols_def, table); - { - std::unique_lock lock(row_cnt_mutex); - row_cnt++; - } - } + ([this, line_str, &cols_def, &table, &row_cnt, &row_cnt_mutex]() { + // std::cout << "thread: " << std::this_thread::get_id() << " rownum " << row_cnt << "\n"; + auto parsed = parseCSVString(line_str, cols_def, table); + { + std::unique_lock lock(row_cnt_mutex); + row_cnt += parsed; + } + } )); - } - + } else + row_cnt += parseCSVString(line_str, cols_def, table); } - if (use_threadpool) { - tp.finish_tasks(); - } + if (use_threadpool) tp.finish(); fclose(fp); @@ -85,7 +80,6 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector line; + std::vector columns; std::string field; - line.reserve(256); + columns.reserve(256); field.reserve(64); std::string::const_iterator aChar = csvSource.begin(); @@ -110,7 +104,7 @@ size_t CsvReader::parseCSVString(const std::string csvSource, std::vector &cols_def, Table& table); - size_t parseCSVFile(const std::string &filename, std::vector &cols_def, Table& table); - - }; + size_t parseCSVString(const std::string csvSource, std::vector &cols_def, Table& table); +}; } // namespace diff --git a/table.cpp b/table.cpp index d782244..b72f713 100644 --- a/table.cpp +++ b/table.cpp @@ -43,9 +43,8 @@ ColDefNode Table::get_column_def(int col_index) { } } -std::mutex insert_guard; Row& Table::create_empty_row() { - std::unique_lock guard(insert_guard); + std::unique_lock guard(m_insert_guard); m_rows.emplace_back(columns_count(), false); return m_rows.back(); @@ -112,20 +111,20 @@ std::string Table::csv_string() { return out_string; } -int Table::load_csv_string(const std::string &content) { +size_t Table::load_csv_string(const std::string &content) { std::vector &colDefs = m_col_defs; CsvReader csvparser{}; - int row_cnt = csvparser.parseCSVString(content, colDefs, *this); + auto row_cnt = csvparser.parseCSVString(content, colDefs, *this); return row_cnt; } -int Table::load_csv_file(const std::string &filename) { +size_t Table::load_csv_file(const std::string &filename) { std::vector &colDefs = m_col_defs; // allocate enough space - int line_size = 128; + int line_size = 256; std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary); auto file_size = in.tellg(); @@ -145,7 +144,7 @@ int Table::load_csv_file(const std::string &filename) { // load rows CsvReader csvparser{}; - int row_cnt = csvparser.parseCSVFile(filename, colDefs, *this); + auto row_cnt = csvparser.parseCSVFile(filename, colDefs, *this); return row_cnt; } @@ -279,6 +278,8 @@ void Table::reindex_row(Index &index, const ColDefNode &col_def, const Row &old_ void Table::index_row(const Row &row) { if (!m_indexes.empty()) { const size_t rowid = get_rowid(row); + + std::unique_lock guard(m_insert_guard); for (auto &idx : m_indexes) { ColDefNode cDef = get_column_def(idx.get_column_name()); index_row(idx, cDef, row, rowid); diff --git a/table.h b/table.h index 706558d..edba5ce 100644 --- a/table.h +++ b/table.h @@ -31,8 +31,8 @@ struct Table { void validate_row(Row &row); std::string csv_string(); - int load_csv_string(const std::string &content); - int load_csv_file(const std::string &filename); + size_t load_csv_string(const std::string &content); + size_t load_csv_file(const std::string &filename); void print(); @@ -40,6 +40,7 @@ struct Table { std::vector m_col_defs; std::vector m_rows; std::vector m_indexes; + std::mutex m_insert_guard; void create_row_from_vector(const std::vector &colDefs, const std::vector &csv_line); diff --git a/threadpoool.h b/threadpoool.h index 6b18e62..513afb4 100644 --- a/threadpoool.h +++ b/threadpoool.h @@ -4,129 +4,101 @@ #include #include #include +#include #include #include #include -class thread_pool { - public: - thread_pool(std::size_t size) : stop(false), exit_on_empty(false) { - for (std::size_t i = 0; i < size; ++i) { - workers.emplace_back([this] { spawn(); }); - } - } +struct thread_pool { + explicit thread_pool(std::size_t size) { + start(size); + finished.reserve(1024); + } - virtual ~thread_pool() { - if (!stop) join(); - } + std::mutex mutex; + std::condition_variable condition; + std::deque> work; - // template - // void post(F&& f, Args&&... args) { - void post(std::function f) { - { - std::unique_lock lock(mutex); - tasks.push(f); - } + std::vector> finished; - condition.notify_one(); - } + // queue( lambda ) will enqueue the lambda into the tasks for the threads + template> + std::future queue(F &&f) { + // wrap the function object into a packaged task, splitting + // execution from the return value: + std::packaged_task p(std::forward(f)); - void join() { - { - std::unique_lock lock(mutex); - stop = true; - } + auto r = p.get_future(); + { + std::unique_lock l(mutex); + work.emplace_back(std::move(p)); + } + condition.notify_one(); - condition.notify_all(); + return r; // return the future result of the task + } - for (std::size_t i = 0; i < workers.size(); ++i) { - workers[i].join(); - } - } + // start threads_num threads in the thread pool. + void start(std::size_t threads_num = 1) { + for (std::size_t i = 0; i < threads_num; ++i) { + finished.push_back( + std::async( + std::launch::async, + [this] { thread_task(); } + ) + ); + } + } - void finish_tasks() { - { - std::unique_lock lock(mutex); - exit_on_empty = true; - } - - while (!tasks.empty()) { - condition.notify_all(); + // abort() cancels all non-started tasks, and tells every working thread + // stop running, and waits for them to finish up. + void abort() { + cancel_pending(); + finish(); + } - std::unique_lock lock(mutex); - condition_empty.wait(lock, [this]() { - return (tasks.empty()); - }); - } + // cancel_pending() merely cancels all non-started tasks: + void cancel_pending() { + std::unique_lock l(mutex); + work.clear(); + } - // BLEJU, BLEJU - // while (!tasks.empty()) condition.notify_all(); - bool op = true; - } + // finish enques a "stop the thread" message for every thread, then waits for them: + void finish() { + { + std::unique_lock l(mutex); + for (auto &&unused:finished) { + work.emplace_back(); + } + } + condition.notify_all(); + finished.clear(); + } - private: - void spawn() { - std::function task; + ~thread_pool() { + finish(); + } - while (!stop && (!exit_on_empty || !tasks.empty())) { - std::unique_lock lock(mutex); - condition.wait(lock, [this]() { - return (!tasks.empty()) || (tasks.empty() && stop); - }); - - if (!tasks.empty()) { - task = std::move(tasks.front()); - tasks.pop(); - task(); - } - } - if (exit_on_empty) { - condition_empty.notify_one(); - } - } - -public: - std::vector workers; - std::queue> tasks; - - std::mutex mutex; - std::condition_variable condition; - std::condition_variable condition_empty; - bool stop; - bool exit_on_empty; +private: + void thread_task() { + while (true) { + std::packaged_task f; + { + std::unique_lock l(mutex); + if (work.empty()) { + condition.wait(l, [&] { return !work.empty(); }); + } + f = std::move(work.front()); + work.pop_front(); + } + // if the task is invalid, it means we are asked to abort: + if (!f.valid()) return; + + f(); + } + } }; -// template -// inline void dispatch(thread_pool& pool, F&& f, Args&&... args) { -inline void dispatch(thread_pool& pool, std::function f) { - pool.post(f); +inline void dispatch(thread_pool &pool, std::function f) { + pool.queue(f); } - - - -// int main() { -// int cnt = 0; - -// thread_pool tp{std::thread::hardware_concurrency()}; -// std::mutex mutex; - -// std::cout << "start" << std::endl; - -// for(int i=0; i<100; i++) { -// dispatch(tp, std::function -// ([i, &cnt, &mutex]() { -// std::cout << "test " << i << std::endl; -// { -// std::unique_lock lock(mutex); -// cnt++; -// } -// } -// )); -// } - - -// std::cout << "end" << std::endl; -// tp.join(); -// std::cout << "cnt:" << cnt << std::endl; - -// }