Commit 68e01d0c authored by Michael Auer's avatar Michael Auer
Browse files

Merge branch...

Merge branch '4-add-events-to-ohsome2x-for-better-control-from-an-outer-process-onerror-onprogress-onfetch' into 'master'

Resolve "Add events to ohsome2x for better control from an outer process (onError, onProgress, onFetch, onFinished)"

Closes #4

See merge request !4
parents 7fca85ea 6077e492
......@@ -117,7 +117,7 @@ $ npx @giscience/ohsome2x run --conf myquery.json
## API
### Basic Usage
Node:
```js
const Ohsome2X = require('@giscience/ohsome2x');
......@@ -153,7 +153,77 @@ const ohsome2x = new Ohsome2X(config);
ohsome2x.run().catch(console.log);
```
### Related
### Events
The Ohsome2X object currently provides 2 event types that can be listened to:
- `progress` for getting progress updates. Especially useful for iterative processing of large input datasets
`progress` provides the following object to your callback (use processed and total to compute a progress percentage):
```json
{
"type": "progress",
"processed": <number> of already finished input geometries,
"total": <number> of all input geometries,
"cursor": <number> that indicates the current state of iterative processing in case you configured source and target as PostGIS-Stores and specified a fetchSize,
"fetchSize": <number> fetchSize that is currently used to fetch the next chunk of source data,
"timestamp": <string> ISO timestamp
}
```
- `finished` to get some information on duration and finishing time:
```json
{
"type": "finished",
"duration": <string> a human readable duration like '2 days 13 hours 45 min 12 seconds',
"duration_ms": <number> duration in milliseconds,
"timestamp": <string> ISO timestamp
}
```
Example:
```js
const Ohsome2X = require('@giscience/ohsome2x');
// you can create this config using the command-line wizard, run: npx ohsome2x-cli
const config = {
ohsomeQuery: {
"queryType": "users/count/groupBy/boundary",
"filter": "natural=tree and type:node",
"time": "2010/2020/P1Y"
},
source: {
"schema": "public",
"name": "my_source_table_name",
"geometryId": "id", // name of the id column. Can be numbers or strings.
"geometryColumn": "geom",
"fetchSize": 5000, // this option triggers an iterative processing
"cursor": null, // can be used to resume an interrupted iterative processing at a certain position. Get this information from the 'fetch' event.
"store": {
"host": "your.postgis.server.com",
"port": "5434",
"database": "your_database_name",
"user": "username",
"password": "secret",
"type": "postgis"
}
},
target: {...another PostGIS target config...}
}
const ohsome2x = new Ohsome2X(config);
// define functions to be executed when an event is reported
ohsome2x.on('progress', (evt)=>console.log(evt, ((evt.processed / evt.total) * 100).toFixed(0) +'%'));
ohsome2x.on('finished', (evt)=>console.log(evt.duration));
// This will return a Promise
ohsome2x.run().catch(console.log);
```
## Related
- [OhsomeHeX](https://ohsome.org/apps/osm-history-explorer) - The OSM History Explorer: Uses this library as backend
- [ohsome API](https://api.ohsome.org) - WebAPI to query OSM History Data
......
......@@ -11,7 +11,12 @@ export abstract class FeatureType {
}
/**
* @returns GeoJSON<FeatureCollection>
* @return number the total number of features
*/
abstract async getFeatureCount(): Promise<number>
/**
* @return GeoJSON<FeatureCollection>
**/
abstract async getFeatures(): Promise<FeatureCollection>;
......
......@@ -54,10 +54,18 @@ export class GeoJsonFeatureType extends FeatureType {
}
};
/**
* returns the total number of features
*/
async getFeatureCount(): Promise<number> {
const geojson = await this.getFeatures();
return geojson.features.length;
}
/**
* @returns GeoJSON<FeatureCollection>
**/
getFeatures() {
async getFeatures(): Promise<FeatureCollection> {
if (!fs.existsSync(this.path)) {
throw new Error('Cannot get features. File does not exist: ' + this.path);
}
......
import {
Ohsome2XConfig,
OhsomeQueryConfig, OhsomeQueryConfigFormat,
OhsomeQueryConfig,
OhsomeQueryConfigFormat,
TargetPostgisFeatureTypeConfig
} from './config_types_interfaces';
import normalizeUrl from 'normalize-url';
......@@ -10,15 +11,17 @@ import {FeatureTypeFactory} from './FeatureTypeFactory';
import {Feature, Geometry} from '@turf/helpers';
import turfArea from '@turf/area';
import * as querystring from 'querystring';
import axios, {AxiosRequestConfig, AxiosResponse} from 'axios';
import axios, {AxiosError, AxiosRequestConfig, AxiosResponse} from 'axios';
import defaultConfig from './conf/default';
import {EventEmitter} from 'events';
import Ohsome2XError from './Ohsome2XError';
let OHSOME_API_URL = normalizeUrl(defaultConfig.OHSOME_API_URL); //remove trailing slash and other things
class Ohsome2X {
class Ohsome2X extends EventEmitter {
private config: Ohsome2XConfig;
private cursor: number | string | undefined;
private cursor: number | string | null | undefined;
private fetchSize: number | null;
private storeZeroValues: boolean;
private computeValuePerArea: boolean | undefined;
......@@ -28,13 +31,16 @@ class Ohsome2X {
private log_start: Date;
private log_end: Date;
private isContributionView: boolean | undefined;
private totalFeatureCount: number = 0;
private currentFeatureCount: number = 0;
constructor(config: Ohsome2XConfig) {
super(); //EventEmitter
this.config = config;
if ('cursor' in this.config.source) {
this.cursor = this.config.source.cursor;
}
this.cursor = ('cursor' in this.config.source)? this.config.source.cursor : null;
// @ts-ignore fetchSize only available for PostgisStoreConfig
this.fetchSize = (!!this.config.source.fetchSize) ? parseInt(this.config.source.fetchSize) : null;
this.storeZeroValues = (!('storeZeroValues' in this.config.target)) ? true : !!this.config.target.storeZeroValues;
......@@ -67,13 +73,26 @@ class Ohsome2X {
async run() {
console.log('RUN');
//initialize featureTypes with info if table aleady exists;
try {
this.sourceFeatureType = await FeatureTypeFactory.create(this.config.source);
this.targetFeatureType = await FeatureTypeFactory.create(this.config.target);
this.totalFeatureCount = await this.sourceFeatureType.getFeatureCount();
process.on('SIGINT', async ()=>{
console.log('\nSIGINT signal received. Closing existing connections.');
this.sourceFeatureType!.finalize();
await this.targetFeatureType!.finalize();
process.exit(0);
})
} catch (e) {
console.log(e);
throw new Error('Could not initialize FeatureTypes.');
//console.log(e);
// this.emit('error', {type:'error', error: e, message: 'Could not initialize FeatureTypes.', config: this.config, cursor: this.cursor, timestamp: new Date().toISOString()});
throw new Ohsome2XError('Could not initialize FeatureTypes.', {cursor: this.cursor}, e);
// throw new Error('Could not initialize FeatureTypes.');
}
//createGeometryOnTarget? default is true
......@@ -87,7 +106,15 @@ class Ohsome2X {
//dropTableIfexisits and views, do not delete if completing table from explicitly specified cursor
// @ts-ignore
if (this.targetFeatureType.store.type == 'postgis' && this.cursor == null) {
await this.targetFeatureType.delete();
try {
await this.targetFeatureType.delete();
} catch (e) {
// console.log(e);
// this.emit('error', {type:'error', error: e, message: 'Could not delete existing PostGIS table.', config: this.config, cursor: this.cursor, timestamp: new Date().toISOString()});
throw new Ohsome2XError('Could not delete existing PostGIS table.', {cursor: this.cursor}, e);
// throw new Error('Could not delete existing PostGIS table.');
}
}
//createItertively or createAllAtOnce (posgis source and tagrget only)
......@@ -116,21 +143,27 @@ class Ohsome2X {
cursor = '';
}
let fetchSize; //might be adjusted automatically if query takes too long
fetchSize = this.fetchSize;
let timesFetchSizeReduced = 0;
let featureCount = 1; // 1 to pass break test first time
try {
while (true) {
const sourceFeatureCollection: any = await this.sourceFeatureType.getFeaturesByCursorAndLimit(cursor, this.fetchSize);
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: cursor, fetchSize: fetchSize, timestamp: new Date().toISOString()});
const sourceFeatureCollection: any = await this.sourceFeatureType.getFeaturesByCursorAndLimit(cursor, fetchSize);
let targetFeatureCollection;
featureCount = sourceFeatureCollection.features.length;
if (featureCount == 0) {
console.log('No more cells.');
break;
}
// @ts-ignore
//cursor = parseInt(sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id);
cursor = sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id;
console.time('computeArea');
let idAreaMap: Map<any, number>;
......@@ -142,15 +175,73 @@ class Ohsome2X {
console.timeEnd('computeArea');
//build bpolys and add to ohsomeQuery
//api requires ids to be strings
//sourceFeatureCollection.features.forEach((feature)=>{feature.properties.id = String(feature.properties.id)});
this.config.ohsomeQuery.bpolys = sourceFeatureCollection;
console.time('query');
let ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
console.timeEnd('query');
let ohsomeResults: AxiosResponse<any>;
try {
let queryStart = Date.now();
// ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
ohsomeResults = await Promise.reject({isAxiosError: true, response: {status: 413, statusText:'Payload too large', data:{}, headers:{}}, code: 'ERRTIMEOUT', config:{data:{}}});
let queryEnd = Date.now();
let queryDuration = queryEnd - queryStart; //duration in milliseconds
console.log('Query duration (sec)', (queryDuration/1000));
//recover reduced fetch size if response times are lower than 4 minutes
if (fetchSize < this.fetchSize && queryDuration <= 4 * 60 * 1000){
timesFetchSizeReduced--;
timesFetchSizeReduced = Math.max(0,timesFetchSizeReduced); //should never get negative, 0 means original fetchSize
fetchSize = Math.min(this.fetchSize, Math.round(this.fetchSize / (2 ** timesFetchSizeReduced)));
console.log('INCREASING fetchSize...');
console.log('timesFetchSizeReduced', timesFetchSizeReduced);
console.log('currentFetchSize', fetchSize, 'originalFetchSize', this.fetchSize);
}
} catch (e) {
if(this.config.ohsomeQuery.bpolys) {
delete this.config.ohsomeQuery.bpolys; //avoid logging
}
if (e.isAxiosError) {
let axiosError: AxiosError = e;
delete axiosError.config.data; //avoid logging bpoly data
if (axiosError.response) {
if (axiosError.response.status == 413) {
//try reducing fetch size as long as possible before failing
//check if minimum fetch size alredy reached
if (fetchSize <= 1) {
throw new Ohsome2XError('Could not get Ohsome results, even with fetchSize: 1. Processing took too long with current query and service configuration.', {}, axiosError);
}
//payload too large: reducing fetch size
timesFetchSizeReduced++;
fetchSize = Math.max(1, Math.round(this.fetchSize / (2 ** timesFetchSizeReduced)));
console.log('REDUCING fetchSize...');
console.log('timesFetchSizeReduced', timesFetchSizeReduced);
console.log('currentFetchSize', fetchSize, 'originalFetchSize', this.fetchSize);
continue; //restart loop with reduced fetchSize
} else {
//other error codes
throw axiosError;
}
} else if (axiosError.request) {
// The request was made but no response was received
// `error.request` is an instance of XMLHttpRequest in the browser and an instance of
// http.ClientRequest in node.js
console.log('No response received.');
throw axiosError;
}
} else {
//no AxiosError something esle has happened
throw e;
}
}
ohsomeResults = ohsomeResults!;
this.isContributionView = this.checkIsContributionView(ohsomeResults.data);
console.log('Start conversion ohsome-result to GeoJSON');
......@@ -158,7 +249,7 @@ class Ohsome2X {
if (shouldCreateGeometry) {
if (typeof ohsomeResults.data == 'string') {
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps, sourceFeatureCollection, transformToWebmercator);
......@@ -166,7 +257,7 @@ class Ohsome2X {
} else {
if (typeof ohsomeResults.data == 'string') {
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps);
......@@ -193,27 +284,39 @@ class Ohsome2X {
console.timeEnd('computeValuePerArea');
await this.targetFeatureType.writeFeatures(targetFeatureCollection);
console.log('-----------------------------------------' + Date.now() + '-----------------------------------------');
//update cursor for the next round of fetches
// @ts-ignore
cursor = sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id;
//update finished featureCount
this.currentFeatureCount += featureCount;
console.log('-----------------------------------------' + new Date().toISOString() + '-----------------------------------------');
}
} catch (e) {
console.log(e);
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create ohsome data.')
throw new Ohsome2XError('Could not create ohsome data.', {cursor: cursor}, e);
}
} else {
// create all at once
try {
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});
let targetFeatureCollection;
const sourceFeatureCollection = await this.sourceFeatureType.getFeatures();
this.currentFeatureCount = sourceFeatureCollection.features.length;
console.time('computeArea');
let idAreaMap: Map<any, number>;
//in horizontal timestamp layout we can't have value and value_per_area, only one value is possible
if (!shouldWriteHorizontalTimestamps && this.computeValuePerArea) {
let idArea = sourceFeatureCollection.features.map((feature: any) => [feature.properties.id, turfArea(feature.geometry)]);
let idArea: any[any][number] = sourceFeatureCollection.features.map((feature: any) => [feature.properties.id, turfArea(feature.geometry)]);
idAreaMap = new Map(idArea);
}
console.timeEnd('computeArea');
......@@ -234,7 +337,7 @@ class Ohsome2X {
if (shouldCreateGeometry) {
if (typeof ohsomeResults.data == 'string') { //if format is csv show beginning of result
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps, sourceFeatureCollection, transformToWebmercator);
......@@ -243,7 +346,7 @@ class Ohsome2X {
if (typeof ohsomeResults.data == 'string') { //if format is csv show beginning of result
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps);
......@@ -269,12 +372,12 @@ class Ohsome2X {
// console.log(JSON.stringify(targetFeatureCollection, undefined, 2));
await this.targetFeatureType.writeFeatures(targetFeatureCollection);
//await this.targetFeatureType.writeOhsomeFeatures(ohsomeResults.data, false);
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});
} catch (e) {
console.log(e);
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create ohsome data.');
throw new Ohsome2XError('Could not create ohsome data.', {cursor: this.cursor}, e);
}
}
......@@ -322,10 +425,10 @@ class Ohsome2X {
}
}
} catch (e) {
console.log(e);
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create indexes.');
throw new Ohsome2XError('Could not create indexes.', {}, e);
}
//finalize e.g. close connections
......@@ -342,6 +445,8 @@ class Ohsome2X {
console.log('====================================================================================');
console.log(`Duration: ${diffDays} days ${diffHrs} hours ${diffMins} min ${diffSecs} seconds`);
console.log('====================================================================================');
this.emit('finished', {type: 'finished', duration:`${diffDays} days ${diffHrs} hours ${diffMins} min ${diffSecs} seconds`, duration_ms: log_diff, timestamp: this.log_end.toISOString()});
}
///////////// methods
......@@ -396,8 +501,8 @@ class Ohsome2X {
return stats;
} catch (e) {
console.log('Ohsome API request failed.');
this.axiosErrorHandler(e);
throw new Error(e.message);
this.axiosErrorPrinter(e);
throw e; //Propagate the axios error
}
}
......@@ -431,7 +536,7 @@ class Ohsome2X {
return isContributionView;
}
axiosErrorHandler(error: any) {
axiosErrorPrinter(error: AxiosError) {
if (error.response) {
// The request was made and the server responded with a status code
// that falls out of the range of 2xx
......
import {Ohsome2XConfig} from './config_types_interfaces';
class Ohsome2XError extends Error {
status: {
config?: Ohsome2XConfig;
cursor?: number | string | null;
['name']?: any
} = {};
constructor(message: string, status: object, e?: Error) {
super(message);
// see https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, Ohsome2XError.prototype);
this.name = this.constructor.name;
if (e instanceof Error){
this.stack = e.stack + '\n\n' + this.stack
}
if (e instanceof Ohsome2XError){
this.status = Object.assign(e.status, status);
} else {
this.status = status;
}
}
}
export = Ohsome2XError;
\ No newline at end of file
......@@ -113,6 +113,15 @@ export class PgFeatureType extends FeatureType {
}
}
/**
* returns the total number of features
*/
async getFeatureCount(): Promise<number> {
const sql = `SELECT count(*)
FROM ${this.schemaName}."${this.tableName}";`;
return await this.db.value(sql);
}
/**
get all Features
id column always returned as string
......@@ -150,7 +159,7 @@ export class PgFeatureType extends FeatureType {
FROM ${this.schemaName}."${this.tableName}"
WHERE "${this.geometryId}" > ${cursorSql}
ORDER BY "${this.geometryId}" ASC
LIMIT ${this.fetchSize};`;
LIMIT ${fetchSize};`;
console.log('Get cells from ' + this.tableName + ' with id > ' + cursorSql + ' with fetchSize: ' + fetchSize);
const result = await this.db.query(sql);
......@@ -327,6 +336,7 @@ export class PgFeatureType extends FeatureType {
async finalize() {
this.db.closeConnections();
console.log(`Connection to database '${this.store.database}' successfully closed.`)
}
}
......
......@@ -51,7 +51,7 @@ export interface PostgisFeatureTypeConfig extends FeatureTypeConfig<PostgisStore
export interface SourceGeoJSONFeatureTypeConfig extends GeoJSONFeatureTypeConfig, SourceFeatureTypeConfig {}
export interface SourcePostgisFeatureTypeConfig extends PostgisFeatureTypeConfig, SourceFeatureTypeConfig {
fetchSize?: number; // default is 0 ( -> compute all in one )
cursor?: number | string;
cursor?: number | string | null;
}
export interface TargetGeoJSONFeatureTypeConfig extends GeoJSONFeatureTypeConfig, TargetFeatureTypeConfig {}
export interface TargetPostgisFeatureTypeConfig extends PostgisFeatureTypeConfig, TargetFeatureTypeConfig {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment