// PgFeatureType.ts
// const axios = require('axios');
// const PgAsync = require('pg-async').default;
// const pgTypes = require('pg').types;
// const querystring = require('querystring');
// const turfHelpers = require('@turf/helpers');
// const FeatureType = require('./FeatureType.js');
// const moment = require('moment');

//extends FeatureType
import {PostgisStore, SourcePostgisFeatureTypeConfig, TargetPostgisFeatureTypeConfig} from './config_types_interfaces';
import {FeatureType} from './FeatureType';
import pgPromise, {Column, IDatabase} from 'pg-promise'
// import {types as pgTypes} from 'pg';
import * as turfHelpers from '@turf/helpers';
import {Feature, FeatureCollection} from '@turf/helpers';
import {PgColumnTypes} from './PgColumnTypes';

// import Moment = require('moment');
// import MomentRange = require('moment-range');

// @ts-ignore
// const moment = MomentRange.extendMoment(Moment);

/**
 * Feature type backed by a PostGIS table `schema.table`.
 *
 * Reads rows as GeoJSON features (all at once or cursor-paged by id), creates
 * the table (optionally LIST-partitioned) from a sample feature, bulk-inserts
 * feature collections inside a transaction, and offers index/cluster/analyze
 * maintenance helpers.
 *
 * All schema/table/column identifiers are quoted with pg-promise's `as.name`,
 * so every statement addresses exactly the same (case-sensitive) objects that
 * `checkTableExists` looks up and the pg-promise `ColumnSet` inserts into.
 */
export class PgFeatureType extends FeatureType {

    /** Number of rows sent per multi-row INSERT statement in `writeFeatures`. */
    private static readonly WRITE_BATCH_SIZE = 10;

    public readonly store: PostgisStore;
    private schemaName: string;
    private tableName: string;

    // LIST partitioning of the table; only enabled for configs that carry a usable `partitionBy`.
    private shouldCreatePartitions: boolean = false;
    private partitionBaseName: string;
    private partitionBy: { key: string; values: any[]; createDefaultPartition: boolean; baseName?: string } | undefined;

    /** Column read as the feature id (selected `as id`). */
    private geometryId: string | number | undefined;
    /** Geometry column (selected `as geom`). */
    private geometryColumn: string;

    public pgp: pgPromise.IMain<any>;
    public db: IDatabase<any>;

    /**
     * Cached table existence. It is set by `checkTableExists`, by
     * `createTableFromGeoJSONFeature` (true) and by `delete` (false).
     * It stays `undefined` until one of those has run.
     */
    private tableExists: boolean | undefined;

    /**
     * @param config source or target PostGIS feature-type config. `schema` defaults to 'public';
     *               `partitionBy` (target configs only) enables LIST partitioning when it has a
     *               non-null `key` and at least one value.
     */
    constructor(config: SourcePostgisFeatureTypeConfig | TargetPostgisFeatureTypeConfig) {
        super(config);

        this.store = config.store; //type:'postgis'
        this.schemaName = config.schema || 'public';
        this.tableName = config.name;

        if ('geometryId' in config) {
            this.geometryId = config.geometryId;
        }
        this.geometryColumn = config.geometryColumn;

        // Optional chaining: a present-but-undefined `partitionBy` used to throw a TypeError here.
        const partitionBy = ('partitionBy' in config) ? config.partitionBy : undefined;
        this.partitionBaseName = partitionBy?.baseName ?? config.name;

        // create List Partitions in case of target being a postgres DB
        this.shouldCreatePartitions =
            partitionBy != null
            && partitionBy.key != null
            && (partitionBy.values?.length ?? 0) > 0;
        this.partitionBy = this.shouldCreatePartitions ? partitionBy : undefined;

        this.pgp = pgPromise({});

        // By default pg parses int8 (OID 20) values as strings; parse them as JS numbers instead
        // (values beyond Number.MAX_SAFE_INTEGER lose precision).
        this.pgp.pg.types.setTypeParser(20, (val: string) => parseInt(val, 10));

        this.db = this.pgp(this.store);

        // Warm the `tableExists` cache in the background. checkTableExists logs and rethrows on
        // failure; the catch keeps that from becoming an unhandled promise rejection. Methods that
        // need the answer re-check on demand.
        void this.checkTableExists().catch(() => undefined);
    }

