PgFeatureType.ts 20.3 KB
Newer Older
1
2
3
4
5
6
7
// const axios = require('axios');
// const PgAsync = require('pg-async').default;
// const pgTypes = require('pg').types;
// const querystring = require('querystring');
// const turfHelpers = require('@turf/helpers');
// const FeatureType = require('./FeatureType.js');
// const moment = require('moment');
8
9

//extends FeatureType
10
import {PostgisStore, SourcePostgisFeatureTypeConfig, TargetPostgisFeatureTypeConfig} from './config_types_interfaces';
11
import {FeatureType} from './FeatureType';
12
import pgPromise, {Column, IDatabase} from 'pg-promise'
Michael Auer's avatar
Michael Auer committed
13
// import {types as pgTypes} from 'pg';
14
import * as turfHelpers from '@turf/helpers';
15
import {Feature, FeatureCollection} from '@turf/helpers';
16
import {PgColumnTypes} from './PgColumnTypes';
17
18
19
20
21
22

// import Moment = require('moment');
// import MomentRange = require('moment-range');

// @ts-ignore
// const moment = MomentRange.extendMoment(Moment);
23
24

export class PgFeatureType extends FeatureType {
25
26
27
    /** Connection settings of the PostGIS store; handed to pg-promise to open the database. */
    public readonly store: PostgisStore;
    /** Schema that holds the table; the constructor falls back to 'public'. */
    private schemaName: string;
    /** Name of the table (or view) backing this feature type. */
    private tableName: string;

    /** True when the config provides a partition key and at least one partition value. */
    private shouldCreatePartitions: boolean = false;
    /** Prefix used when naming the generated partition tables. */
    private partitionBaseName: string;
    /** List-partitioning settings; only set when partitions should be created. */
    private partitionBy: { key: string; values: any[]; createDefaultPartition: boolean; baseName?: string } | undefined;

    /** Name of the id column; stays undefined for configs without a `geometryId` key. */
    private geometryId: string | number | undefined;
    /** Name of the geometry column. */
    private geometryColumn: string;

    /** pg-promise library instance created by the constructor. */
    public pgp: pgPromise.IMain<any>;
    /** Database object obtained from `pgp` for `store`. */
    public db: IDatabase<any>;

    /**
     * Cached result of the last existence check. Starts as `false`, so every
     * "exists?" test behaves as it did while the field was still unset.
     */
    private tableExists: boolean = false;

Michael Auer's avatar
Michael Auer committed
38
    /**
     * Copies the table-related settings from `config`, opens a pg-promise
     * database for `config.store` and starts an existence check for the table.
     *
     * The existence check runs in the background; its result ends up in
     * `tableExists`, and a failure is logged instead of surfacing as an
     * unhandled promise rejection.
     *
     * @param config source or target feature type configuration
     */
    constructor(config: SourcePostgisFeatureTypeConfig | TargetPostgisFeatureTypeConfig) {
        super(config);

        this.store = config.store; //type:'postgis'
        this.schemaName = config.schema || 'public';
        this.tableName = config.name;

        if ('geometryId' in config) {
            this.geometryId = config.geometryId;
        }
        this.geometryColumn = config.geometryColumn;

        // The key can be present but unset, so read it once and guard every access.
        const partitionBy = ('partitionBy' in config) ? config.partitionBy : undefined;
        this.partitionBaseName = partitionBy?.baseName ?? config.name;

        // create List Partitions in case of target being a postgres DB
        if (partitionBy != null) {
            this.shouldCreatePartitions =
                partitionBy.key != null
                && Array.isArray(partitionBy.values)
                && partitionBy.values.length > 0;
            if (this.shouldCreatePartitions) {
                this.partitionBy = partitionBy;
            }
        }

        this.pgp = pgPromise({});

        //per default pg parses int8 values as strings
        this.pgp.pg.types.setTypeParser(20, (val: string) => parseInt(val, 10));

        this.db = this.pgp(this.store);

        // Fire-and-forget: methods that need the answer re-check on their own.
        void this.checkTableExists().catch((e: unknown) => {
            const reason = (e instanceof Error) ? e.message : String(e);
            console.log(`Initial existence check for ${this.schemaName}.${this.tableName} failed: ${reason}`);
        });
    }

