Commit e027cfbd authored by Michael Auer's avatar Michael Auer
Browse files

Merge branch '8-pgasync-not-compatible-with-node-v14-x-change-to-pg-promise' into 'master'

Resolve "PgAsync not compatible with node v14.x. Change to pg-promise"

Closes #8

See merge request !7
parents 1ab6657c 40c802bf
This diff is collapsed.
......@@ -30,7 +30,7 @@ export abstract class FeatureType {
/**
* physically deletes the feature type from the store (e.g. file from disk or table from database)
**/
abstract async delete(): Promise<void>;
abstract async delete(): Promise<null|undefined|void>;
/**
* Do some additional work in the end, e.g. close connections to a database
......
import {PgFeatureType} from './PgFeatureType';
import {GeoJsonFeatureType} from './GeoJsonFeatureType';
import PgAsync from 'pg-async';
import {FeatureTypeConfig, GeoJSONFeatureTypeConfig, PostgisFeatureTypeConfig} from './config_types_interfaces';
// const PgFeatureType = require('./PgFeatureType.js');
// const GeoJsonFeatureType = require('./GeoJsonFeatureType.js');
// const PgAsync = require('pg-async').default;
export class FeatureTypeFactory {
//factory
static async create(featureTypeConfig: FeatureTypeConfig<any>){
......
......@@ -162,6 +162,7 @@ class Ohsome2X extends EventEmitter {
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: cursor, fetchSize: fetchSize, timestamp: new Date().toISOString()});
const sourceFeatureCollection: any = await this.sourceFeatureType.getFeaturesByCursorAndLimit(cursor, fetchSize);
let targetFeatureCollection: any;
featureCount = sourceFeatureCollection.features.length;
......@@ -553,7 +554,8 @@ class Ohsome2X extends EventEmitter {
url: `${this.ohsomeApiUrl}/${queryType}`,
method: 'post',
header: {'content-type': 'application/x-www-form-urlencoded'},
maxContentLength: 1024 * 1024 * 1024 * 1024,
maxContentLength: Infinity,
maxBodyLength: Infinity,
data: dataString
});
//only for json requests with showMetadata=true, not csv
......@@ -696,4 +698,4 @@ class Ohsome2X extends EventEmitter {
}
export = Ohsome2X;
// export * from './config_types_interfaces';
\ No newline at end of file
// export * from './config_types_interfaces';
......@@ -7,14 +7,10 @@
// const moment = require('moment');
//extends FeatureType
import {
PostgisStore,
SourcePostgisFeatureTypeConfig,
TargetPostgisFeatureTypeConfig
} from './config_types_interfaces';
import {PostgisStore, SourcePostgisFeatureTypeConfig, TargetPostgisFeatureTypeConfig} from './config_types_interfaces';
import {FeatureType} from './FeatureType';
import PgAsync from 'pg-async';
import {types as pgTypes} from 'pg';
import pgPromise, {IDatabase} from 'pg-promise'
// import {types as pgTypes} from 'pg';
import * as turfHelpers from '@turf/helpers';
import {Feature, FeatureCollection} from '@turf/helpers';
// import * as moment from 'moment';
......@@ -33,8 +29,9 @@ export class PgFeatureType extends FeatureType {
private geometryId: string | number | undefined;
private geometryColumn: string;
private fetchSize: number | undefined;
private db: PgAsync;
private db: IDatabase<any>;
private tableExists: any;
private pgp: pgPromise.IMain<any>;
constructor(config: SourcePostgisFeatureTypeConfig | TargetPostgisFeatureTypeConfig) {
super(config);
......@@ -49,27 +46,33 @@ export class PgFeatureType extends FeatureType {
if ('fetchSize' in config) {
this.fetchSize = config.fetchSize;
}
// this.db = new PgAsync(this.store);
this.pgp = pgPromise({});
//per default pg parses int8 values as strings
pgTypes.setTypeParser(20, function (val) {
this.pgp.pg.types.setTypeParser(20, function (val) {
return parseInt(val)
});
this.db = new PgAsync(this.store);
this.db = this.pgp(this.store);
this.checkTableExists();
}
async checkTableExists() {
const sql = `SELECT EXISTS (
const sql = `SELECT exists(
SELECT 1
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = '${this.schemaName}'
AND c.relname = '${this.tableName}'
WHERE n.nspname = $1
AND c.relname = $2
AND (c.relkind = 'r' OR c.relkind = 'v' ) -- only tables or views
);`;
let exists;
try {
exists = await this.db.value(sql);
exists = await this.db.one(sql,[this.schemaName, this.tableName], row => row.exists);//.value(sql);
console.log(`Checked if TABLE ${this.schemaName}.${this.tableName} exists: ${exists}`);
this.tableExists = exists;
return exists;
......@@ -81,14 +84,14 @@ export class PgFeatureType extends FeatureType {
async closeConnections() {
console.log(`${this.schemaName}.${this.tableName}: ` + 'Close all connections of database: ' + this.store.database + ' on ' + this.store.host);
await this.db.closeConnections();
await this.pgp.end(); //this.db.$pool.end() //.closeConnections();
}
async delete() {
const sql = `DROP TABLE IF EXISTS ${this.schemaName}."${this.tableName}" CASCADE;`;
try {
const res = await this.db.query(sql);
const res = await this.db.none(sql);
this.tableExists = false;
console.log(sql);
return res;
......@@ -105,9 +108,9 @@ export class PgFeatureType extends FeatureType {
if (this.tableExists || await this.checkTableExists()){
const sql = `SELECT "${this.geometryId}" as id
FROM ${this.schemaName}."${this.tableName}" LIMIT 1;`;
const result = await this.db.query(sql);
const id = await this.db.one(sql, null, row=>row.id);
return typeof result.rows[0].id;
return typeof id;
} else {
throw Error(`Can't getId data type from table that doesn't exist: ${this.schemaName}.${this.tableName}`);
}
......@@ -119,7 +122,7 @@ export class PgFeatureType extends FeatureType {
async getFeatureCount(): Promise<number> {
const sql = `SELECT count(*)
FROM ${this.schemaName}."${this.tableName}";`;
return await this.db.value(sql);
return await this.db.one(sql, null, row=>row.count);
}
/**
......@@ -132,11 +135,11 @@ export class PgFeatureType extends FeatureType {
FROM ${this.schemaName}."${this.tableName}";`;
console.log(`Get all cells from ${this.schemaName}."${this.tableName}"`);
const result = await this.db.query(sql);
const result = await this.db.any(sql);
// console.log(JSON.stringify(result,undefined,2));
// console.log(result.rows);
let features = result.rows.map((row: any) => turfHelpers.feature(row.geom, {id: row.id}));
let features = result.map((row: any) => turfHelpers.feature(row.geom, {id: row.id}));
let featureCollection = turfHelpers.featureCollection(features);
return featureCollection;
......@@ -152,31 +155,22 @@ export class PgFeatureType extends FeatureType {
throw Error('Please set params: cursor: int | string, fetchSize: int');
}
const cursorSql = (typeof cursor === 'number')? cursor : `'${cursor}'`;
// const cursorSql = (typeof cursor === 'number')? cursor : `'${cursor}'`;
const sql = `SELECT "${this.geometryId}" as id,
st_asgeojson(st_multi("${this.geometryColumn}"))::json as geom
FROM ${this.schemaName}."${this.tableName}"
WHERE "${this.geometryId}" > ${cursorSql}
WHERE "${this.geometryId}" > $1
ORDER BY "${this.geometryId}" ASC
LIMIT ${fetchSize};`;
console.log('Get cells from ' + this.tableName + ' with id > ' + cursorSql + ' with fetchSize: ' + fetchSize);
console.log('Get cells from ' + this.tableName + ' with id > ' + cursor + ' with fetchSize: ' + fetchSize);
const result = await this.db.query(sql);
let features = result.rows.map((row: any) => turfHelpers.feature(row.geom, {id: row.id}));
const result = await this.db.any(sql, cursor);
let features = result.map((row: any) => turfHelpers.feature(row.geom, {id: row.id}));
let featureCollection = turfHelpers.featureCollection(features);
return featureCollection;
}
async getPropertyAgg(propertyName: string) {
let sql = `SELECT
array_agg("${propertyName}") as values
FROM ${this.schemaName}."${this.tableName}"
WHERE "timestamp" = (SELECT max("timestamp") FROM ${this.schemaName}."${this.tableName}")
;`;
const result = await this.db.query(sql);
return result;
}
async createTableFromGeoJSONFeature(geoJSONFeature: Feature, epsg?: number | string) {
//check if geometries exists
......@@ -236,10 +230,11 @@ export class PgFeatureType extends FeatureType {
const sql = `CREATE TABLE IF NOT EXISTS ${this.schemaName}."${this.tableName}" (${columnDefsString});`;
// console.log(sql);
let res = await this.db.query(sql);
this.tableExists = true;
console.log( (sql.length > 70)? sql.substring(0,67) + '...' : sql );
let res = await this.db.none(sql);
this.tableExists = true;
console.log(`✔️ TABLE is ready.`)
return res;
}
......@@ -302,7 +297,7 @@ export class PgFeatureType extends FeatureType {
}
}
const values = featureCollection.features.map((feature) => {
let values = featureCollection.features.map((feature) => {
let featureValues = (!!feature.properties) ? Object.values(feature.properties) : [];
featureValues.forEach((e: any, i: number, a: any[]) => {
return a[i] = castToDataType(e, i)
......@@ -314,37 +309,57 @@ export class PgFeatureType extends FeatureType {
return `(${featureValues.join(',')})`
}).join(',');
})//.join(','); //TODO use tx with lutiple inserts to get rid of Invalid string length
const sql = `INSERT INTO ${this.schemaName}."${this.tableName}" (${columnNames} ) VALUES ${values};`;
// console.log(sql);
console.log(`Writing ${featureCollection.features.length} features into ${this.schemaName}."${this.tableName}".`);
return await this.db.query(sql);
return await this.db.tx('insert-transaction', async t => {
const multiRowsSize = 10;
//modifying values into Array of Arrays with multiRowSize each. Chunking array into smaller chunkarrays.
let multiValues = [...Array(Math.ceil(values.length / multiRowsSize))]
.map(_ => values.splice(0, multiRowsSize))
.map(chunks => chunks.join(','));
// const inserts = multiValues.map(multiFeatureValues =>{
// return t.none(`INSERT INTO ${this.schemaName}."${this.tableName}" (${columnNames} ) VALUES ${multiFeatureValues};`)
// })
return await t.sequence(
async (index: number) => {
if (index < multiValues.length) {
return await t.none(`INSERT INTO ${this.schemaName}."${this.tableName}" (${columnNames} ) VALUES ${multiValues[index]}`);
}
}
)
})
}
async createIndex(column: string, indexType?: string) {
column = column.replace(' ', '_');
const usingIndexType = (!!indexType) ? `USING ${indexType}` : '';
const sql = `CREATE INDEX IF NOT EXISTS ${this.tableName.replace(' ', '_')}_${column}_idx ON ${this.schemaName}."${this.tableName}" ${usingIndexType} (${column});`;
const sql = `CREATE INDEX IF NOT EXISTS ${this.tableName.replace(' ', '_')}_${column}_idx ON ${this.schemaName}."${this.tableName}" ${usingIndexType} ("${column}");`;
console.log(sql);
return await this.db.query(sql);
return await this.db.none(sql);
}
async clusterTable(column: string) {
const sql = `CLUSTER ${this.schemaName}."${this.tableName}" USING ${this.tableName.replace(' ', '_')}_${column}_idx`;
console.log(sql);
return await this.db.query(sql);
return await this.db.none(sql);
}
async analyzeTable() {
const sql = `ANALYZE ${this.schemaName}."${this.tableName}"`;
console.log(sql);
return await this.db.query(sql);
return await this.db.none(sql);
}
async finalize() {
this.db.closeConnections();
await this.closeConnections();
console.log(`Connection to database '${this.store.database}' successfully closed.`)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment