parallesim a bit of improvement

This commit is contained in:
vaclavt 2022-01-19 19:29:25 +01:00
parent 471f218bdf
commit 33554706a4
4 changed files with 28 additions and 7 deletions

View File

@ -31,10 +31,10 @@ namespace usql {
size_t len = 0;
// TODO handle it by settings
const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2));
const std::size_t hw_concurrency = getConcurrency();
const bool use_threadpool = hw_concurrency > 1;
thread_pool tp{hw_concurrency};
ThreadPool tp{hw_concurrency};
std::mutex row_cnt_mutex;
try {
@ -140,4 +140,22 @@ namespace usql {
return row_cnt;
}
size_t CsvReader::getConcurrency() {
auto value = Settings::get_setting("MAX_PARALLELISM");
if (value == "1" || value == "0")
return 0;
else if (value == "AUTO")
return std::thread::hardware_concurrency();
else {
char *end;
long i = strtol( value.c_str(), &end, 10 );
if ( *end == '\0' ) { // Success
if (i >= 0) return i;
else return std::max(std::thread::hardware_concurrency() + i, 0l);
}
}
return 0;
}
} // namespace

View File

@ -28,6 +28,8 @@ public:
size_t parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table& table);
size_t parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table);
static size_t getConcurrency();
};
} // namespace

View File

@ -12,7 +12,8 @@ std::vector<std::pair<std::string, std::string>> Settings::m_settings =
std::make_pair("BOOL_TRUE_LITERAL", "Y"),
std::make_pair("BOOL_FALSE_LITERAL", "N"),
std::make_pair("DOUBLE_FORMAT", "%.2f"),
std::make_pair("USE_INDEXSCAN", "N") };
std::make_pair("USE_INDEXSCAN", "N"),
std::make_pair("MAX_PARALLELISM", "1") }; // values "AUTO" or number of workers; when number negative means std::thread::hardware_concurrency() - number

View File

@ -5,15 +5,15 @@
#include <condition_variable>
#include <vector>
class thread_pool {
class ThreadPool {
public:
thread_pool(std::size_t size) : stop(false) {
ThreadPool(std::size_t size) : stop(false) {
for (std::size_t i = 0; i < size; ++i) {
workers.emplace_back([this] { spawn(); });
}
}
virtual ~thread_pool() {
virtual ~ThreadPool() {
if (!stop) join();
}
@ -74,6 +74,6 @@ public:
bool stop;
};
inline void dispatch(thread_pool &pool, std::function<void()> f) {
inline void dispatch(ThreadPool &pool, std::function<void()> f) {
pool.post(f);
}