    /**
     * Asks the system catalog whether `schemaName.tableName` exists and caches
     * the answer in `tableExists`.
     *
     * Ordinary tables, partitioned tables and views count as existing.
     * Partitioned parents are stored with relkind 'p', which is what
     * `createTableFromGeoJSONFeature` produces when partitioning is configured.
     *
     * @returns true when a matching relation was found
     * @throws Error when the query itself fails
     */
    async checkTableExists(): Promise<boolean> {
        const sql = `SELECT exists(
                   SELECT 1
                   FROM   pg_catalog.pg_class c
                   JOIN   pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                   WHERE  n.nspname = $1
                   AND    c.relname = $2
                   AND    c.relkind IN ('r', 'p', 'v')    -- ordinary tables, partitioned tables or views
                   );`;

        try {
            const exists: boolean = await this.db.one(sql, [this.schemaName, this.tableName], row => row.exists);
            console.log(`Checked if TABLE ${this.schemaName}.${this.tableName} exists: ${exists}`);
            this.tableExists = exists;
            return exists;
        } catch (e) {
            // Log the underlying driver error before replacing it with the generic message.
            console.log(e);
            throw new Error('Could not check if table exists. Is the database accessible?');
        }
    }
92
93
94

    /**
     * Logs which database is being disconnected, then calls `end()` on the
     * pg-promise instance of this feature type.
     */
    async closeConnections() {
        const qualifiedName = `${this.schemaName}.${this.tableName}`;
        const {database, host} = this.store;
        console.log(`${qualifiedName}: Close all connections of database: ${database} on ${host}`);
        await this.pgp.end();
    }

98
99
    /**
     * Drops the table (with CASCADE) if it exists and clears the cached
     * existence flag. A failing statement is only logged; the method then
     * resolves with `undefined`.
     */
    async delete() {
        const dropStatement = `DROP TABLE IF EXISTS ${this.schemaName}."${this.tableName}" CASCADE;`;

        try {
            const outcome = await this.db.none(dropStatement);
            // Only reached when the DROP went through.
            this.tableExists = false;
            console.log(dropStatement);
            return outcome;
        } catch (err) {
            console.log(err);
        }
    }

    /**
     * Reads the id column of one row and reports the JavaScript `typeof` of
     * that value (e.g. 'number' or 'string').
     *
     * @throws Error when the table does not exist
     */
    async getIdJsType(): Promise<string> {
        const tableKnown = this.tableExists || await this.checkTableExists();
        if (!tableKnown) {
            throw Error(`Can't getId data type from table that doesn't exist: ${this.schemaName}.${this.tableName}`);
        }

        const sampleSql = `SELECT "${this.geometryId}" as id
                         FROM ${this.schemaName}."${this.tableName}" LIMIT 1;`;
        const sampleId = await this.db.one(sampleSql, null, row => row.id);

        return typeof sampleId;
    }

