usql/csvreader.cpp

161 lines
3.6 KiB
C++

#include <cerrno>
#include "exception.h"
#include "csvreader.h"
#include "parser.h"
#include "threadpool.h"
namespace usql {
/**
 * Builds a reader configured with the given CSV dialect.
 *
 * @param skip_hdr   true when the first line/row is a header that must not be loaded
 * @param field_sep  character separating fields within a row
 * @param quote_ch   character toggling quoted mode
 * @param line_sep   first character treated as a row terminator
 * @param line_sep2  second character treated as a row terminator
 */
CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep, char line_sep2) {
    // Header handling: when no header has to be skipped the "already skipped"
    // flag starts out true, so the very first row is loaded as data.
    this->skip_header = skip_hdr;
    this->header_skiped = !skip_hdr;

    // Dialect characters.
    this->field_separator = field_sep;
    this->quote_character = quote_ch;
    this->line_separator = line_sep;
    this->line_separator2 = line_sep2;
}
/**
 * Loads rows from a CSV file into `table`.
 *
 * The file is read line by line with getline(). When skip_header is set and
 * no header has been skipped yet, the first line is dropped. Every other line
 * has its trailing '\n' removed and is handed to parseCSVString, either
 * directly or - when getConcurrency() reports more than one thread - as a
 * task dispatched to a ThreadPool.
 *
 * @param filename path of the CSV file to read
 * @param cols_def column definitions forwarded to parseCSVString
 * @param table    table forwarded to parseCSVString
 * @return sum of the row counts reported by parseCSVString
 * @throws Exception when the file cannot be opened; any exception raised
 *         while reading/parsing propagates unchanged (original dynamic type)
 */
size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table &table) {
    errno = 0;
    FILE *fp = fopen(filename.c_str(), "r");
    if (fp == nullptr)
        throw Exception("load from csv, cannot read from file(" + std::to_string(errno) + ")");

    // Owns the stream and the buffer getline() allocates, so both are released
    // on every exit path, including exceptions. No catch/rethrow is needed,
    // which also avoids slicing the exception to std::exception.
    struct InputGuard {
        FILE *file;
        char *line;
        ~InputGuard() {
            if (file != nullptr)
                fclose(file);
            free(line); // free(nullptr) is a no-op
        }
    } input{fp, nullptr};

    size_t len = 0;
    size_t row_cnt = 0;
    std::mutex row_cnt_mutex; // guards row_cnt, which pool tasks update

    const std::size_t hw_concurrency = getConcurrency();
    const bool use_threadpool = hw_concurrency > 1;
    // Declared after row_cnt / row_cnt_mutex so the pool is destroyed before
    // the state its tasks reference.
    // NOTE(review): assumes ~ThreadPool waits for running tasks - confirm.
    ThreadPool tp{hw_concurrency};

    long read_chars;
    while ((read_chars = getline(&input.line, &len, input.file)) != -1) {
        if (skip_header && !header_skiped) {
            header_skiped = true;
            continue;
        }

        // strip the trailing newline so parseCSVString sees a bare row
        if (read_chars > 0 && input.line[read_chars - 1] == '\n') {
            input.line[read_chars - 1] = '\0';
            --read_chars;
        }

        if (use_threadpool) {
            // the getline buffer is reused for the next line, so each task
            // gets its own copy of the text
            std::string str{input.line};
            dispatch(tp, std::function<void()>(
                [this, str, &cols_def, &table, &row_cnt, &row_cnt_mutex]() {
                    auto parsed = parseCSVString(str, cols_def, table);
                    std::unique_lock<std::mutex> lock(row_cnt_mutex);
                    row_cnt += parsed;
                }));
        } else {
            row_cnt += parseCSVString(input.line, cols_def, table);
        }
    }

    // Close the file as soon as reading is done; clear the handle so the
    // guard does not close it a second time.
    fclose(input.file);
    input.file = nullptr;

    tp.join();
    return row_cnt;
}
/**
 * Parses CSV text and appends the resulting rows to `table`.
 *
 * A small state machine walks the text character by character:
 *  - quote_character toggles quoted mode (the quote itself is not stored),
 *  - field_separator ends the current field unless inside quotes,
 *  - line_separator / line_separator2 end the current row unless inside
 *    quotes; consecutive terminators are collapsed into one,
 *  - anything else is appended to the current field.
 * While header_skiped is false, the first completed row is discarded and the
 * flag is set instead of creating a row.
 *
 * @param csvSource text to parse (one or more rows)
 * @param cols_def  column definitions passed to Table::create_row_from_vector
 * @param table     table receiving the rows
 * @return number of rows handed to the table
 */
size_t CsvReader::parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table &table) {
    size_t row_cnt = 0;
    bool inQuote = false;
    // true while the last significant character was a row terminator, i.e.
    // no field data has been collected since the previous row was emitted
    bool newLine = false;

    std::vector<std::string> columns;
    std::string field;
    columns.reserve(256);
    field.reserve(64);

    for (const char ch : csvSource) {
        if (ch == quote_character) {
            newLine = false;
            inQuote = !inQuote;
        } else if (ch == field_separator) {
            newLine = false;
            if (inQuote) {
                field += ch;
            } else {
                columns.push_back(field);
                field.clear();
            }
        } else if (ch == line_separator || ch == line_separator2) {
            if (inQuote) {
                field += ch;
            } else if (!newLine) {
                columns.push_back(field);
                if (header_skiped) {
                    table.create_row_from_vector(cols_def, columns);
                    row_cnt++;
                } else {
                    // Write the flag only when it actually changes: once the
                    // header is handled, concurrent callers only read it.
                    header_skiped = true;
                }
                field.clear();
                columns.clear();
                newLine = true;
            }
        } else {
            newLine = false;
            field.push_back(ch);
        }
    }

    // The text ended right after a row terminator: that row has already been
    // emitted and nothing was collected since, so there is no trailing row.
    // Without this check an extra row built from an empty column list would
    // be created.
    if (newLine)
        return row_cnt;

    // Last row was not terminated by a line separator.
    if (!field.empty())
        columns.push_back(field);
    if (header_skiped) {
        table.create_row_from_vector(cols_def, columns);
        row_cnt++;
    } else {
        header_skiped = true;
    }
    return row_cnt;
}
/**
 * Derives the number of worker threads from the MAX_PARALLELISM setting.
 *
 *  - "0" or "1"      -> 0 (callers treat anything <= 1 as "no thread pool")
 *  - "AUTO"          -> std::thread::hardware_concurrency()
 *  - N >= 0          -> N
 *  - N <  0          -> hardware_concurrency() + N, but never below 0
 *  - not an integer  -> 0
 *
 * @return requested thread count, 0 meaning single-threaded processing
 */
size_t CsvReader::getConcurrency() {
    const auto value = Settings::get_setting("MAX_PARALLELISM");

    if (value == "1" || value == "0")
        return 0;
    if (value == "AUTO")
        return std::thread::hardware_concurrency();

    char *end = nullptr;
    const long requested = strtol(value.c_str(), &end, 10);
    if (*end != '\0') // trailing garbage: not a valid number
        return 0;

    if (requested >= 0)
        return static_cast<size_t>(requested);

    // Negative value means "all cores but N". Do the arithmetic in long:
    // hardware_concurrency() is unsigned, and mixing it with a negative long
    // wraps around (or fails to compile) where long is 32 bits wide.
    const long available = static_cast<long>(std::thread::hardware_concurrency());
    return static_cast<size_t>(std::max(available + requested, 0L));
}
} // namespace