    /** Quotes an SQL identifier with pg-promise's `as.name` (non-strings are stringified first). */
    private quoteIdent(name: unknown): string {
        return this.pgp.as.name(String(name));
    }

    /** Quoted `"schema"."table"` name of this feature type's table. */
    private get qualifiedTableName(): string {
        return `${this.quoteIdent(this.schemaName)}.${this.quoteIdent(this.tableName)}`;
    }

    /**
     * Looks up a table, view or partitioned table named `tableName` in `schemaName`,
     * caches the result in `tableExists` and returns it.
     *
     * @throws Error when the catalog query fails (the original error is logged first).
     */
    async checkTableExists(): Promise<boolean> {
        const sql = `SELECT exists(
                   SELECT 1
                   FROM   pg_catalog.pg_class c
                   JOIN   pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                   WHERE  n.nspname = $1
                   AND    c.relname = $2
                   AND    c.relkind IN ('r','v','p')
                   -- only tables or views or 'partitioned tables'
                   );`;

        try {
            const exists: boolean = await this.db.one(sql, [this.schemaName, this.tableName], row => row.exists);
            console.log(`Checked if TABLE ${this.schemaName}.${this.tableName} exists: ${exists}`);
            this.tableExists = exists;
            return exists;
        } catch (e) {
            console.log(e);
            throw new Error('Could not check if table exists. Is the database accessible?');
        }
    }

    /** Ends all connection pools of this instance's pg-promise library object. */
    async closeConnections() {
        console.log(`${this.schemaName}.${this.tableName}: ` + 'Close all connections of database: ' + this.store.database + ' on ' + this.store.host);
        await this.pgp.end();
    }

    /**
     * Drops the table (`IF EXISTS ... CASCADE`) and marks it as non-existent.
     * Best effort: errors are logged and the method then resolves to `undefined`.
     */
    async delete() {
        const sql = `DROP TABLE IF EXISTS ${this.qualifiedTableName} CASCADE;`;

        try {
            const res = await this.db.none(sql);
            this.tableExists = false;
            console.log(sql);
            return res;
        } catch (e) {
            console.log(e);
            return undefined;
        }
    }

    /**
     * Returns the JS data type (`typeof`, e.g. 'number' or 'string') of the id column, sampled
     * from one row.
     *
     * @throws Error if the table does not exist. `db.one` also rejects when the table has no rows.
     */
    async getIdJsType(): Promise<string> {
        if (!(this.tableExists || await this.checkTableExists())) {
            throw Error(`Can't getId data type from table that doesn't exist: ${this.schemaName}.${this.tableName}`);
        }

        const sql = `SELECT ${this.quoteIdent(this.geometryId)} as id
                     FROM ${this.qualifiedTableName} LIMIT 1;`;
        const id = await this.db.one(sql, null, row => row.id);
        return typeof id;
    }

