parallel wip

This commit is contained in:
2022-01-16 02:39:14 +01:00
parent ee24964057
commit a1eb0eecbb
5 changed files with 121 additions and 155 deletions

View File

@@ -20,9 +20,7 @@ CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep
size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table &table) {
size_t lines_cnt = 0;
size_t row_cnt = 0;
bool inQuote(false);
errno = 0;
FILE* fp = fopen(filename.c_str(), "r");
@@ -34,7 +32,8 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNo
try {
// TODO handle it by settings
const std::size_t hw_concurrency = 2; // std::thread::hardware_concurrency();
const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2));
// std::cout << "pool size: " << hw_concurrency << "\n";
const bool use_threadpool = hw_concurrency > 1;
thread_pool tp{hw_concurrency};
@@ -50,28 +49,24 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNo
line_str[read_chars - 1] = '\0';
--read_chars;
}
lines_cnt++;
if (!use_threadpool) {
row_cnt += parseCSVString(line_str, cols_def, table);
} else {
std::string csv_string(line_str);
if (use_threadpool) {
//std::string csv_string(line_str);
dispatch(tp, std::function<void()>
([this, csv_string, &cols_def, &table, &row_cnt, &row_cnt_mutex]() {
int parsed = parseCSVString(csv_string, cols_def, table);
{
std::unique_lock<std::mutex> lock(row_cnt_mutex);
row_cnt++;
}
}
([this, line_str, &cols_def, &table, &row_cnt, &row_cnt_mutex]() {
// std::cout << "thread: " << std::this_thread::get_id() << " rownum " << row_cnt << "\n";
auto parsed = parseCSVString(line_str, cols_def, table);
{
std::unique_lock<std::mutex> lock(row_cnt_mutex);
row_cnt += parsed;
}
}
));
}
} else
row_cnt += parseCSVString(line_str, cols_def, table);
}
if (use_threadpool) {
tp.finish_tasks();
}
if (use_threadpool) tp.finish();
fclose(fp);
@@ -85,7 +80,6 @@ size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNo
if (line_str)
free(line_str);
return row_cnt;
}
@@ -94,9 +88,9 @@ size_t CsvReader::parseCSVString(const std::string csvSource, std::vector<ColDef
bool inQuote(false);
bool newLine(false);
std::vector<std::string> line;
std::vector<std::string> columns;
std::string field;
line.reserve(256);
columns.reserve(256);
field.reserve(64);
std::string::const_iterator aChar = csvSource.begin();
@@ -110,7 +104,7 @@ size_t CsvReader::parseCSVString(const std::string csvSource, std::vector<ColDef
if (inQuote) {
field += *aChar;
} else {
line.push_back(field);
columns.push_back(field);
field.clear();
}
} else if (*aChar == line_separator || *aChar == line_separator2) {
@@ -118,14 +112,14 @@ size_t CsvReader::parseCSVString(const std::string csvSource, std::vector<ColDef
field += *aChar;
} else {
if (!newLine) {
line.push_back(field);
columns.push_back(field);
if (header_skiped) {
table.create_row_from_vector(cols_def, line);
row_cnt++;
table.create_row_from_vector(cols_def, columns);
row_cnt++;
}
header_skiped = true;
field.clear();
line.clear();
columns.clear();
newLine = true;
}
}
@@ -138,13 +132,13 @@ size_t CsvReader::parseCSVString(const std::string csvSource, std::vector<ColDef
}
if (!field.empty())
line.push_back(field);
columns.push_back(field);
if (header_skiped) {
table.create_row_from_vector(cols_def, line);
table.create_row_from_vector(cols_def, columns);
row_cnt++;
} else
header_skiped = true;
}
return row_cnt;
}