usql update

This commit is contained in:
vaclavt 2022-01-20 20:35:33 +01:00
parent caf118f4f6
commit ea09216550
4 changed files with 28 additions and 8 deletions

View File

@ -30,11 +30,10 @@ namespace usql {
char *line_str = NULL; char *line_str = NULL;
size_t len = 0; size_t len = 0;
// TODO handle it by settings const std::size_t hw_concurrency = getConcurrency();
const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2));
const bool use_threadpool = hw_concurrency > 1; const bool use_threadpool = hw_concurrency > 1;
thread_pool tp{hw_concurrency}; ThreadPool tp{hw_concurrency};
std::mutex row_cnt_mutex; std::mutex row_cnt_mutex;
try { try {
@ -140,4 +139,22 @@ namespace usql {
return row_cnt; return row_cnt;
} }
size_t CsvReader::getConcurrency() {
auto value = Settings::get_setting("MAX_PARALLELISM");
if (value == "1" || value == "0")
return 0;
else if (value == "AUTO")
return std::thread::hardware_concurrency();
else {
char *end;
long i = strtol( value.c_str(), &end, 10 );
if ( *end == '\0' ) { // Success
if (i >= 0) return i;
else return std::max(std::thread::hardware_concurrency() + i, 0l);
}
}
return 0;
}
} // namespace } // namespace

View File

@ -28,6 +28,8 @@ public:
size_t parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table& table); size_t parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table& table);
size_t parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table); size_t parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table);
static size_t getConcurrency();
}; };
} // namespace } // namespace

View File

@ -12,7 +12,8 @@ std::vector<std::pair<std::string, std::string>> Settings::m_settings =
std::make_pair("BOOL_TRUE_LITERAL", "Y"), std::make_pair("BOOL_TRUE_LITERAL", "Y"),
std::make_pair("BOOL_FALSE_LITERAL", "N"), std::make_pair("BOOL_FALSE_LITERAL", "N"),
std::make_pair("DOUBLE_FORMAT", "%.2f"), std::make_pair("DOUBLE_FORMAT", "%.2f"),
std::make_pair("USE_INDEXSCAN", "N") }; std::make_pair("USE_INDEXSCAN", "N"),
std::make_pair("MAX_PARALLELISM", "1") }; // values "AUTO" or number of workers; when number negative means std::thread::hardware_concurrency() - number

View File

@ -5,15 +5,15 @@
#include <condition_variable> #include <condition_variable>
#include <vector> #include <vector>
class thread_pool { class ThreadPool {
public: public:
thread_pool(std::size_t size) : stop(false) { ThreadPool(std::size_t size) : stop(false) {
for (std::size_t i = 0; i < size; ++i) { for (std::size_t i = 0; i < size; ++i) {
workers.emplace_back([this] { spawn(); }); workers.emplace_back([this] { spawn(); });
} }
} }
virtual ~thread_pool() { virtual ~ThreadPool() {
if (!stop) join(); if (!stop) join();
} }
@ -74,6 +74,6 @@ public:
bool stop; bool stop;
}; };
inline void dispatch(thread_pool &pool, std::function<void()> f) { inline void dispatch(ThreadPool &pool, std::function<void()> f) {
pool.post(f); pool.post(f);
} }