    /**
     * Builds a look-up object mapping column names to their PostgreSQL data types.
     *
     * Geometry columns are reported as `geometry(<type>,<srid>)` taken from
     * `geometry_columns`. The configured id column is exposed as `id`, the
     * configured geometry column as `geom`, and columns listed in
     * `keepColumns` with an alias are exposed under that alias.
     *
     * @throws Error when the table does not exist
     */
    async getPgColumnTypes(): Promise<{ [columns: string]: string }> {
        const tableKnown = this.tableExists || await this.checkTableExists();
        if (!tableKnown) {
            throw Error(`Can't get data types from table that doesn't exist: ${this.schemaName}.${this.tableName}`);
        }

        const sql = `WITH 
                            cols AS (
                            SELECT 
                            ordinal_position,
                            column_name, 
                            udt_name AS dtype 
                            FROM information_schema."columns"  
                            WHERE table_schema = $1
                            AND table_name = $2
                            ORDER BY ordinal_position
                            ),
                            geom_cols AS (
                            SELECT f_geometry_column AS column_name ,'geometry('||"type"||','||srid||')' AS dtype 
                            FROM geometry_columns 
                            WHERE f_table_schema = $1
                            AND f_table_name = $2
                            )
                            SELECT 
--                            ordinal_position,
                            column_name,
                            CASE 
                              WHEN dtype = 'geometry' 
                                THEN (SELECT dtype FROM geom_cols WHERE geom_cols.column_name = cols.column_name)
                              ELSE dtype
                            END AS dtype
                            FROM cols
                            ORDER BY ordinal_position
                            ;`;

        const rows = await this.db.many(sql, [this.schemaName, this.tableName]);

        // Collect name -> type in column order.
        const columnTypes: { [column: string]: string } = {};
        for (const row of rows) {
            columnTypes[row.column_name] = row.dtype;
        }

        // Expose the configured id column under the fixed name 'id'.
        if (this.geometryId != null && this.geometryId != 'id') {
            columnTypes.id = columnTypes[this.geometryId];
            delete columnTypes[this.geometryId];
        }

        // Move aliased columns to their alias.
        for (const columnDef of this.keepColumns) {
            const alias = columnDef?.as;
            if (!alias || !(columnDef.name in columnTypes)) {
                continue;
            }
            columnTypes[alias] = columnTypes[columnDef.name];
            delete columnTypes[columnDef.name];
        }

        // Expose the configured geometry column under the fixed name 'geom'.
        if (this.geometryColumn != null && this.geometryColumn != 'geom') {
            columnTypes.geom = columnTypes[this.geometryColumn];
            delete columnTypes[this.geometryColumn];
        }

        return columnTypes;
    }
200

201
202
203
    /**
     * Counts all rows of the table.
     *
     * @returns the total number of features
     */
    async getFeatureCount(): Promise<number> {
        const countSql = `SELECT count(*)
              FROM ${this.schemaName}."${this.tableName}";`;
        const total: number = await this.db.one(countSql, null, row => row.count);
        return total;
    }
209

210
211
212
213
    /**
     * Loads every row of the table and wraps the rows in a GeoJSON FeatureCollection.
     *
     * Each row contributes its id, the columns listed in `keepColumns`
     * (renamed to their alias where one is given) and its geometry as GeoJSON.
     */
    async getFeatures(): Promise<FeatureCollection> {
        // Every kept column carries its own trailing comma, so an empty list yields ''.
        const keepColumnsSql = this.keepColumns
            .map(col => {
                const selector = (typeof col.as === 'string') ? `"${col.name}" as "${col.as}"` : `"${col.name}"`;
                return `${selector},`;
            })
            .join('');

        const sql = `SELECT "${this.geometryId}" as id,
              ${keepColumnsSql}
              st_asgeojson("${this.geometryColumn}")::json as geom
              FROM ${this.schemaName}."${this.tableName}";`;
        console.log(`Get all cells from ${this.schemaName}."${this.tableName}"`);

        const rows = await this.db.any(sql);

        // Split each row into its geometry and the remaining columns (the properties).
        const features = rows.map((row: any) => {
            const {geom, ...props} = row;
            return turfHelpers.feature(geom, props);
        });

        return turfHelpers.featureCollection(features);
    }
243

244
245
246
247
248
    /**
     * Loads one page of features: rows whose id is greater than `cursor`,
     * ordered by id ascending, at most `fetchSize` of them.
     *
     * Both `cursor` and `fetchSize` are sent as bound query values, so
     * neither is pasted into the SQL text.
     *
     * @param cursor id after which the page starts
     * @param fetchSize maximum number of rows to return
     * @returns GeoJSON<FeatureCollection>
     * @throws Error when `cursor` or `fetchSize` is null or undefined
     **/
    async getFeaturesByCursorAndLimit(cursor: string | number | null, fetchSize: number | null) {
        console.log('cursor', cursor);

        // Validate before building any SQL.
        if (cursor == null || fetchSize == null) {
            throw Error('Please set params: cursor: int | string, fetchSize: int');
        }

        let keepColumnsSql = this.keepColumns.map(col => {
            return (typeof col.as === 'string') ? `"${col.name}" as "${col.as}"` : `"${col.name}"`;
        }).join(',');

        // add trailing comma if not empty
        if (this.keepColumns.length > 0) {
            keepColumnsSql += ',';
        }

        const sql = `SELECT "${this.geometryId}" as id,
              ${keepColumnsSql}
              st_asgeojson("${this.geometryColumn}")::json as geom
              FROM ${this.schemaName}."${this.tableName}"
              WHERE "${this.geometryId}" > $1
              ORDER BY "${this.geometryId}" ASC
              LIMIT $2;`;
        console.log('Get cells from ' + this.tableName + ' with id > ' + cursor + ' with fetchSize: ' + fetchSize);

        const result = await this.db.any(sql, [cursor, fetchSize]);

        // Split each row into its geometry and the remaining columns (the properties).
        const features = result.map((row: any) => {
            const {geom, ...props} = row;
            return turfHelpers.feature(geom, props);
        });

        return turfHelpers.featureCollection(features);
    }
286
287


288
289
    /**
     * Creates the table (if it does not exist yet) with one column per
     * property of `geoJSONFeature`, plus a `geom` column when the feature has
     * a geometry. Column types come from `pgColumnTypes`.
     *
     * When partitioning is configured the table is created with
     * `PARTITION BY LIST` and one partition per configured value, plus an
     * optional default partition. Partition values are formatted as SQL
     * literals, so string values are quoted correctly.
     *
     * @param geoJSONFeature template feature whose properties define the columns
     * @param pgColumnTypes resolves the PostgreSQL type for each column name
     * @throws Error when the feature has neither properties nor a geometry
     */
    async createTableFromGeoJSONFeature(geoJSONFeature: Feature, pgColumnTypes: PgColumnTypes) {

        //check if geometries exists
        const hasGeometry = !!geoJSONFeature.geometry;

        if (!hasGeometry && geoJSONFeature.properties == null) {
            throw new Error("Can't create TABLE from GeoJSONFeature without properties and null-geometry)");
        }

        //get columns from properties
        const properties = geoJSONFeature.properties || {};
        const propertyKeys: string[] = Object.keys(properties);

        if (hasGeometry) {
            propertyKeys.push('geom');
        }

        //order of columns
        //id always first column, geom always last
        propertyKeys.sort(PgColumnTypes.columnOrderFn);

        const columnDefsString = propertyKeys
            .map((property: string) => `"${property}" ${pgColumnTypes.getTypeByColumnName(property)}`)
            .join(',');

        const qualifiedTable = `${this.schemaName}."${this.tableName}"`;

        let partitionBySQL = '';
        let partitionsCreateSQL = '';
        if (this.shouldCreatePartitions && this.partitionBy != null) {
            const partitionBy = this.partitionBy;

            // Let pg-promise render each value as a literal: strings get quoted, numbers stay bare.
            const asLiteral = (value: unknown): string => this.pgp.as.format('$1', [value]);

            partitionBySQL = `PARTITION BY LIST(${partitionBy.key});`
            partitionsCreateSQL = partitionBy.values.map(
                val => {
                    // An array entry stands for several values sharing one partition.
                    const literals = Array.isArray(val) ? val.map(asLiteral).join(',') : asLiteral(val);
                    return `CREATE TABLE IF NOT EXISTS ${this.schemaName}."${this.partitionBaseName}_${val}" PARTITION OF ${qualifiedTable} FOR VALUES IN (${literals});`;
                })
                .join(`
                `);

            if (!!partitionBy.createDefaultPartition) {
                partitionsCreateSQL += `
                CREATE TABLE IF NOT EXISTS ${this.schemaName}."${this.partitionBaseName}_default" 
                PARTITION OF ${qualifiedTable} DEFAULT;
                `
            }
        }

        const sql = `CREATE TABLE IF NOT EXISTS ${qualifiedTable} (${columnDefsString})
         ${partitionBySQL}
         ${partitionsCreateSQL}
        `;

        console.log((sql.length > 70) ? sql.substring(0, 67) + '...' : sql);
        console.log(sql);
        let res = await this.db.none(sql);
        this.tableExists = true;
        console.log(`✔️ TABLE is ready.`)
        return res;
    }
389

390
391
    /**
     * Inserts all features of `featureCollection` into the table, creating
     * the table from the first feature when it is not known to exist.
     *
     * Columns are derived from the first feature: one per property, plus
     * `geom` when that feature has a geometry. Rows are inserted inside one
     * transaction in multi-row statements of ten rows each.
     *
     * @param featureCollection may or may not have geometries (geometry: null)
     * @param pgColumnTypes column type look-up, used when the table has to be created
     **/
    async writeFeatures(featureCollection: FeatureCollection, pgColumnTypes: PgColumnTypes): Promise<void> {
        const allFeatures = featureCollection.features;

        //do nothing when feature collection is empty
        if (allFeatures.length == 0) {
            console.log('WARNING: No features in feature collection. No feature writing.');
            return;
        }

        // @ts-ignore
        const crs = featureCollection.crs;
        const epsg = (crs && crs.properties && crs.properties.name == 'EPSG:3857') ? '3857' : '4326';

        const firstFeature = allFeatures[0];

        const tableReady = await this.tableExists;
        if (!tableReady) {
            await this.createTableFromGeoJSONFeature(firstFeature, pgColumnTypes);
        }

        //check if geometries exists
        const hasGeometry = !!firstFeature.geometry;

        // Property names are mangled into source keys that are safe to look up
        // (for horizontal timestamps column names can't start with a digit).
        const toSourceKey = (name: string): string => `_${name.replace(/[-:\/]/g,'_')}`;

        // create a column SET
        const propertyNames: string[] = firstFeature.properties ? Object.keys(firstFeature.properties) : [];
        propertyNames.sort(PgColumnTypes.columnOrderFn);
        const columnsDef: Column[] = propertyNames.map(
            name => new this.pgp.helpers.Column({name: name, prop: toSourceKey(name)})
        );

        // Turns a GeoJSON geometry into a raw SQL expression producing a PostGIS geometry.
        const geomFromGeoJSONFn = (col: { value: any; }) => {
            const geometry = col.value;
            if (!geometry) {
                // returning nothing results in NULL automatically
                return;
            }
            return {
                toPostgres: () => this.pgp.as.format('ST_SetSRID(ST_GeomFromGeoJSON(${geom}), ${epsg})', {
                    geom: JSON.stringify(geometry),
                    epsg
                }),
                rawType: true
            };
        };

        if (hasGeometry) {
            columnsDef.push(new this.pgp.helpers.Column({name: 'geom', init: geomFromGeoJSONFn}));
        }

        const targetTable = new this.pgp.helpers.TableName({
            table: this.tableName,
            schema: this.schemaName
        });
        const columnSet = new this.pgp.helpers.ColumnSet(columnsDef, {table: targetTable});

        // One flat object per feature: mangled property keys plus the geometry.
        const rows = allFeatures.map((feature) => {
            const mangled: {[p:string]:any} = {};
            for (const key in feature.properties) {
                mangled[toSourceKey(key)] = feature.properties[key];
            }
            return {...mangled, geom: feature.geometry};
        });

        console.log(`Writing ${featureCollection.features.length} features into ${this.schemaName}."${this.tableName}".`);

        return await this.db.tx('insert-transaction', async t => {

            // Cut the rows into consecutive chunks, one multi-row INSERT each.
            const multiRowsSize = 10;
            const chunks: Array<typeof rows> = [];
            for (let start = 0; start < rows.length; start += multiRowsSize) {
                chunks.push(rows.slice(start, start + multiRowsSize));
            }

            // The sequence stops once the callback yields undefined, i.e. after the last chunk.
            return await t.sequence(
                async (index: number) => {
                    if (index >= chunks.length) {
                        return;
                    }
                    return await t.none(this.pgp.helpers.insert(chunks[index], columnSet));
                }
            )
        })
    }

509
510
511
    /**
     * Creates an index named `<table>_<column>_idx` on `column` unless it
     * already exists.
     *
     * Spaces are replaced by underscores in the index name only; the column
     * itself is referenced by its real, quoted name.
     *
     * @param column name of the column to index
     * @param indexType optional index method, emitted as `USING <indexType>`
     */
    async createIndex(column: string, indexType?: string) {
        // Every space is replaced, not just the first one.
        const indexName = `${this.tableName}_${column}_idx`.replace(/ /g, '_');
        const usingIndexType = (!!indexType) ? `USING ${indexType}` : '';
        const sql = `CREATE INDEX IF NOT EXISTS ${indexName} ON ${this.schemaName}."${this.tableName}" ${usingIndexType} ("${column}");`;
        console.log(sql);
        return await this.db.none(sql);
    }
516

517
518
519
    /**
     * Clusters the table using the index `<table>_<column>_idx`.
     *
     * The index name is derived the same way for table and column part:
     * every space becomes an underscore.
     *
     * @param column column whose index drives the clustering
     */
    async clusterTable(column: string) {
        const indexName = `${this.tableName}_${column}_idx`.replace(/ /g, '_');
        const sql = `CLUSTER ${this.schemaName}."${this.tableName}" USING ${indexName}`;
        console.log(sql);
        return await this.db.none(sql);
    }
522

523
524
525
    /**
     * Runs ANALYZE on the table and logs the statement.
     */
    async analyzeTable() {
        const qualifiedTable = `${this.schemaName}."${this.tableName}"`;
        const statement = `ANALYZE ${qualifiedTable}`;
        console.log(statement);
        return await this.db.none(statement);
    }
528

529
    /**
     * Closes the database connections and logs that the connection is closed.
     */
    async finalize() {
        await this.closeConnections();
        const databaseName = this.store.database;
        console.log(`Connection to database '${databaseName}' successfully closed.`)
    }

}