    /**
     * Returns a look-up object with column names as keys and their Postgres data types as values.
     * Geometry columns are reported as `geometry(<type>,<srid>)` from `geometry_columns`.
     * The keys are renamed the same way features are read: id column -> `id`,
     * keepColumns aliases applied, geometry column -> `geom`.
     *
     * @throws Error if the table does not exist.
     */
    async getPgColumnTypes(): Promise<{ [columns: string]: string }> {
        if (!(this.tableExists || await this.checkTableExists())) {
            throw Error(`Can't get data types from table that doesn't exist: ${this.schemaName}.${this.tableName}`);
        }

        const sql = `WITH 
                        cols AS (
                        SELECT 
                        ordinal_position,
                        column_name, 
                        udt_name AS dtype 
                        FROM information_schema."columns"  
                        WHERE table_schema = $1
                        AND table_name = $2
                        ORDER BY ordinal_position
                        ),
                        geom_cols AS (
                        SELECT f_geometry_column AS column_name ,'geometry('||"type"||','||srid||')' AS dtype 
                        FROM geometry_columns 
                        WHERE f_table_schema = $1
                        AND f_table_name = $2
                        )
                        SELECT 
                        column_name,
                        CASE 
                          WHEN dtype = 'geometry' 
                            THEN (SELECT dtype FROM geom_cols WHERE geom_cols.column_name = cols.column_name)
                          ELSE dtype
                        END AS dtype
                        FROM cols
                        ORDER BY ordinal_position
                        ;`;

        const rows = await this.db.many(sql, [this.schemaName, this.tableName]);

        // Single pass; the previous map + spread-reduce copied the accumulator on every row (O(n²)).
        const columnTypes: { [column: string]: string } = {};
        for (const row of rows) {
            columnTypes[row.column_name] = row.dtype;
        }

        //rename id column
        if (this.geometryId != 'id' && this.geometryId != null) {
            columnTypes.id = columnTypes[this.geometryId];
            delete columnTypes[this.geometryId];
        }

        // rename columns with alias
        for (const columnDef of this.keepColumns) {
            if (columnDef?.as && columnDef.name in columnTypes) {
                columnTypes[columnDef.as] = columnTypes[columnDef.name];
                delete columnTypes[columnDef.name];
            }
        }

        //rename geometryColumn
        if (this.geometryColumn != 'geom' && this.geometryColumn != null) {
            columnTypes.geom = columnTypes[this.geometryColumn];
            delete columnTypes[this.geometryColumn];
        }

        return columnTypes;
    }

    /**
     * Returns the total number of rows. `count(*)` is an int8, converted to a JS number by the
     * type parser installed in the constructor.
     */
    async getFeatureCount(): Promise<number> {
        const sql = `SELECT count(*)
              FROM ${this.qualifiedTableName};`;
        return await this.db.one(sql, null, row => row.count);
    }

    /**
     * Builds the SELECT-list fragment for `keepColumns` (`"name"` or `"name" as "alias"`).
     * A non-empty result carries a trailing comma so the `geom` expression can follow directly.
     */
    private buildKeepColumnsSql(): string {
        if (this.keepColumns.length === 0) {
            return '';
        }
        const selected = this.keepColumns.map(col => (typeof col.as === 'string')
            ? `${this.quoteIdent(col.name)} as ${this.quoteIdent(col.as)}`
            : this.quoteIdent(col.name));
        return selected.join(',') + ',';
    }

    /** Turns `{id, ...columns, geom}` rows into a GeoJSON FeatureCollection (`geom` becomes the geometry). */
    private rowsToFeatureCollection(rows: any[]) {
        const features = rows.map(({geom, ...props}) => turfHelpers.feature(geom, props));
        return turfHelpers.featureCollection(features);
    }

    /**
     * Returns all rows as GeoJSON features with properties `id` plus the keepColumns.
     */
    async getFeatures(): Promise<FeatureCollection> {
        const sql = `SELECT ${this.quoteIdent(this.geometryId)} as id,
              ${this.buildKeepColumnsSql()}
              st_asgeojson(${this.quoteIdent(this.geometryColumn)})::json as geom
              FROM ${this.qualifiedTableName};`;
        console.log(`Get all cells from ${this.schemaName}."${this.tableName}"`);

        const rows = await this.db.any(sql);
        return this.rowsToFeatureCollection(rows);
    }

    /**
     * Keyset pagination: returns up to `fetchSize` features whose id is strictly greater than
     * `cursor`, ordered by id ascending.
     *
     * @returns GeoJSON<FeatureCollection>
     * @throws Error if `cursor` or `fetchSize` is null/undefined.
     **/
    async getFeaturesByCursorAndLimit(cursor: string | number | null, fetchSize: number | null) {
        console.log('cursor', cursor);

        if (cursor == null || fetchSize == null) {
            throw Error('Please set params: cursor: int | string, fetchSize: int');
        }

        const idColumn = this.quoteIdent(this.geometryId);
        const sql = `SELECT ${idColumn} as id,
              ${this.buildKeepColumnsSql()}
              st_asgeojson(${this.quoteIdent(this.geometryColumn)})::json as geom
              FROM ${this.qualifiedTableName}
              WHERE ${idColumn} > $1
              ORDER BY ${idColumn} ASC
              LIMIT $2;`;
        console.log('Get cells from ' + this.tableName + ' with id > ' + cursor + ' with fetchSize: ' + fetchSize);

        const rows = await this.db.any(sql, [cursor, fetchSize]);
        return this.rowsToFeatureCollection(rows);
    }

