Commit 6077e492 authored by Michael Auer's avatar Michael Auer
Browse files

adding events for progress and finished

adding dynamic retries with adjusted fetch size
adding custom Ohsome2XError for error handling
parent b720e13d
......@@ -117,7 +117,7 @@ $ npx @giscience/ohsome2x run --conf myquery.json
## API
### Basic Usage
Node:
```js
const Ohsome2X = require('@giscience/ohsome2x');
......@@ -153,7 +153,77 @@ const ohsome2x = new Ohsome2X(config);
ohsome2x.run().catch(console.log);
```
### Related
### Events
The Ohsome2X object currently provides 2 event types that can be listened to:
- `progress` for getting progress updates. Especially useful for iterative processing of large input datasets
`progress` provides the following object to your callback (use processed and total to compute a progress percentage):
```json
{
"type": "progress",
"processed": <number> of already finished input geometries,
"total": <number> of all input geometries,
"cursor": <number> that indicates the current state of iterative processing in case you configured source and target as PostGIS-Stores and specified a fetchSize,
"fetchSize": <number> fetchSize that is currently used to fetch the next chunk of source data,
"timestamp": <string> ISO timestamp
}
```
- `finished` to get some information on duration and finishing time:
```json
{
"type": "finished",
"duration": <string> a human readable duration like '2 days 13 hours 45 min 12 seconds',
"duration_ms": <number> duration in milliseconds,
"timestamp": <string> ISO timestamp
}
```
Example:
```js
const Ohsome2X = require('@giscience/ohsome2x');
// you can create this config using the command-line wizard, run: npx ohsome2x-cli
const config = {
ohsomeQuery: {
"queryType": "users/count/groupBy/boundary",
"filter": "natural=tree and type:node",
"time": "2010/2020/P1Y"
},
source: {
"schema": "public",
"name": "my_source_table_name",
"geometryId": "id", // name of the id column. Can be numbers or strings.
"geometryColumn": "geom",
"fetchSize": 5000, // this option triggers an iterative processing
"cursor": null, // can be used to resume an interrupted iterative processing at a certain position. Get this information from the 'fetch' event.
"store": {
"host": "your.postgis.server.com",
"port": "5434",
"database": "your_database_name",
"user": "username",
"password": "secret",
"type": "postgis"
}
},
target: {...another PostGIS target config...}
}
const ohsome2x = new Ohsome2X(config);
// define functions to be executed when an event is reported
ohsome2x.on('progress', (evt)=>console.log(evt, ((evt.processed / evt.total) * 100).toFixed(0) +'%'));
ohsome2x.on('finished', (evt)=>console.log(evt.duration));
// This will return a Promise
ohsome2x.run().catch(console.log);
```
## Related
- [OhsomeHeX](https://ohsome.org/apps/osm-history-explorer) - The OSM History Explorer: Uses this library as backend
- [ohsome API](https://api.ohsome.org) - WebAPI to query OSM History Data
......
import {
Ohsome2XConfig,
OhsomeQueryConfig, OhsomeQueryConfigFormat,
OhsomeQueryConfig,
OhsomeQueryConfigFormat,
TargetPostgisFeatureTypeConfig
} from './config_types_interfaces';
import normalizeUrl from 'normalize-url';
......@@ -10,16 +11,17 @@ import {FeatureTypeFactory} from './FeatureTypeFactory';
import {Feature, Geometry} from '@turf/helpers';
import turfArea from '@turf/area';
import * as querystring from 'querystring';
import axios, {AxiosRequestConfig, AxiosResponse} from 'axios';
import axios, {AxiosError, AxiosRequestConfig, AxiosResponse} from 'axios';
import defaultConfig from './conf/default';
import {EventEmitter} from 'events';
import Ohsome2XError from './Ohsome2XError';
let OHSOME_API_URL = normalizeUrl(defaultConfig.OHSOME_API_URL); //remove trailing slash and other things
class Ohsome2X extends EventEmitter {
private config: Ohsome2XConfig;
private cursor: number | string | undefined;
private cursor: number | string | null | undefined;
private fetchSize: number | null;
private storeZeroValues: boolean;
private computeValuePerArea: boolean | undefined;
......@@ -36,9 +38,9 @@ class Ohsome2X extends EventEmitter {
super(); //EventEmitter
this.config = config;
if ('cursor' in this.config.source) {
this.cursor = this.config.source.cursor;
}
this.cursor = ('cursor' in this.config.source)? this.config.source.cursor : null;
// @ts-ignore fetchSize only available for PostgisStoreConfig
this.fetchSize = (!!this.config.source.fetchSize) ? parseInt(this.config.source.fetchSize) : null;
this.storeZeroValues = (!('storeZeroValues' in this.config.target)) ? true : !!this.config.target.storeZeroValues;
......@@ -78,11 +80,19 @@ class Ohsome2X extends EventEmitter {
this.targetFeatureType = await FeatureTypeFactory.create(this.config.target);
this.totalFeatureCount = await this.sourceFeatureType.getFeatureCount();
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, timestamp: new Date().toISOString()});
process.on('SIGINT', async ()=>{
console.log('\nSIGINT signal received. Closing existing connections.');
this.sourceFeatureType!.finalize();
await this.targetFeatureType!.finalize();
process.exit(0);
})
} catch (e) {
console.log(e);
this.emit('error', {type:'error', error: e, message: 'Could not initialize FeatureTypes.', config: this.config, timestamp: new Date().toISOString()});
throw new Error('Could not initialize FeatureTypes.');
//console.log(e);
// this.emit('error', {type:'error', error: e, message: 'Could not initialize FeatureTypes.', config: this.config, cursor: this.cursor, timestamp: new Date().toISOString()});
throw new Ohsome2XError('Could not initialize FeatureTypes.', {cursor: this.cursor}, e);
// throw new Error('Could not initialize FeatureTypes.');
}
//createGeometryOnTarget? default is true
......@@ -99,9 +109,10 @@ class Ohsome2X extends EventEmitter {
try {
await this.targetFeatureType.delete();
} catch (e) {
console.log(e);
this.emit('error', {type:'error', error: e, message: 'Could not delete existing PostGIS table.', config: this.config, timestamp: new Date().toISOString()});
throw new Error('Could not delete existing PostGIS table.');
// console.log(e);
// this.emit('error', {type:'error', error: e, message: 'Could not delete existing PostGIS table.', config: this.config, cursor: this.cursor, timestamp: new Date().toISOString()});
throw new Ohsome2XError('Could not delete existing PostGIS table.', {cursor: this.cursor}, e);
// throw new Error('Could not delete existing PostGIS table.');
}
}
......@@ -132,25 +143,27 @@ class Ohsome2X extends EventEmitter {
cursor = '';
}
let fetchSize; //might be adjusted automatically if query takes too long
fetchSize = this.fetchSize;
let timesFetchSizeReduced = 0;
let featureCount = 1; // 1 to pass break test first time
try {
while (true) {
//emit event before fetching data
this.emit('fetch', {type: 'fetch', originalConfig: this.config, currentCursor: cursor, timestamp: new Date().toISOString()});
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: cursor, fetchSize: fetchSize, timestamp: new Date().toISOString()});
const sourceFeatureCollection: any = await this.sourceFeatureType.getFeaturesByCursorAndLimit(cursor, this.fetchSize);
const sourceFeatureCollection: any = await this.sourceFeatureType.getFeaturesByCursorAndLimit(cursor, fetchSize);
let targetFeatureCollection;
featureCount = sourceFeatureCollection.features.length;
this.currentFeatureCount += featureCount;
if (featureCount == 0) {
console.log('No more cells.');
break;
}
// @ts-ignore
//cursor = parseInt(sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id);
cursor = sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id;
console.time('computeArea');
let idAreaMap: Map<any, number>;
......@@ -162,15 +175,73 @@ class Ohsome2X extends EventEmitter {
console.timeEnd('computeArea');
//build bpolys and add to ohsomeQuery
//api requires ids to be strings
//sourceFeatureCollection.features.forEach((feature)=>{feature.properties.id = String(feature.properties.id)});
this.config.ohsomeQuery.bpolys = sourceFeatureCollection;
console.time('query');
let ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
console.timeEnd('query');
let ohsomeResults: AxiosResponse<any>;
try {
let queryStart = Date.now();
// ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
ohsomeResults = await Promise.reject({isAxiosError: true, response: {status: 413, statusText:'Payload too large', data:{}, headers:{}}, code: 'ERRTIMEOUT', config:{data:{}}});
let queryEnd = Date.now();
let queryDuration = queryEnd - queryStart; //duration in milliseconds
console.log('Query duration (sec)', (queryDuration/1000));
//recover reduced fetch size if response times are lower than 4 minutes
if (fetchSize < this.fetchSize && queryDuration <= 4 * 60 * 1000){
timesFetchSizeReduced--;
timesFetchSizeReduced = Math.max(0,timesFetchSizeReduced); //should never get negative, 0 means original fetchSize
fetchSize = Math.min(this.fetchSize, Math.round(this.fetchSize / (2 ** timesFetchSizeReduced)));
console.log('INCREASING fetchSize...');
console.log('timesFetchSizeReduced', timesFetchSizeReduced);
console.log('currentFetchSize', fetchSize, 'originalFetchSize', this.fetchSize);
}
} catch (e) {
if(this.config.ohsomeQuery.bpolys) {
delete this.config.ohsomeQuery.bpolys; //avoid logging
}
if (e.isAxiosError) {
let axiosError: AxiosError = e;
delete axiosError.config.data; //avoid logging bpoly data
if (axiosError.response) {
if (axiosError.response.status == 413) {
//try reducing fetch size as long as possible before failing
//check if minimum fetch size alredy reached
if (fetchSize <= 1) {
throw new Ohsome2XError('Could not get Ohsome results, even with fetchSize: 1. Processing took too long with current query and service configuration.', {}, axiosError);
}
//payload too large: reducing fetch size
timesFetchSizeReduced++;
fetchSize = Math.max(1, Math.round(this.fetchSize / (2 ** timesFetchSizeReduced)));
console.log('REDUCING fetchSize...');
console.log('timesFetchSizeReduced', timesFetchSizeReduced);
console.log('currentFetchSize', fetchSize, 'originalFetchSize', this.fetchSize);
continue; //restart loop with reduced fetchSize
} else {
//other error codes
throw axiosError;
}
} else if (axiosError.request) {
// The request was made but no response was received
// `error.request` is an instance of XMLHttpRequest in the browser and an instance of
// http.ClientRequest in node.js
console.log('No response received.');
throw axiosError;
}
} else {
//no AxiosError something esle has happened
throw e;
}
}
ohsomeResults = ohsomeResults!;
this.isContributionView = this.checkIsContributionView(ohsomeResults.data);
console.log('Start conversion ohsome-result to GeoJSON');
......@@ -178,7 +249,7 @@ class Ohsome2X extends EventEmitter {
if (shouldCreateGeometry) {
if (typeof ohsomeResults.data == 'string') {
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps, sourceFeatureCollection, transformToWebmercator);
......@@ -186,7 +257,7 @@ class Ohsome2X extends EventEmitter {
} else {
if (typeof ohsomeResults.data == 'string') {
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps);
......@@ -213,21 +284,29 @@ class Ohsome2X extends EventEmitter {
console.timeEnd('computeValuePerArea');
await this.targetFeatureType.writeFeatures(targetFeatureCollection);
console.log('-----------------------------------------' + Date.now() + '-----------------------------------------');
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, timestamp: new Date().toISOString()});
//update cursor for the next round of fetches
// @ts-ignore
cursor = sourceFeatureCollection.features[sourceFeatureCollection.features.length - 1].properties.id;
//update finished featureCount
this.currentFeatureCount += featureCount;
console.log('-----------------------------------------' + new Date().toISOString() + '-----------------------------------------');
}
} catch (e) {
console.log(e);
this.emit('error', {type:'error', error: e, message: 'Could not create ohsome data.', config: this.config, timestamp: new Date().toISOString()});
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create ohsome data.')
throw new Ohsome2XError('Could not create ohsome data.', {cursor: cursor}, e);
}
} else {
// create all at once
try {
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});
let targetFeatureCollection;
const sourceFeatureCollection = await this.sourceFeatureType.getFeatures();
......@@ -258,7 +337,7 @@ class Ohsome2X extends EventEmitter {
if (shouldCreateGeometry) {
if (typeof ohsomeResults.data == 'string') { //if format is csv show beginning of result
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps, sourceFeatureCollection, transformToWebmercator);
......@@ -267,7 +346,7 @@ class Ohsome2X extends EventEmitter {
if (typeof ohsomeResults.data == 'string') { //if format is csv show beginning of result
console.log('-----------------------------------------');
console.log(ohsomeResults.data.substring(0, 400));
console.log(ohsomeResults.data.substring(0, 400) + '...');
console.log('-----------------------------------------');
}
targetFeatureCollection = GeoJsonFeatureType.fromOhsome(ohsomeResults.data, shouldWriteHorizontalTimestamps);
......@@ -293,19 +372,16 @@ class Ohsome2X extends EventEmitter {
// console.log(JSON.stringify(targetFeatureCollection, undefined, 2));
await this.targetFeatureType.writeFeatures(targetFeatureCollection);
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, timestamp: new Date().toISOString()});
this.emit('progress', {type:'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});
} catch (e) {
console.log(e);
this.emit('error', {type:'error', error: e, message: 'Could not create ohsome data.', config: this.config, timestamp: new Date().toISOString()});
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create ohsome data.');
throw new Ohsome2XError('Could not create ohsome data.', {cursor: this.cursor}, e);
}
}
//createIndexes?
try {
// if(this.targetFeatureType.store.type == 'postgis' && !!this.config.target.createIndexes){
......@@ -349,11 +425,10 @@ class Ohsome2X extends EventEmitter {
}
}
} catch (e) {
console.log(e);
this.emit('error', {type:'error', error: e, message: 'Could not create indexes.', config: this.config, timestamp: new Date().toISOString()});
// console.log(e);
this.sourceFeatureType.finalize();
this.targetFeatureType.finalize();
throw new Error('Could not create indexes.');
throw new Ohsome2XError('Could not create indexes.', {}, e);
}
//finalize e.g. close connections
......@@ -371,7 +446,7 @@ class Ohsome2X extends EventEmitter {
console.log(`Duration: ${diffDays} days ${diffHrs} hours ${diffMins} min ${diffSecs} seconds`);
console.log('====================================================================================');
this.emit('finished', {type: 'finished', duration:`${diffDays} days ${diffHrs} hours ${diffMins} min ${diffSecs} seconds`, timestamp: this.log_end.toISOString()})
this.emit('finished', {type: 'finished', duration:`${diffDays} days ${diffHrs} hours ${diffMins} min ${diffSecs} seconds`, duration_ms: log_diff, timestamp: this.log_end.toISOString()});
}
///////////// methods
......@@ -426,8 +501,8 @@ class Ohsome2X extends EventEmitter {
return stats;
} catch (e) {
console.log('Ohsome API request failed.');
this.axiosErrorHandler(e);
throw new Error(e.message);
this.axiosErrorPrinter(e);
throw e; //Propagate the axios error
}
}
......@@ -461,7 +536,7 @@ class Ohsome2X extends EventEmitter {
return isContributionView;
}
axiosErrorHandler(error: any) {
axiosErrorPrinter(error: AxiosError) {
if (error.response) {
// The request was made and the server responded with a status code
// that falls out of the range of 2xx
......
import {Ohsome2XConfig} from './config_types_interfaces';
class Ohsome2XError extends Error {
status: {
config?: Ohsome2XConfig;
cursor?: number | string | null;
['name']?: any
} = {};
constructor(message: string, status: object, e?: Error) {
super(message);
// see https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, Ohsome2XError.prototype);
this.name = this.constructor.name;
if (e instanceof Error){
this.stack = e.stack + '\n\n' + this.stack
}
if (e instanceof Ohsome2XError){
this.status = Object.assign(e.status, status);
} else {
this.status = status;
}
}
}
export = Ohsome2XError;
\ No newline at end of file
......@@ -159,7 +159,7 @@ export class PgFeatureType extends FeatureType {
FROM ${this.schemaName}."${this.tableName}"
WHERE "${this.geometryId}" > ${cursorSql}
ORDER BY "${this.geometryId}" ASC
LIMIT ${this.fetchSize};`;
LIMIT ${fetchSize};`;
console.log('Get cells from ' + this.tableName + ' with id > ' + cursorSql + ' with fetchSize: ' + fetchSize);
const result = await this.db.query(sql);
......@@ -336,6 +336,7 @@ export class PgFeatureType extends FeatureType {
async finalize() {
this.db.closeConnections();
console.log(`Connection to database '${this.store.database}' successfully closed.`)
}
}
......
......@@ -51,7 +51,7 @@ export interface PostgisFeatureTypeConfig extends FeatureTypeConfig<PostgisStore
export interface SourceGeoJSONFeatureTypeConfig extends GeoJSONFeatureTypeConfig, SourceFeatureTypeConfig {}
export interface SourcePostgisFeatureTypeConfig extends PostgisFeatureTypeConfig, SourceFeatureTypeConfig {
fetchSize?: number; // default is 0 ( -> compute all in one )
cursor?: number | string;
cursor?: number | string | null;
}
export interface TargetGeoJSONFeatureTypeConfig extends GeoJSONFeatureTypeConfig, TargetFeatureTypeConfig {}
export interface TargetPostgisFeatureTypeConfig extends PostgisFeatureTypeConfig, TargetFeatureTypeConfig {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment