Merge pull request #8 from cyberemissary/bulk-insert

Implemented bulk insert
This commit is contained in:
Cyber Emissary
2018-02-12 15:22:39 -05:00
committed by GitHub
3 changed files with 125 additions and 63 deletions

View File

@@ -172,7 +172,8 @@ enum {
MDB_SHEXP_COMMENTS = 1<<3, /* export comments on columns & tables */
MDB_SHEXP_DEFVALUES = 1<<4, /* export default values */
MDB_SHEXP_INDEXES = 1<<5, /* export indices */
MDB_SHEXP_RELATIONS = 1<<6 /* export relation (foreign keys) */
MDB_SHEXP_RELATIONS = 1<<6, /* export relation (foreign keys) */
MDB_SHEXP_BULK_INSERT = 1 << 7 /* export data in bulk inserts */
};
#define MDB_SHEXP_DEFAULT (MDB_SHEXP_CST_NOTNULL | MDB_SHEXP_COMMENTS | MDB_SHEXP_INDEXES | MDB_SHEXP_RELATIONS)

View File

@@ -383,7 +383,7 @@ MDB_CONSTRUCTOR(_mdb_init_backends)
"COMMENT ON TABLE %s IS %s;\n",
quote_schema_name_dquote);
mdb_register_backend("postgres",
MDB_SHEXP_DROPTABLE|MDB_SHEXP_CST_NOTNULL|MDB_SHEXP_CST_NOTEMPTY|MDB_SHEXP_COMMENTS|MDB_SHEXP_INDEXES|MDB_SHEXP_RELATIONS|MDB_SHEXP_DEFVALUES,
MDB_SHEXP_DROPTABLE|MDB_SHEXP_CST_NOTNULL|MDB_SHEXP_CST_NOTEMPTY|MDB_SHEXP_COMMENTS|MDB_SHEXP_INDEXES|MDB_SHEXP_RELATIONS|MDB_SHEXP_DEFVALUES|MDB_SHEXP_BULK_INSERT,
mdb_postgres_types, &mdb_postgres_shortdate_type, &mdb_postgres_serial_type,
"current_date", "now()",
"SET client_encoding = '%s';\n",
@@ -393,7 +393,7 @@ MDB_CONSTRUCTOR(_mdb_init_backends)
"COMMENT ON TABLE %s IS %s;\n",
quote_schema_name_dquote);
mdb_register_backend("mysql",
MDB_SHEXP_DROPTABLE|MDB_SHEXP_CST_NOTNULL|MDB_SHEXP_CST_NOTEMPTY|MDB_SHEXP_INDEXES|MDB_SHEXP_DEFVALUES,
MDB_SHEXP_DROPTABLE|MDB_SHEXP_CST_NOTNULL|MDB_SHEXP_CST_NOTEMPTY|MDB_SHEXP_INDEXES|MDB_SHEXP_DEFVALUES|MDB_SHEXP_BULK_INSERT,
mdb_mysql_types, &mdb_mysql_shortdate_type, NULL,
"current_date", "now()",
"-- That file uses encoding %s\n",
@@ -403,7 +403,7 @@ MDB_CONSTRUCTOR(_mdb_init_backends)
NULL,
quote_schema_name_rquotes_merge);
mdb_register_backend("sqlite",
MDB_SHEXP_DROPTABLE|MDB_SHEXP_RELATIONS|MDB_SHEXP_DEFVALUES,
MDB_SHEXP_DROPTABLE|MDB_SHEXP_RELATIONS|MDB_SHEXP_DEFVALUES|MDB_SHEXP_BULK_INSERT,
mdb_sqlite_types, NULL, NULL,
"date('now')", "date('now')",
"-- That file uses encoding %s\n",

View File

@@ -90,6 +90,7 @@ main(int argc, char **argv)
int header_row = 1;
int quote_text = 1;
int boolean_words = 0;
int batch_size = 1000;
char *insert_dialect = NULL;
char *date_fmt = NULL;
char *namespace = NULL;
@@ -112,6 +113,7 @@ main(int argc, char **argv)
{"null", '0', 0, G_OPTION_ARG_STRING, &null_text, "Use <char> to represent a NULL value", "char"},
{"bin", 'b', 0, G_OPTION_ARG_STRING, &str_bin_mode, "Binary export mode", "strip|raw|octal"},
{"boolean-words", 'B', 0, G_OPTION_ARG_NONE, &boolean_words, "Use TRUE/FALSE in Boolean fields (default is 0/1)", NULL},
{"batch-size", 'S', 0, G_OPTION_ARG_INT, &batch_size, "Size of insert batches on supported platforms.", "int"},
{NULL},
};
GError *error = NULL;
@@ -220,6 +222,64 @@ main(int argc, char **argv)
fputs(row_delimiter, outfile);
}
// TODO refactor this into functions
if (mdb->default_backend->capabilities & MDB_SHEXP_BULK_INSERT) {
//for efficiency do multi row insert on engines that support this
unsigned int counter = 0;
while (mdb_fetch_row(table)) {
if (counter % batch_size == 0) {
counter = 0; // reset to 0, prevent overflow on extremely large data sets.
char *quoted_name;
quoted_name = mdb->default_backend->quote_schema_name(namespace, argv[2]);
fprintf(outfile, "INSERT INTO %s (", quoted_name);
free(quoted_name);
for (i = 0; i < table->num_cols; i++) {
if (i > 0) fputs(", ", outfile);
col = g_ptr_array_index(table->columns, i);
quoted_name = mdb->default_backend->quote_schema_name(NULL, col->name);
fputs(quoted_name, outfile);
free(quoted_name);
}
fputs(") VALUES ", outfile);
} else {
fputs(", ", outfile);
}
fputs("(", outfile);
for (i = 0; i < table->num_cols; i++) {
if (i > 0)
fputs(delimiter, outfile);
col = g_ptr_array_index(table->columns, i);
if (!bound_lens[i]) {
/* Don't quote NULLs */
if (insert_dialect)
fputs("NULL", outfile);
else
fputs(null_text, outfile);
} else {
if (col->col_type == MDB_OLE) {
value = mdb_ole_read_full(mdb, col, &length);
} else {
value = bound_values[i];
length = bound_lens[i];
}
print_col(outfile, value, quote_text, col->col_type, length, quote_char, escape_char, bin_mode);
if (col->col_type == MDB_OLE)
free(value);
}
}
fputs(")", outfile);
if (counter % batch_size == batch_size - 1) {
fputs(";", outfile);
fputs(row_delimiter, outfile);
}
counter++;
}
if (counter % batch_size != 0) {
//if our last row did not land on closing tag, close the statement here
fputs(";", outfile);
fputs(row_delimiter, outfile);
}
} else {
while (mdb_fetch_row(table)) {
if (insert_dialect) {
@@ -262,6 +322,7 @@ main(int argc, char **argv)
if (insert_dialect) fputs(");", outfile);
fputs(row_delimiter, outfile);
}
}
/* free the memory used to bind */
for (i=0;i<table->num_cols;i++) {