    /**
     * Creates the table (IF NOT EXISTS) with one column per property of `geoJSONFeature`, plus a
     * `geom` column when the feature has a geometry. Column types come from `pgColumnTypes`;
     * column order follows `PgColumnTypes.columnOrderFn`. If partitioning is configured, the table
     * is LIST-partitioned on the key and one partition per value (plus an optional DEFAULT
     * partition) is created as well.
     *
     * @throws Error if the feature has neither properties nor a geometry.
     */
    async createTableFromGeoJSONFeature(geoJSONFeature: Feature, pgColumnTypes: PgColumnTypes) {
        const hasGeometry = !!geoJSONFeature.geometry;

        if (!hasGeometry && geoJSONFeature.properties == null) {
            throw new Error("Can't create TABLE from GeoJSONFeature without properties and null-geometry");
        }

        const propertyKeys: string[] = Object.keys(geoJSONFeature.properties || {});
        if (hasGeometry) {
            propertyKeys.push('geom');
        }
        //order of columns: id always first column, geom always last
        propertyKeys.sort(PgColumnTypes.columnOrderFn);

        const columnDefsString = propertyKeys
            .map(property => `${this.quoteIdent(property)} ${pgColumnTypes.getTypeByColumnName(property)}`)
            .join(',');

        let partitionBySQL = '';
        let partitionsCreateSQL = '';
        if (this.shouldCreatePartitions && this.partitionBy) {
            const {key, values, createDefaultPartition} = this.partitionBy;
            const partitionTable = (suffix: unknown) =>
                `${this.quoteIdent(this.schemaName)}.${this.quoteIdent(`${this.partitionBaseName}_${suffix}`)}`;

            // Quote the key exactly like the column definitions above, so mixed-case keys resolve.
            partitionBySQL = `PARTITION BY LIST(${this.quoteIdent(key)});`;
            // NOTE: partition values are inlined verbatim as SQL literals (numbers work as-is).
            const statements = values.map(val =>
                `CREATE TABLE IF NOT EXISTS ${partitionTable(val)} PARTITION OF ${this.qualifiedTableName} FOR VALUES IN (${val});`);
            if (createDefaultPartition) {
                statements.push(
                    `CREATE TABLE IF NOT EXISTS ${partitionTable('default')} PARTITION OF ${this.qualifiedTableName} DEFAULT;`);
            }
            partitionsCreateSQL = statements.join('\n');
        }

        const sql = `CREATE TABLE IF NOT EXISTS ${this.qualifiedTableName} (${columnDefsString})
         ${partitionBySQL}
         ${partitionsCreateSQL}
        `;
        console.log(sql);

        const res = await this.db.none(sql);
        this.tableExists = true;
        console.log(`✔️ TABLE is ready.`);
        return res;
    }

