diff --git a/CMakeLists.txt b/CMakeLists.txt index d5ed9be..36acdbe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,5 +20,5 @@ set(SOURCE add_executable(${PROJECT_NAME} ${SOURCE}) -target_link_libraries(${PROJECT_NAME} stdc++ m) +target_link_libraries(${PROJECT_NAME} stdc++ m pthread) diff --git a/csvreader.cpp b/csvreader.cpp index f5c6754..c0b3b11 100644 --- a/csvreader.cpp +++ b/csvreader.cpp @@ -4,6 +4,7 @@ #include "csvreader.h" #include "parser.h" +#include "threadpoool.h" namespace usql { @@ -19,13 +20,8 @@ CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep size_t CsvReader::parseCSVFile(const std::string &filename, std::vector &cols_def, Table &table) { + size_t lines_cnt = 0; size_t row_cnt = 0; - std::vector cdefs; - cdefs.reserve(cols_def.size()); - for (auto &cd : cols_def) { - cdefs.emplace_back(table.get_column_def(cd.name)); - } - bool inQuote(false); errno = 0; @@ -37,6 +33,13 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector 1; + + thread_pool tp{hw_concurrency}; + std::mutex row_cnt_mutex; + long read_chars; while ((read_chars = getline(&line_str, &len, fp)) != -1) { if (skip_header && !header_skiped) { @@ -47,8 +50,27 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector + ([this, csv_string, &cols_def, &table, &row_cnt, &row_cnt_mutex]() { + int parsed = parseCSVString(csv_string, cols_def, table); + { + std::unique_lock lock(row_cnt_mutex); + row_cnt++; + } + } + )); + } + + } + + if (use_threadpool) { + tp.finish_tasks(); } fclose(fp); @@ -67,7 +89,7 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector &cols_def, Table& table) { +size_t CsvReader::parseCSVString(const std::string csvSource, std::vector &cols_def, Table& table) { size_t row_cnt = 0; bool inQuote(false); bool newLine(false); diff --git a/csvreader.h b/csvreader.h index 5f5c8e6..90f72b8 100644 --- a/csvreader.h +++ b/csvreader.h @@ -26,7 +26,7 @@ namespace usql { public: explicit CsvReader(bool skip_hdr = true, char field_sep = ',', char quote_ch = '"', char line_sep = '\r', char line_sep2 = '\n'); - size_t parseCSVString(const std::string &csvSource, std::vector &cols_def, Table& table); + size_t parseCSVString(const std::string csvSource, std::vector &cols_def, Table& table); size_t parseCSVFile(const std::string &filename, std::vector &cols_def, Table& table); diff --git a/table.cpp b/table.cpp index 5262dfb..d782244 100644 --- a/table.cpp +++ b/table.cpp @@ -43,17 +43,15 @@ ColDefNode Table::get_column_def(int col_index) { } } -// std::mutex insert_guard; +std::mutex insert_guard; Row& Table::create_empty_row() { -// std::unique_lock guard(insert_guard); + std::unique_lock guard(insert_guard); m_rows.emplace_back(columns_count(), false); return m_rows.back(); } void Table::create_row_from_vector(const std::vector &colDefs, const std::vector &csv_line) { -// std::unique_lock guard(insert_guard); - // prepare empty new_row Row& new_row = create_empty_row(); diff --git a/threadpoool.h b/threadpoool.h new file mode 100644 index 0000000..6b18e62 --- /dev/null +++ b/threadpoool.h @@ -0,0 +1,132 @@ +#include + + +#include +#include +#include +#include +#include +#include + +class thread_pool { + public: + thread_pool(std::size_t size) : stop(false), exit_on_empty(false) { + for (std::size_t i = 0; i < size; ++i) { + workers.emplace_back([this] { spawn(); }); + } + } + + virtual ~thread_pool() { + if (!stop) join(); + } + + // template + // void post(F&& f, Args&&... args) { + void post(std::function f) { + { + std::unique_lock lock(mutex); + tasks.push(f); + } + + condition.notify_one(); + } + + void join() { + { + std::unique_lock lock(mutex); + stop = true; + } + + condition.notify_all(); + + for (std::size_t i = 0; i < workers.size(); ++i) { + workers[i].join(); + } + } + + void finish_tasks() { + { + std::unique_lock lock(mutex); + exit_on_empty = true; + } + + while (!tasks.empty()) { + condition.notify_all(); + + std::unique_lock lock(mutex); + condition_empty.wait(lock, [this]() { + return (tasks.empty()); + }); + } + + // BLEJU, BLEJU + // while (!tasks.empty()) condition.notify_all(); + bool op = true; + } + + private: + void spawn() { + std::function task; + + while (!stop && (!exit_on_empty || !tasks.empty())) { + std::unique_lock lock(mutex); + condition.wait(lock, [this]() { + return (!tasks.empty()) || (tasks.empty() && stop); + }); + + if (!tasks.empty()) { + task = std::move(tasks.front()); + tasks.pop(); + task(); + } + } + if (exit_on_empty) { + condition_empty.notify_one(); + } + } + +public: + std::vector workers; + std::queue> tasks; + + std::mutex mutex; + std::condition_variable condition; + std::condition_variable condition_empty; + bool stop; + bool exit_on_empty; +}; + +// template +// inline void dispatch(thread_pool& pool, F&& f, Args&&... args) { +inline void dispatch(thread_pool& pool, std::function f) { + pool.post(f); +} + + + +// int main() { +// int cnt = 0; + +// thread_pool tp{std::thread::hardware_concurrency()}; +// std::mutex mutex; + +// std::cout << "start" << std::endl; + +// for(int i=0; i<100; i++) { +// dispatch(tp, std::function +// ([i, &cnt, &mutex]() { +// std::cout << "test " << i << std::endl; +// { +// std::unique_lock lock(mutex); +// cnt++; +// } +// } +// )); +// } + + +// std::cout << "end" << std::endl; +// tp.join(); +// std::cout << "cnt:" << cnt << std::endl; + +// }