#include #include "exception.h" #include "csvreader.h" #include "parser.h" #include "threadpool.h" namespace usql { CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep, char line_sep2) { skip_header = skip_hdr; field_separator = field_sep; quote_character = quote_ch; line_separator = line_sep; line_separator2 = line_sep2; header_skiped = !skip_hdr; } size_t CsvReader::parseCSVFile(const std::string &filename, std::vector &cols_def, Table &table) { size_t row_cnt = 0; errno = 0; FILE* fp = fopen(filename.c_str(), "r"); if (fp == NULL) throw Exception("load from csv, cannot read from file(" + std::to_string(errno) + ")"); char* line_str = NULL; size_t len = 0; try { // TODO handle it by settings const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2)); // std::cout << "pool size: " << hw_concurrency << "\n"; const bool use_threadpool = hw_concurrency > 1; thread_pool tp{hw_concurrency}; std::mutex row_cnt_mutex; long read_chars; while ((read_chars = getline(&line_str, &len, fp)) != -1) { if (skip_header && !header_skiped) { header_skiped = true; continue; } if (read_chars > 0 && line_str[read_chars - 1] == '\n') { line_str[read_chars - 1] = '\0'; --read_chars; } if (use_threadpool) { //std::string csv_string(line_str); dispatch(tp, std::function ([this, line_str, &cols_def, &table, &row_cnt, &row_cnt_mutex]() { // std::cout << "thread: " << std::this_thread::get_id() << " rownum " << row_cnt << "\n"; auto parsed = parseCSVString(line_str, cols_def, table); { std::unique_lock lock(row_cnt_mutex); row_cnt += parsed; } } )); } else row_cnt += parseCSVString(line_str, cols_def, table); } if (use_threadpool) tp.finish(); fclose(fp); } catch (const std::exception &e) { if (line_str) free(line_str); throw e; } if (line_str) free(line_str); return row_cnt; } size_t CsvReader::parseCSVString(const std::string csvSource, std::vector &cols_def, Table& table) { size_t row_cnt = 0; bool inQuote(false); bool newLine(false); std::vector columns; std::string field; columns.reserve(256); field.reserve(64); std::string::const_iterator aChar = csvSource.begin(); std::string::const_iterator aEnd = csvSource.end(); while (aChar != aEnd) { if (*aChar == quote_character) { newLine = false; inQuote = !inQuote; } else if (*aChar == field_separator) { newLine = false; if (inQuote) { field += *aChar; } else { columns.push_back(field); field.clear(); } } else if (*aChar == line_separator || *aChar == line_separator2) { if (inQuote) { field += *aChar; } else { if (!newLine) { columns.push_back(field); if (header_skiped) { table.create_row_from_vector(cols_def, columns); row_cnt++; } header_skiped = true; field.clear(); columns.clear(); newLine = true; } } } else { newLine = false; field.push_back(*aChar); } aChar++; } if (!field.empty()) columns.push_back(field); if (header_skiped) { table.create_row_from_vector(cols_def, columns); row_cnt++; } else header_skiped = true; return row_cnt; } } // namespace