    /**
     * Inserts all features in one transaction, in multi-row INSERTs of `WRITE_BATCH_SIZE` rows.
     * Creates the table from the first feature if it does not exist yet.
     *
     * Columns are taken from the first feature's properties. A `geom` column is written when
     * the first feature has a geometry, with SRID 3857 if the collection's `crs` name is
     * 'EPSG:3857', otherwise 4326.
     *
     * @param featureCollection may or may not have geometries (geometry: null)
     * @param pgColumnTypes column types used if the table has to be created
     **/
    async writeFeatures(featureCollection: FeatureCollection, pgColumnTypes: PgColumnTypes): Promise<void> {
        const [firstFeature] = featureCollection.features;

        //do nothing when feature collection is empty
        if (featureCollection.features.length === 0 || firstFeature == null) {
            console.log('WARNING: No features in feature collection. No feature writing.');
            return;
        }

        // `crs` is a legacy GeoJSON member that is not part of the FeatureCollection type.
        const crsName = (featureCollection as FeatureCollection & { crs?: { properties?: { name?: string } } })
            .crs?.properties?.name;
        const epsg = (crsName == 'EPSG:3857') ? '3857' : '4326';

        // Previously `await this.tableExists` awaited a boolean, so an unknown state never triggered a check.
        if (!(this.tableExists || await this.checkTableExists())) {
            await this.createTableFromGeoJSONFeature(firstFeature, pgColumnTypes);
        }

        const hasGeometry = !!firstFeature.geometry;

        // pg-promise property names for each column: prefixed with '_' (horizontal timestamp column
        // names can start with a digit) and with '-', ':' and '/' replaced by '_'.
        const toPropName = (key: string) => `_${key.replace(/[-:\/]/g, '_')}`;

        const columnNames = Object.keys(firstFeature.properties ?? {}).sort(PgColumnTypes.columnOrderFn);
        const columnsDef: Column[] = columnNames.map(name =>
            new this.pgp.helpers.Column({name, prop: toPropName(name)}));

        if (hasGeometry) {
            columnsDef.push(new this.pgp.helpers.Column({
                name: 'geom',
                // Emit the geometry as raw SQL so PostGIS parses the GeoJSON; a missing geometry becomes NULL.
                init: (col: { value: any }) => {
                    const geometry = col.value;
                    if (!geometry) {
                        return null;
                    }
                    return {
                        toPostgres: () => this.pgp.as.format('ST_SetSRID(ST_GeomFromGeoJSON(${geom}), ${epsg})', {
                            geom: JSON.stringify(geometry),
                            epsg
                        }),
                        rawType: true
                    };
                }
            }));
        }

        const columnSet = new this.pgp.helpers.ColumnSet(columnsDef, {
            table: new this.pgp.helpers.TableName({
                table: this.tableName,
                schema: this.schemaName
            })
        });

        const rows = featureCollection.features.map(feature => {
            const row: { [prop: string]: any } = {};
            const props: { [key: string]: any } = feature.properties ?? {};
            for (const key of Object.keys(props)) {
                row[toPropName(key)] = props[key];
            }
            row.geom = feature.geometry;
            return row;
        });

        console.log(`Writing ${featureCollection.features.length} features into ${this.schemaName}."${this.tableName}".`);

        const batchSize = PgFeatureType.WRITE_BATCH_SIZE;
        await this.db.tx('insert-transaction', async t => {
            for (let start = 0; start < rows.length; start += batchSize) {
                await t.none(this.pgp.helpers.insert(rows.slice(start, start + batchSize), columnSet));
            }
        });
    }

    /**
     * Unquoted index name `<table>_<column>_idx` with all spaces replaced by underscores.
     * createIndex and clusterTable both use it so they refer to the same index.
     */
    private indexName(column: string): string {
        return `${this.tableName.replace(/ /g, '_')}_${column.replace(/ /g, '_')}_idx`;
    }

    /**
     * Creates an index (IF NOT EXISTS) on `column`, optionally `USING indexType` (e.g. 'gist').
     * The column itself is referenced by its real (quoted) name; only the index name is sanitized.
     */
    async createIndex(column: string, indexType?: string) {
        const usingIndexType = (!!indexType) ? `USING ${indexType}` : '';
        const sql = `CREATE INDEX IF NOT EXISTS ${this.indexName(column)} ON ${this.qualifiedTableName} ${usingIndexType} (${this.quoteIdent(column)});`;
        console.log(sql);
        return await this.db.none(sql);
    }

    /** Physically reorders the table by the index that `createIndex(column)` created. */
    async clusterTable(column: string) {
        const sql = `CLUSTER ${this.qualifiedTableName} USING ${this.indexName(column)}`;
        console.log(sql);
        return await this.db.none(sql);
    }

    /** Runs ANALYZE on the table. */
    async analyzeTable() {
        const sql = `ANALYZE ${this.qualifiedTableName}`;
        console.log(sql);
        return await this.db.none(sql);
    }

    /** Closes all connections (see closeConnections) and logs the shutdown. */
    async finalize() {
        await this.closeConnections();
        console.log(`Connection to database '${this.store.database}' successfully closed.`)
    }

}