usql update
This commit is contained in:
parent
a45423692e
commit
c01571bb84
|
|
@ -4,6 +4,7 @@
|
||||||
#include "csvreader.h"
|
#include "csvreader.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
|
|
||||||
|
#include "threadpool.h"
|
||||||
|
|
||||||
namespace usql {
|
namespace usql {
|
||||||
|
|
||||||
|
|
@ -18,19 +19,8 @@ CsvReader::CsvReader(bool skip_hdr, char field_sep, char quote_ch, char line_sep
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int CsvReader::parseCSV(const std::string &filename, std::vector<ColDefNode> &cols_def, Table &table) {
|
size_t CsvReader::parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table &table) {
|
||||||
std::vector<ColDefNode> cdefs;
|
size_t row_cnt = 0;
|
||||||
cdefs.reserve(cols_def.size());
|
|
||||||
for (auto &cd : cols_def) {
|
|
||||||
cdefs.emplace_back(table.get_column_def(cd.name));
|
|
||||||
}
|
|
||||||
|
|
||||||
int row_cnt = 0;
|
|
||||||
bool inQuote(false);
|
|
||||||
std::string field;
|
|
||||||
|
|
||||||
std::vector<std::string> line;
|
|
||||||
line.reserve(32);
|
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
FILE *fp = fopen(filename.c_str(), "r");
|
FILE *fp = fopen(filename.c_str(), "r");
|
||||||
|
|
@ -40,7 +30,14 @@ int CsvReader::parseCSV(const std::string &filename, std::vector<ColDefNode> &co
|
||||||
char *line_str = NULL;
|
char *line_str = NULL;
|
||||||
size_t len = 0;
|
size_t len = 0;
|
||||||
|
|
||||||
|
// TODO handle it by settings
|
||||||
|
const std::size_t hw_concurrency = std::max(0, (int)(std::thread::hardware_concurrency() - 2));
|
||||||
|
const bool use_threadpool = hw_concurrency > 1;
|
||||||
|
|
||||||
|
thread_pool tp{hw_concurrency};
|
||||||
|
std::mutex row_cnt_mutex;
|
||||||
|
|
||||||
|
try {
|
||||||
long read_chars;
|
long read_chars;
|
||||||
while ((read_chars = getline(&line_str, &len, fp)) != -1) {
|
while ((read_chars = getline(&line_str, &len, fp)) != -1) {
|
||||||
if (skip_header && !header_skiped) {
|
if (skip_header && !header_skiped) {
|
||||||
|
|
@ -51,56 +48,51 @@ int CsvReader::parseCSV(const std::string &filename, std::vector<ColDefNode> &co
|
||||||
line_str[read_chars - 1] = '\0';
|
line_str[read_chars - 1] = '\0';
|
||||||
--read_chars;
|
--read_chars;
|
||||||
}
|
}
|
||||||
std::string csvSource{line_str};
|
|
||||||
|
|
||||||
std::string::const_iterator aChar = csvSource.begin();
|
if (use_threadpool) {
|
||||||
std::string::const_iterator strEnd = csvSource.end();
|
std::string str{line_str};
|
||||||
while (aChar != strEnd) {
|
dispatch(tp, std::function<void()>
|
||||||
if (*aChar == quote_character) {
|
([this, str, &cols_def, &table, &row_cnt, &row_cnt_mutex]() {
|
||||||
inQuote = !inQuote;
|
auto parsed = parseCSVString(str, cols_def, table);
|
||||||
} else if (*aChar == field_separator) {
|
{
|
||||||
if (inQuote) {
|
std::unique_lock<std::mutex> lock(row_cnt_mutex);
|
||||||
field += *aChar;
|
row_cnt += parsed;
|
||||||
} else {
|
|
||||||
line.push_back(field);
|
|
||||||
field.clear();
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
field.push_back(*aChar);
|
|
||||||
}
|
}
|
||||||
|
));
|
||||||
++aChar;
|
} else
|
||||||
}
|
row_cnt += parseCSVString(line_str, cols_def, table);
|
||||||
|
|
||||||
if (!field.empty())
|
|
||||||
line.push_back(field);
|
|
||||||
|
|
||||||
table.create_row_from_vector(cols_def, line);
|
|
||||||
row_cnt++;
|
|
||||||
|
|
||||||
field.clear();
|
|
||||||
line.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
|
tp.join();
|
||||||
|
|
||||||
|
} catch (const std::exception &e) {
|
||||||
if (line_str)
|
if (line_str)
|
||||||
free(line_str);
|
free(line_str);
|
||||||
|
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line_str)
|
||||||
|
free(line_str);
|
||||||
|
|
||||||
return row_cnt;
|
return row_cnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
int CsvReader::parseCSV2(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table) {
|
size_t CsvReader::parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table &table) {
|
||||||
int row_cnt = 0;
|
size_t row_cnt = 0;
|
||||||
bool inQuote(false);
|
bool inQuote(false);
|
||||||
bool newLine(false);
|
bool newLine(false);
|
||||||
std::string field;
|
|
||||||
|
|
||||||
std::vector<std::string> line;
|
std::vector<std::string> columns;
|
||||||
line.reserve(32);
|
std::string field;
|
||||||
|
columns.reserve(256);
|
||||||
|
field.reserve(64);
|
||||||
|
|
||||||
std::string::const_iterator aChar = csvSource.begin();
|
std::string::const_iterator aChar = csvSource.begin();
|
||||||
while (aChar != csvSource.end()) {
|
std::string::const_iterator aEnd = csvSource.end();
|
||||||
|
while (aChar != aEnd) {
|
||||||
if (*aChar == quote_character) {
|
if (*aChar == quote_character) {
|
||||||
newLine = false;
|
newLine = false;
|
||||||
inQuote = !inQuote;
|
inQuote = !inQuote;
|
||||||
|
|
@ -109,7 +101,7 @@ int CsvReader::parseCSV2(const std::string &csvSource, std::vector<ColDefNode> &
|
||||||
if (inQuote) {
|
if (inQuote) {
|
||||||
field += *aChar;
|
field += *aChar;
|
||||||
} else {
|
} else {
|
||||||
line.push_back(field);
|
columns.push_back(field);
|
||||||
field.clear();
|
field.clear();
|
||||||
}
|
}
|
||||||
} else if (*aChar == line_separator || *aChar == line_separator2) {
|
} else if (*aChar == line_separator || *aChar == line_separator2) {
|
||||||
|
|
@ -117,14 +109,14 @@ int CsvReader::parseCSV2(const std::string &csvSource, std::vector<ColDefNode> &
|
||||||
field += *aChar;
|
field += *aChar;
|
||||||
} else {
|
} else {
|
||||||
if (!newLine) {
|
if (!newLine) {
|
||||||
line.push_back(field);
|
columns.push_back(field);
|
||||||
if (header_skiped) {
|
if (header_skiped) {
|
||||||
table.create_row_from_vector(cols_def, line);
|
table.create_row_from_vector(cols_def, columns);
|
||||||
row_cnt++;
|
row_cnt++;
|
||||||
}
|
}
|
||||||
header_skiped = true;
|
header_skiped = true;
|
||||||
field.clear();
|
field.clear();
|
||||||
line.clear();
|
columns.clear();
|
||||||
newLine = true;
|
newLine = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -136,13 +128,14 @@ int CsvReader::parseCSV2(const std::string &csvSource, std::vector<ColDefNode> &
|
||||||
aChar++;
|
aChar++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!field.empty()) line.push_back(field);
|
if (!field.empty())
|
||||||
|
columns.push_back(field);
|
||||||
|
|
||||||
if (header_skiped) {
|
if (header_skiped) {
|
||||||
table.create_row_from_vector(cols_def, line);
|
table.create_row_from_vector(cols_def, columns);
|
||||||
row_cnt++;
|
row_cnt++;
|
||||||
|
} else
|
||||||
header_skiped = true;
|
header_skiped = true;
|
||||||
}
|
|
||||||
|
|
||||||
return row_cnt;
|
return row_cnt;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,8 @@ namespace usql {
|
||||||
public:
|
public:
|
||||||
explicit CsvReader(bool skip_hdr = true, char field_sep = ',', char quote_ch = '"', char line_sep = '\r', char line_sep2 = '\n');
|
explicit CsvReader(bool skip_hdr = true, char field_sep = ',', char quote_ch = '"', char line_sep = '\r', char line_sep2 = '\n');
|
||||||
|
|
||||||
int parseCSV2(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table);
|
size_t parseCSVFile(const std::string &filename, std::vector<ColDefNode> &cols_def, Table& table);
|
||||||
|
size_t parseCSVString(const std::string &csvSource, std::vector<ColDefNode> &cols_def, Table& table);
|
||||||
int parseCSV(const std::string &filename, std::vector<ColDefNode> &cols_def, Table& table);
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
|
||||||
|
|
@ -108,10 +108,10 @@ private:
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool m_uniq;
|
|
||||||
std::string m_index_name;
|
std::string m_index_name;
|
||||||
std::string m_column_name;
|
std::string m_column_name;
|
||||||
ColumnType m_data_type;
|
ColumnType m_data_type;
|
||||||
|
bool m_uniq;
|
||||||
|
|
||||||
std::map<IndexValue, std::vector<rowid_t> > m_index;
|
std::map<IndexValue, std::vector<rowid_t> > m_index;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ namespace usql {
|
||||||
private:
|
private:
|
||||||
std::string m_code_str;
|
std::string m_code_str;
|
||||||
std::vector<Token> m_tokens;
|
std::vector<Token> m_tokens;
|
||||||
int m_index = 0;
|
size_t m_index = 0;
|
||||||
|
|
||||||
std::regex k_words_regex;
|
std::regex k_words_regex;
|
||||||
std::regex k_int_regex;
|
std::regex k_int_regex;
|
||||||
|
|
@ -128,4 +128,4 @@ namespace usql {
|
||||||
std::regex k_identifier_regex;
|
std::regex k_identifier_regex;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
} // namespace
|
||||||
|
|
@ -356,8 +356,8 @@ namespace usql {
|
||||||
}
|
}
|
||||||
|
|
||||||
OffsetLimitNode Parser::parse_offset_limit_clause() {
|
OffsetLimitNode Parser::parse_offset_limit_clause() {
|
||||||
int offset = 0;
|
size_t offset = 0;
|
||||||
int limit = 999999999;
|
size_t limit = SIZE_MAX;
|
||||||
|
|
||||||
if (m_lexer.tokenType() == TokenType::keyword_offset) {
|
if (m_lexer.tokenType() == TokenType::keyword_offset) {
|
||||||
m_lexer.skipToken(TokenType::keyword_offset);
|
m_lexer.skipToken(TokenType::keyword_offset);
|
||||||
|
|
|
||||||
|
|
@ -79,10 +79,10 @@ struct ColOrderNode : Node {
|
||||||
|
|
||||||
|
|
||||||
struct OffsetLimitNode : Node {
|
struct OffsetLimitNode : Node {
|
||||||
int offset;
|
size_t offset;
|
||||||
int limit;
|
size_t limit;
|
||||||
|
|
||||||
OffsetLimitNode(int off, int lim) : Node(NodeType::offset_limit), offset(off), limit(lim) {}
|
OffsetLimitNode(size_t off, size_t lim) : Node(NodeType::offset_limit), offset(off), limit(lim) {}
|
||||||
|
|
||||||
void dump() const override {
|
void dump() const override {
|
||||||
std::cout << "type: OffsetLimitNode, offset: " << offset << ", limit: " << limit << std::endl;
|
std::cout << "type: OffsetLimitNode, offset: " << offset << ", limit: " << limit << std::endl;
|
||||||
|
|
@ -107,10 +107,10 @@ struct ColDefNode : Node {
|
||||||
std::string name;
|
std::string name;
|
||||||
ColumnType type;
|
ColumnType type;
|
||||||
int order;
|
int order;
|
||||||
int length;
|
size_t length;
|
||||||
bool null;
|
bool null;
|
||||||
|
|
||||||
ColDefNode(std::string col_name, ColumnType col_type, int col_order, int col_len, bool nullable) :
|
ColDefNode(std::string col_name, ColumnType col_type, int col_order, size_t col_len, bool nullable) :
|
||||||
Node(NodeType::column_def), name(std::move(col_name)), type(col_type), order(col_order), length(col_len),
|
Node(NodeType::column_def), name(std::move(col_name)), type(col_type), order(col_order), length(col_len),
|
||||||
null(nullable) {}
|
null(nullable) {}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,8 +52,8 @@ int ColBooleanValue::compare(ColValue &other) const {
|
||||||
return m_bool == other.getBoolValue() ? 0 : m_bool && !other.getBoolValue() ? -1 : 1; // true first
|
return m_bool == other.getBoolValue() ? 0 : m_bool && !other.getBoolValue() ? -1 : 1; // true first
|
||||||
}
|
}
|
||||||
|
|
||||||
Row::Row(const Row &other) : m_columns(other.m_columns.size()), m_visible(other.m_visible) {
|
Row::Row(const Row &other) : m_visible(other.m_visible), m_columns(other.m_columns.size()) {
|
||||||
for (int i = 0; i < other.m_columns.size(); i++) {
|
for (size_t i = 0; i < other.m_columns.size(); i++) {
|
||||||
if (other[i].isNull())
|
if (other[i].isNull())
|
||||||
continue; // for null NOP
|
continue; // for null NOP
|
||||||
|
|
||||||
|
|
@ -156,7 +156,7 @@ void Row::setColumnValue(ColDefNode *col_def, ValueNode *col_value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int Row::compare(const Row &other) const {
|
int Row::compare(const Row &other) const {
|
||||||
for (int ci = 0; ci < m_columns.size(); ci++) {
|
for (size_t ci = 0; ci < m_columns.size(); ci++) {
|
||||||
int cmp = this->operator[](ci).compare(other[ci]);
|
int cmp = this->operator[](ci).compare(other[ci]);
|
||||||
if (cmp != 0) return cmp;
|
if (cmp != 0) return cmp;
|
||||||
}
|
}
|
||||||
|
|
@ -166,7 +166,7 @@ int Row::compare(const Row &other) const {
|
||||||
void Row::print(const std::vector<ColDefNode> &col_defs) {
|
void Row::print(const std::vector<ColDefNode> &col_defs) {
|
||||||
std::string out{"| "};
|
std::string out{"| "};
|
||||||
|
|
||||||
for (int ci = 0; ci < m_columns.size(); ci++) {
|
for (size_t ci = 0; ci < m_columns.size(); ci++) {
|
||||||
auto & col_def = col_defs[ci];
|
auto & col_def = col_defs[ci];
|
||||||
int col_size = print_get_column_size(col_def);
|
int col_size = print_get_column_size(col_def);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -138,7 +138,7 @@ struct ColBooleanValue : ColValue {
|
||||||
class Row {
|
class Row {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit Row(int cols_count, bool visible) : m_columns(cols_count), m_visible(visible) {};
|
explicit Row(int cols_count, bool visible) : m_visible(visible), m_columns(cols_count) {};
|
||||||
Row(const Row &other);
|
Row(const Row &other);
|
||||||
|
|
||||||
Row &operator=(Row other);
|
Row &operator=(Row other);
|
||||||
|
|
|
||||||
157
usql/table.cpp
157
usql/table.cpp
|
|
@ -44,81 +44,12 @@ ColDefNode Table::get_column_def(int col_index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Row& Table::create_empty_row() {
|
Row& Table::create_empty_row() {
|
||||||
|
std::unique_lock guard(m_insert_guard);
|
||||||
|
|
||||||
m_rows.emplace_back(columns_count(), false);
|
m_rows.emplace_back(columns_count(), false);
|
||||||
return m_rows.back();
|
return m_rows.back();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string Table::csv_string() {
|
|
||||||
const size_t k_row_size_est = m_col_defs.size() * 16;
|
|
||||||
|
|
||||||
std::string out_string;
|
|
||||||
out_string.reserve(m_rows.size() * k_row_size_est);
|
|
||||||
|
|
||||||
// header
|
|
||||||
for(int i = 0; i < m_col_defs.size(); i++) {
|
|
||||||
if (i > 0) out_string += ',';
|
|
||||||
out_string += m_col_defs[i].name;
|
|
||||||
}
|
|
||||||
|
|
||||||
// rows
|
|
||||||
for (auto & row : m_rows) {
|
|
||||||
if (row.is_visible()) {
|
|
||||||
std::string csv_line{"\n"};
|
|
||||||
csv_line.reserve(k_row_size_est);
|
|
||||||
|
|
||||||
for (int i = 0; i < m_col_defs.size(); i++) {
|
|
||||||
if (i > 0) csv_line += ',';
|
|
||||||
|
|
||||||
auto &col = row[i];
|
|
||||||
if (!col.isNull()) {
|
|
||||||
csv_line += col.getCsvStringValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
out_string += csv_line;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return out_string;
|
|
||||||
}
|
|
||||||
|
|
||||||
int Table::load_csv_string(const std::string &content) {
|
|
||||||
std::vector<ColDefNode> &colDefs = m_col_defs;
|
|
||||||
|
|
||||||
CsvReader csvparser{};
|
|
||||||
int row_cnt = csvparser.parseCSV2(content, colDefs, *this);
|
|
||||||
|
|
||||||
return row_cnt;
|
|
||||||
}
|
|
||||||
|
|
||||||
int Table::load_csv_file(const std::string &filename) {
|
|
||||||
std::vector<ColDefNode> &colDefs = m_col_defs;
|
|
||||||
|
|
||||||
// allocate enough space
|
|
||||||
int line_size = 128;
|
|
||||||
|
|
||||||
std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary);
|
|
||||||
auto file_size = in.tellg();
|
|
||||||
|
|
||||||
std::ifstream infile(filename);
|
|
||||||
if (infile.good()) {
|
|
||||||
std::string sLine;
|
|
||||||
std::getline(infile, sLine);
|
|
||||||
line_size = (int)sLine.size() + 1;
|
|
||||||
}
|
|
||||||
infile.close();
|
|
||||||
|
|
||||||
if (file_size > 0) {
|
|
||||||
auto new_size = m_rows.size() + int((file_size / line_size) * 1.20);
|
|
||||||
m_rows.reserve(new_size);
|
|
||||||
}
|
|
||||||
|
|
||||||
// load rows
|
|
||||||
CsvReader csvparser{};
|
|
||||||
int row_cnt = csvparser.parseCSV(filename, colDefs, *this);
|
|
||||||
|
|
||||||
return row_cnt;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Table::create_row_from_vector(const std::vector<ColDefNode> &colDefs, const std::vector<std::string> &csv_line) {
|
void Table::create_row_from_vector(const std::vector<ColDefNode> &colDefs, const std::vector<std::string> &csv_line) {
|
||||||
// prepare empty new_row
|
// prepare empty new_row
|
||||||
Row& new_row = create_empty_row();
|
Row& new_row = create_empty_row();
|
||||||
|
|
@ -147,6 +78,77 @@ void Table::create_row_from_vector(const std::vector<ColDefNode> &colDefs, const
|
||||||
commit_row(new_row);
|
commit_row(new_row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string Table::csv_string() {
|
||||||
|
const size_t k_row_size_est = m_col_defs.size() * 16;
|
||||||
|
|
||||||
|
std::string out_string;
|
||||||
|
out_string.reserve(m_rows.size() * k_row_size_est);
|
||||||
|
|
||||||
|
// header
|
||||||
|
for(size_t i = 0; i < m_col_defs.size(); i++) {
|
||||||
|
if (i > 0) out_string += ',';
|
||||||
|
out_string += m_col_defs[i].name;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rows
|
||||||
|
for (auto & row : m_rows) {
|
||||||
|
if (row.is_visible()) {
|
||||||
|
std::string csv_line{"\n"};
|
||||||
|
csv_line.reserve(k_row_size_est);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < m_col_defs.size(); i++) {
|
||||||
|
if (i > 0) csv_line += ',';
|
||||||
|
|
||||||
|
auto &col = row[i];
|
||||||
|
if (!col.isNull()) {
|
||||||
|
csv_line += col.getCsvStringValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out_string += csv_line;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out_string;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Table::load_csv_string(const std::string &content) {
|
||||||
|
std::vector<ColDefNode> &colDefs = m_col_defs;
|
||||||
|
|
||||||
|
CsvReader csvparser{};
|
||||||
|
auto row_cnt = csvparser.parseCSVString(content, colDefs, *this);
|
||||||
|
|
||||||
|
return row_cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Table::load_csv_file(const std::string &filename) {
|
||||||
|
std::vector<ColDefNode> &colDefs = m_col_defs;
|
||||||
|
|
||||||
|
// allocate enough space
|
||||||
|
int line_size = 256;
|
||||||
|
|
||||||
|
std::ifstream in(filename, std::ifstream::ate | std::ifstream::binary);
|
||||||
|
auto file_size = in.tellg();
|
||||||
|
|
||||||
|
std::ifstream infile(filename);
|
||||||
|
if (infile.good()) {
|
||||||
|
std::string sLine;
|
||||||
|
std::getline(infile, sLine);
|
||||||
|
line_size = (int)sLine.size() + 1;
|
||||||
|
}
|
||||||
|
infile.close();
|
||||||
|
|
||||||
|
if (file_size > 0) {
|
||||||
|
auto new_size = m_rows.size() + int((file_size / line_size) * 1.20);
|
||||||
|
m_rows.reserve(new_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
// load rows
|
||||||
|
CsvReader csvparser{};
|
||||||
|
auto row_cnt = csvparser.parseCSVFile(filename, colDefs, *this);
|
||||||
|
|
||||||
|
return row_cnt;
|
||||||
|
}
|
||||||
|
|
||||||
void Table::print() {
|
void Table::print() {
|
||||||
std::string out{"| "};
|
std::string out{"| "};
|
||||||
std::string out2{"+-"};
|
std::string out2{"+-"};
|
||||||
|
|
@ -191,7 +193,7 @@ void Table::commit_row(Row &row) {
|
||||||
void Table::commit_copy_of_row(Row &row) {
|
void Table::commit_copy_of_row(Row &row) {
|
||||||
Row& new_row = create_empty_row();
|
Row& new_row = create_empty_row();
|
||||||
|
|
||||||
for(int i = 0; i < m_col_defs.size(); i++) {
|
for(size_t i = 0; i < m_col_defs.size(); i++) {
|
||||||
ColValue &ct = row[i];
|
ColValue &ct = row[i];
|
||||||
|
|
||||||
if (ct.isNull()) {
|
if (ct.isNull()) {
|
||||||
|
|
@ -226,16 +228,15 @@ void Table::validate_column(const ColDefNode *col_def, ValueNode *col_val) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Table::validate_column(const ColDefNode *col_def, ColValue &col_val) {
|
void Table::validate_column(const ColDefNode *col_def, ColValue &col_val) {
|
||||||
if (!col_def->null && col_val.isNull()) {
|
if (!col_def->null && col_val.isNull())
|
||||||
throw Exception("Column " + col_def->name + " cannot be null");
|
throw Exception("Column " + col_def->name + " cannot be null");
|
||||||
}
|
|
||||||
if (col_def->type == ColumnType::varchar_type && !col_val.isNull() && col_val.getStringValue().size() > col_def->length) {
|
if (col_def->type == ColumnType::varchar_type && !col_val.isNull() && col_val.getStringValue().size() > col_def->length)
|
||||||
throw Exception("Column value of " + col_def->name + " is too long (" + col_val.getStringValue() + ")");
|
throw Exception("Column value of " + col_def->name + " is too long (" + col_val.getStringValue() + ")");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void Table::validate_row(Row &row) {
|
void Table::validate_row(Row &row) {
|
||||||
for(int i = 0; i < m_col_defs.size(); i++) {
|
for(size_t i = 0; i < m_col_defs.size(); i++) {
|
||||||
ColDefNode col_def = m_col_defs[i];
|
ColDefNode col_def = m_col_defs[i];
|
||||||
ColValue &col_val = row[i];
|
ColValue &col_val = row[i];
|
||||||
|
|
||||||
|
|
@ -277,6 +278,8 @@ void Table::reindex_row(Index &index, const ColDefNode &col_def, const Row &old_
|
||||||
void Table::index_row(const Row &row) {
|
void Table::index_row(const Row &row) {
|
||||||
if (!m_indexes.empty()) {
|
if (!m_indexes.empty()) {
|
||||||
const size_t rowid = get_rowid(row);
|
const size_t rowid = get_rowid(row);
|
||||||
|
|
||||||
|
std::unique_lock guard(m_insert_guard);
|
||||||
for (auto &idx : m_indexes) {
|
for (auto &idx : m_indexes) {
|
||||||
ColDefNode cDef = get_column_def(idx.get_column_name());
|
ColDefNode cDef = get_column_def(idx.get_column_name());
|
||||||
index_row(idx, cDef, row, rowid);
|
index_row(idx, cDef, row, rowid);
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,8 @@ struct Table {
|
||||||
void validate_row(Row &row);
|
void validate_row(Row &row);
|
||||||
|
|
||||||
std::string csv_string();
|
std::string csv_string();
|
||||||
int load_csv_string(const std::string &content);
|
size_t load_csv_string(const std::string &content);
|
||||||
int load_csv_file(const std::string &filename);
|
size_t load_csv_file(const std::string &filename);
|
||||||
|
|
||||||
void print();
|
void print();
|
||||||
|
|
||||||
|
|
@ -40,6 +40,7 @@ struct Table {
|
||||||
std::vector<ColDefNode> m_col_defs;
|
std::vector<ColDefNode> m_col_defs;
|
||||||
std::vector<Row> m_rows;
|
std::vector<Row> m_rows;
|
||||||
std::vector<Index> m_indexes;
|
std::vector<Index> m_indexes;
|
||||||
|
std::mutex m_insert_guard;
|
||||||
|
|
||||||
void create_row_from_vector(const std::vector<ColDefNode> &colDefs, const std::vector<std::string> &csv_line);
|
void create_row_from_vector(const std::vector<ColDefNode> &colDefs, const std::vector<std::string> &csv_line);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,79 @@
|
||||||
|
#include <thread>
|
||||||
|
#include <mutex>
|
||||||
|
#include <functional>
|
||||||
|
#include <queue>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
class thread_pool {
|
||||||
|
public:
|
||||||
|
thread_pool(std::size_t size) : stop(false) {
|
||||||
|
for (std::size_t i = 0; i < size; ++i) {
|
||||||
|
workers.emplace_back([this] { spawn(); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual ~thread_pool() {
|
||||||
|
if (!stop) join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void post(std::function<void()> f) {
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
tasks.push(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
condition.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void join() {
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
stop = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
condition.notify_all();
|
||||||
|
|
||||||
|
for (std::size_t i = 0; i < workers.size(); ++i) {
|
||||||
|
workers[i].join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void spawn() {
|
||||||
|
std::function<void()> task;
|
||||||
|
|
||||||
|
bool task_queue_empty = tasks.empty();
|
||||||
|
while (!stop || !task_queue_empty) {
|
||||||
|
bool task_valid = false;
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
condition.wait(lock, [this]() {
|
||||||
|
return (!tasks.empty()) || (tasks.empty() && stop);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!tasks.empty()) {
|
||||||
|
task = std::move(tasks.front());
|
||||||
|
tasks.pop();
|
||||||
|
task_valid = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
task_queue_empty = tasks.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task_valid) task();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
std::vector<std::thread> workers;
|
||||||
|
std::queue<std::function<void()>> tasks;
|
||||||
|
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable condition;
|
||||||
|
bool stop;
|
||||||
|
};
|
||||||
|
|
||||||
|
inline void dispatch(thread_pool &pool, std::function<void()> f) {
|
||||||
|
pool.post(f);
|
||||||
|
}
|
||||||
|
|
@ -114,8 +114,8 @@ void USql::select_row(SelectFromTableNode &where_node,
|
||||||
}
|
}
|
||||||
|
|
||||||
bool USql::check_for_aggregate_only_functions(SelectFromTableNode &node, size_t result_cols_cnt) {
|
bool USql::check_for_aggregate_only_functions(SelectFromTableNode &node, size_t result_cols_cnt) {
|
||||||
int aggregate_funcs = 0;
|
size_t aggregate_funcs = 0;
|
||||||
for (int i = 0; i < node.cols_names->size(); i++) {
|
for (size_t i = 0; i < node.cols_names->size(); i++) {
|
||||||
SelectColNode * col_node = &node.cols_names->operator[](i);
|
SelectColNode * col_node = &node.cols_names->operator[](i);
|
||||||
if (col_node->value->node_type == NodeType::function) {
|
if (col_node->value->node_type == NodeType::function) {
|
||||||
auto func_node = static_cast<FunctionNode *>(col_node->value.get());
|
auto func_node = static_cast<FunctionNode *>(col_node->value.get());
|
||||||
|
|
@ -244,7 +244,7 @@ std::tuple<int, ColDefNode> USql::get_node_definition(Table *table, Node * node,
|
||||||
return std::make_tuple(-1, col_def);
|
return std::make_tuple(-1, col_def);
|
||||||
} else if (func_node->function == "min" || func_node->function == "max") {
|
} else if (func_node->function == "min" || func_node->function == "max") {
|
||||||
auto col_type= ColumnType::float_type;
|
auto col_type= ColumnType::float_type;
|
||||||
int col_len = 1;
|
size_t col_len = 1;
|
||||||
auto & v = func_node->params[0];
|
auto & v = func_node->params[0];
|
||||||
if (v->node_type == NodeType::database_value) {
|
if (v->node_type == NodeType::database_value) {
|
||||||
ColDefNode src_col_def = get_db_column_definition(table, v.get());
|
ColDefNode src_col_def = get_db_column_definition(table, v.get());
|
||||||
|
|
@ -387,7 +387,7 @@ std::unique_ptr<Table> USql::execute_select(SelectFromTableNode &node) const {
|
||||||
// create result table
|
// create result table
|
||||||
std::vector<ColDefNode> result_tbl_col_defs{};
|
std::vector<ColDefNode> result_tbl_col_defs{};
|
||||||
std::vector<int> source_table_col_index{};
|
std::vector<int> source_table_col_index{};
|
||||||
for (int i = 0; i < node.cols_names->size(); i++) {
|
for (size_t i = 0; i < node.cols_names->size(); i++) {
|
||||||
SelectColNode *col_node = &node.cols_names->operator[](i);
|
SelectColNode *col_node = &node.cols_names->operator[](i);
|
||||||
auto [src_tbl_col_index, rst_tbl_col_def] = get_column_definition(table, col_node, i);
|
auto [src_tbl_col_index, rst_tbl_col_def] = get_column_definition(table, col_node, i);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -118,8 +118,7 @@ std::unique_ptr<ValueNode> USql::max_function(const std::vector<std::unique_ptr<
|
||||||
throw Exception("unsupported data type for max function");
|
throw Exception("unsupported data type for max function");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ValueNode>
|
std::unique_ptr<ValueNode> USql::min_function(const std::vector<std::unique_ptr<ValueNode>> &evaluatedPars, const ColDefNode *col_def_node,
|
||||||
USql::min_function(const std::vector<std::unique_ptr<ValueNode>> &evaluatedPars, const ColDefNode *col_def_node,
|
|
||||||
ColValue *agg_func_value) {
|
ColValue *agg_func_value) {
|
||||||
if (col_def_node->type == ColumnType::integer_type || col_def_node->type == ColumnType::date_type) {
|
if (col_def_node->type == ColumnType::integer_type || col_def_node->type == ColumnType::date_type) {
|
||||||
if (!evaluatedPars[0]->isNull()) {
|
if (!evaluatedPars[0]->isNull()) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue