diff --git a/usql/csvreader.cpp b/usql/csvreader.cpp index b6bf5a0..4a14518 100644 --- a/usql/csvreader.cpp +++ b/usql/csvreader.cpp @@ -30,11 +30,10 @@ namespace usql { char *line_str = NULL; size_t len = 0; - // TODO handle it by settings - const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2)); + const std::size_t hw_concurrency = getConcurrency(); const bool use_threadpool = hw_concurrency > 1; - thread_pool tp{hw_concurrency}; + ThreadPool tp{hw_concurrency}; std::mutex row_cnt_mutex; try { @@ -140,4 +139,22 @@ namespace usql { return row_cnt; } + size_t CsvReader::getConcurrency() { + auto value = Settings::get_setting("MAX_PARALLELISM"); + if (value == "1" || value == "0") + return 0; + else if (value == "AUTO") + return std::thread::hardware_concurrency(); + else { + char *end; + long i = strtol( value.c_str(), &end, 10 ); + if ( *end == '\0' ) { // Success + if (i >= 0) return i; + else return std::max(std::thread::hardware_concurrency() + i, 0l); + } + } + return 0; + } + + } // namespace diff --git a/usql/csvreader.h b/usql/csvreader.h index eb298d4..72ef0b6 100644 --- a/usql/csvreader.h +++ b/usql/csvreader.h @@ -28,6 +28,8 @@ public: size_t parseCSVFile(const std::string &filename, std::vector &cols_def, Table& table); size_t parseCSVString(const std::string &csvSource, std::vector &cols_def, Table& table); + + static size_t getConcurrency(); }; } // namespace diff --git a/usql/settings.cpp b/usql/settings.cpp index f884af9..500d5db 100644 --- a/usql/settings.cpp +++ b/usql/settings.cpp @@ -12,7 +12,8 @@ std::vector> Settings::m_settings = std::make_pair("BOOL_TRUE_LITERAL", "Y"), std::make_pair("BOOL_FALSE_LITERAL", "N"), std::make_pair("DOUBLE_FORMAT", "%.2f"), - std::make_pair("USE_INDEXSCAN", "N") }; + std::make_pair("USE_INDEXSCAN", "N"), + std::make_pair("MAX_PARALLELISM", "1") }; // values "AUTO" or number of workers; when number negative means std::thread::hardware_concurrency() - number diff --git a/usql/threadpool.h b/usql/threadpool.h index ca2f03f..56a372b 100644 --- a/usql/threadpool.h +++ b/usql/threadpool.h @@ -5,15 +5,15 @@ #include #include -class thread_pool { +class ThreadPool { public: - thread_pool(std::size_t size) : stop(false) { + ThreadPool(std::size_t size) : stop(false) { for (std::size_t i = 0; i < size; ++i) { workers.emplace_back([this] { spawn(); }); } } - virtual ~thread_pool() { + virtual ~ThreadPool() { if (!stop) join(); } @@ -74,6 +74,6 @@ public: bool stop; }; -inline void dispatch(thread_pool &pool, std::function f) { +inline void dispatch(ThreadPool &pool, std::function f) { pool.post(f); }