import {
    Ohsome2XConfig,
    OhsomeQueryConfig,
    OhsomeQueryConfigFormat,
    TargetPostgisFeatureTypeConfig
} from './config_types_interfaces';
import normalizeUrl from 'normalize-url';
import {GeoJsonFeatureType} from './GeoJsonFeatureType';
import {PgFeatureType} from './PgFeatureType';
import {FeatureTypeFactory} from './FeatureTypeFactory';
import {Feature, Geometry} from '@turf/helpers';
import turfArea from '@turf/area';
import * as querystring from 'querystring';
import axios, {AxiosError, AxiosRequestConfig, AxiosResponse} from 'axios';
import defaultConfig from './conf/default';
import {EventEmitter} from 'events';
import Ohsome2XError from './Ohsome2XError';

let OHSOME_API_URL = normalizeUrl(defaultConfig.OHSOME_API_URL); //remove trailing slash and other things

/** Flags derived from the target configuration that steer the result conversion. */
interface ConversionOptions {
    shouldCreateGeometry: boolean;
    transformToWebmercator: boolean;
    shouldWriteHorizontalTimestamps: boolean;
}

/**
 * Formats a duration given in milliseconds as "D days H hours M min S seconds".
 * Every component is floored so that the parts always add up to the elapsed time
 * (rounding minutes/seconds would e.g. report 90 s as "2 min 30 seconds").
 */
function formatDuration(durationMs: number): string {
    const days = Math.floor(durationMs / 86400000);
    const hours = Math.floor((durationMs % 86400000) / 3600000);
    const minutes = Math.floor((durationMs % 3600000) / 60000);
    const seconds = Math.floor((durationMs % 60000) / 1000);
    return `${days} days ${hours} hours ${minutes} min ${seconds} seconds`;
}

/** Builds a lookup from `feature.properties.id` to the feature's area as computed by `@turf/area`. */
function computeIdAreaMap(features: any[]): Map<any, number> {
    return new Map(
        features.map((feature): [any, number] => [feature.properties.id, turfArea(feature.geometry)])
    );
}

/** Adds `value_per_area` (= `value` / area of the feature with the same id) to every feature in place. */
function addValuePerArea(featureCollection: any, idAreaMap: Map<any, number>): void {
    for (const feature of featureCollection.features) {
        // an id missing from the map yields NaN, exactly as dividing by `undefined` did before
        const area = idAreaMap.get(feature.properties.id) as number;
        feature.properties['value_per_area'] = feature.properties.value / area;
    }
}

/**
 * Queries the ohsome API (always as `groupBy/boundary`) for the features of a source
 * feature type and writes the converted results to a target feature type.
 *
 * Emits:
 *  - `progress` before each chunk (chunkwise mode) or before and after the single request,
 *  - `finished` once `run()` completed successfully.
 */
class Ohsome2X extends EventEmitter {
    private config: Ohsome2XConfig;
    private cursor: number | string | null | undefined;
    private fetchSize: number | null;
    private storeZeroValues: boolean;
    private computeValuePerArea: boolean | undefined;
    private sourceFeatureType: GeoJsonFeatureType | PgFeatureType | null;
    private targetFeatureType: GeoJsonFeatureType | PgFeatureType | null;
    private ohsomeApiUrl: string;
    private log_start: Date;
    private log_end: Date;
    private isContributionView: boolean | undefined;
    private totalFeatureCount: number = 0;
    private currentFeatureCount: number = 0;

    /**
     * Stores the configuration, applies defaults (`storeZeroValues` = true, query format = 'csv',
     * API url from the default config) and logs the effective settings. Does not start processing;
     * call `run()` for that.
     *
     * @param config source, target and ohsome query configuration
     */
    constructor(config: Ohsome2XConfig) {
        super(); //EventEmitter
        this.config = config;

        this.cursor = ('cursor' in this.config.source) ? this.config.source.cursor : null;
        // @ts-ignore fetchSize only available for PostgisStoreConfig
        this.fetchSize = (!!this.config.source.fetchSize) ? parseInt(this.config.source.fetchSize, 10) : null;
        this.storeZeroValues = (!('storeZeroValues' in this.config.target)) ? true : !!this.config.target.storeZeroValues;
        this.computeValuePerArea = this.config.target.computeValuePerArea;

        this.sourceFeatureType = null;
        this.targetFeatureType = null;
        this.ohsomeApiUrl = config.ohsomeApiUrl || OHSOME_API_URL;

        //default use csv because its way faster than parsing json
        this.config.ohsomeQuery.format = (this.config.ohsomeQuery.format == null || this.config.ohsomeQuery.format.trim() === '')
            ? 'csv'
            : this.config.ohsomeQuery.format.trim() as 'csv' | 'json';

        this.log_start = new Date();
        this.log_end = new Date();

        console.log('Start at: ' + this.log_start.toLocaleString());
        console.log('------------------------------');
        console.log('Start Ohsome2X with:');
        console.log('------------------------------');
        console.log('Ohsome-API:', this.ohsomeApiUrl);
        console.log(JSON.stringify(this.config.ohsomeQuery, null, 2));
        console.log('------------------------------');
        console.log('Source of statistical areas:');
        console.log(JSON.stringify(this.config.source, null, 2));
        console.log('------------------------------');
        console.log('Target of statistical results:');
        console.log(JSON.stringify(this.config.target, null, 2));
        console.log('------------------------------');

        //this.run().catch(console.log);
    }

    /**
     * Runs the complete workflow: initialize the feature types, optionally drop an existing
     * PostGIS target, fetch and write the ohsome results (chunkwise if both stores are
     * `PgFeatureType` and a fetch size is configured, otherwise all at once), create indexes
     * and finalize the feature types.
     *
     * @throws Ohsome2XError wrapping the underlying error of the failed stage
     */
    async run() {
        console.log('RUN');

        //initialize featureTypes with info if table aleady exists;
        try {
            this.sourceFeatureType = await FeatureTypeFactory.create(this.config.source);
            this.targetFeatureType = await FeatureTypeFactory.create(this.config.target);
            this.totalFeatureCount = await this.sourceFeatureType.getFeatureCount();

            process.on('SIGINT', async () => {
                console.log('\nSIGINT signal received. Closing existing connections.');
                await this.finalizeFeatureTypes();
                process.exit(0);
            });
        } catch (e) {
            // release whatever was already created (e.g. the source when the target failed)
            await this.finalizeFeatureTypes();
            throw new Ohsome2XError('Could not initialize FeatureTypes.', {cursor: this.cursor}, e);
        }

        const source = this.sourceFeatureType;
        const target = this.targetFeatureType;

        const options: ConversionOptions = {
            //createGeometryOnTarget? default is true
            shouldCreateGeometry: (this.config.target.createGeometry == null) ? true : !!this.config.target.createGeometry,
            // default is false
            transformToWebmercator: false,
            // write timestamp values in one or many columns? default is vertical
            // (one column or two columns for contribution view queries)
            shouldWriteHorizontalTimestamps: !!this.config.target.horizontalTimestampColumns
        };
        options.transformToWebmercator = (options.shouldCreateGeometry && !!this.config.target.transformToWebmercator);

        //dropTableIfexisits and views, do not delete if completing table from explicitly specified cursor
        // @ts-ignore kept from the original code: the `store` access does not type-check here
        if (target.store.type == 'postgis' && this.cursor == null) {
            try {
                await target.delete();
            } catch (e) {
                throw new Ohsome2XError('Could not delete existing PostGIS table.', {cursor: this.cursor}, e);
            }
        }

        //createItertively or createAllAtOnce (posgis source and tagrget only)
        if (!(source instanceof PgFeatureType && target instanceof PgFeatureType)) {
            //one or both of the stores are not postgisOnly => can't use fetch size, no chunkwise processing possible
            this.fetchSize = null;
        }

        if (!!this.fetchSize) {
            // both stores were checked to be PgFeatureType above, otherwise fetchSize would be null
            await this.createChunkwise(source as PgFeatureType, target as PgFeatureType, this.fetchSize, options);
        } else {
            await this.createAllAtOnce(source, target, options);
        }

        //createIndexes?
        try {
            if (target instanceof PgFeatureType) {
                await this.createTargetIndexes(target, options);
            }
        } catch (e) {
            await this.finalizeFeatureTypes();
            throw new Ohsome2XError('Could not create indexes.', {}, e);
        }

        //finalize e.g. close connections
        await this.finalizeFeatureTypes();

        this.log_end = new Date();
        console.log('Finished at: ' + this.log_end.toLocaleString());
        const log_diff = this.log_end.valueOf() - this.log_start.valueOf();
        const duration = formatDuration(log_diff);
        console.log('====================================================================================');
        console.log(`Duration: ${duration}`);
        console.log('====================================================================================');
        this.emit('finished', {type: 'finished', duration: duration, duration_ms: log_diff, timestamp: this.log_end.toISOString()});
    }

    ///////////// methods

    /**
     * Processes the source features in chunks of `originalFetchSize`, ordered by id starting after
     * the cursor. When the API answers with HTTP 413 the fetch size is halved and the same chunk is
     * retried; it is doubled again (up to the original size) after queries that took at most 4 minutes.
     */
    private async createChunkwise(source: PgFeatureType, target: PgFeatureType, originalFetchSize: number, options: ConversionOptions): Promise<void> {
        const horizontal = options.shouldWriteHorizontalTimestamps;

        //check curser type: number or string?
        const cursorType = await source.getIdJsTypeFromPg();
        console.log("cursorType", cursorType);

        let cursor: number | string;
        if (this.cursor != null) {
            //cursor was defined in config
            cursor = this.cursor;
        } else if (cursorType === 'number') {
            cursor = Number.MIN_SAFE_INTEGER;
        } else {
            //cursorType == 'string'
            cursor = '';
        }

        let fetchSize = originalFetchSize; //might be adjusted automatically if query takes too long
        let timesFetchSizeReduced = 0;

        try {
            while (true) {
                this.emit('progress', {type: 'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: cursor, fetchSize: fetchSize, timestamp: new Date().toISOString()});

                const sourceFeatureCollection: any = await source.getFeaturesByCursorAndLimit(cursor, fetchSize);
                const featureCount: number = sourceFeatureCollection.features.length;
                if (featureCount == 0) {
                    console.log('No more cells.');
                    break;
                }

                console.time('computeArea');
                //in horizontal timestamp layout we can't have value and value_per_area, only one value is possible
                const idAreaMap: Map<any, number> = (!horizontal && this.computeValuePerArea)
                    ? computeIdAreaMap(sourceFeatureCollection.features)
                    : new Map<any, number>();
                console.timeEnd('computeArea');

                //build bpolys and add to ohsomeQuery
                this.config.ohsomeQuery.bpolys = sourceFeatureCollection;

                let ohsomeResults: AxiosResponse;
                try {
                    const queryStart = Date.now();
                    ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
                    const queryDuration = Date.now() - queryStart; //duration in milliseconds
                    console.log('Query duration (sec)', (queryDuration / 1000));

                    //recover reduced fetch size if response times are lower than 4 minutes
                    if (fetchSize < originalFetchSize && queryDuration <= 4 * 60 * 1000) {
                        //should never get negative, 0 means original fetchSize
                        timesFetchSizeReduced = Math.max(0, timesFetchSizeReduced - 1);
                        fetchSize = Math.min(originalFetchSize, Math.round(originalFetchSize / (2 ** timesFetchSizeReduced)));
                        console.log('INCREASING fetchSize...');
                        console.log('timesFetchSizeReduced', timesFetchSizeReduced);
                        console.log('currentFetchSize', fetchSize, 'originalFetchSize', originalFetchSize);
                    }
                } catch (e) {
                    if (this.config.ohsomeQuery.bpolys) {
                        delete this.config.ohsomeQuery.bpolys; //avoid logging
                    }
                    if (!e.isAxiosError) {
                        //no AxiosError something esle has happened
                        throw e;
                    }

                    const axiosError: AxiosError = e;
                    if (axiosError.config) {
                        delete axiosError.config.data; //avoid logging bpoly data
                    }

                    if (axiosError.response && axiosError.response.status == 413) {
                        //try reducing fetch size as long as possible before failing
                        //check if minimum fetch size alredy reached
                        if (fetchSize <= 1) {
                            throw new Ohsome2XError('Could not get Ohsome results, even with fetchSize: 1. Processing took too long with current query and service configuration.', {}, axiosError);
                        }
                        //payload too large: reducing fetch size
                        timesFetchSizeReduced++;
                        fetchSize = Math.max(1, Math.round(originalFetchSize / (2 ** timesFetchSizeReduced)));
                        console.log('REDUCING fetchSize...');
                        console.log('timesFetchSizeReduced', timesFetchSizeReduced);
                        console.log('currentFetchSize', fetchSize, 'originalFetchSize', originalFetchSize);
                        continue; //restart loop with reduced fetchSize
                    }

                    if (!axiosError.response && axiosError.request) {
                        // The request was made but no response was received
                        console.log('No response received.');
                    }
                    // other status codes, no response, or request setup failure: never continue without results
                    throw axiosError;
                }

                this.isContributionView = this.checkIsContributionView(ohsomeResults.data);

                console.log('Start conversion ohsome-result to GeoJSON');
                console.time('convert');
                let targetFeatureCollection = this.convertToTargetFeatures(ohsomeResults.data, sourceFeatureCollection, options);
                console.timeEnd('convert');

                if (!horizontal && !this.storeZeroValues) {
                    //remove features where value = 0
                    console.log('Remove Zeros');
                    console.time('removeZero');
                    targetFeatureCollection = GeoJsonFeatureType.removeFeaturesByPropertyValue(targetFeatureCollection, 'value', 0);
                    console.timeEnd('removeZero');
                }

                console.time('computeValuePerArea');
                if (!horizontal && this.computeValuePerArea) {
                    addValuePerArea(targetFeatureCollection, idAreaMap);
                }
                console.timeEnd('computeValuePerArea');

                await target.writeFeatures(targetFeatureCollection);

                //update cursor for the next round of fetches
                cursor = sourceFeatureCollection.features[featureCount - 1].properties.id;
                //update finished featureCount
                this.currentFeatureCount += featureCount;

                console.log('-----------------------------------------' + new Date().toISOString() + '-----------------------------------------');
            }
        } catch (e) {
            await this.finalizeFeatureTypes();
            throw new Ohsome2XError('Could not create ohsome data.', {cursor: cursor}, e);
        }
    }

    /** Fetches all source features, sends them in one ohsome request and writes the converted result. */
    private async createAllAtOnce(source: GeoJsonFeatureType | PgFeatureType, target: GeoJsonFeatureType | PgFeatureType, options: ConversionOptions): Promise<void> {
        const horizontal = options.shouldWriteHorizontalTimestamps;

        try {
            this.emit('progress', {type: 'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});

            const sourceFeatureCollection = await source.getFeatures();
            this.currentFeatureCount = sourceFeatureCollection.features.length;

            console.time('computeArea');
            //in horizontal timestamp layout we can't have value and value_per_area, only one value is possible
            const idAreaMap: Map<any, number> = (!horizontal && this.computeValuePerArea)
                ? computeIdAreaMap(sourceFeatureCollection.features)
                : new Map<any, number>();
            console.timeEnd('computeArea');

            //build bpolys and add to ohsomeQuery
            //api requires ids to be strings
            //sourceFeatureCollection.features.forEach((feature)=>{feature.properties.id = String(feature.properties.id)});
            this.config.ohsomeQuery.bpolys = sourceFeatureCollection;

            console.time('query');
            const ohsomeResults = await this.getOhsomeResults(this.config.ohsomeQuery);
            console.timeEnd('query');

            this.isContributionView = this.checkIsContributionView(ohsomeResults.data);

            console.log('Start conversion ohsomeJSON or csv to GeoJSON');
            console.time('convert');
            let targetFeatureCollection = this.convertToTargetFeatures(ohsomeResults.data, sourceFeatureCollection, options);
            console.timeEnd('convert');

            if (!horizontal && !this.storeZeroValues) {
                //remove features where value = 0
                targetFeatureCollection = GeoJsonFeatureType.removeFeaturesByPropertyValue(targetFeatureCollection, 'value', 0);
            }

            console.time('computeValuePerArea');
            if (!horizontal && this.computeValuePerArea) {
                addValuePerArea(targetFeatureCollection, idAreaMap);
            }
            console.timeEnd('computeValuePerArea');

            // console.log(JSON.stringify(targetFeatureCollection, undefined, 2));
            await target.writeFeatures(targetFeatureCollection);

            this.emit('progress', {type: 'progress', processed: this.currentFeatureCount, total: this.totalFeatureCount, cursor: this.cursor, fetchSize: this.totalFeatureCount, timestamp: new Date().toISOString()});
        } catch (e) {
            await this.finalizeFeatureTypes();
            throw new Ohsome2XError('Could not create ohsome data.', {cursor: this.cursor}, e);
        }
    }

    /**
     * Converts an ohsome response body into the target feature collection. For csv (string)
     * responses the first 400 characters are logged. Source geometries are only handed to the
     * converter when geometry creation is enabled.
     */
    private convertToTargetFeatures(data: any, sourceFeatureCollection: any, options: ConversionOptions): any {
        if (typeof data == 'string') {
            //if format is csv show beginning of result
            console.log('-----------------------------------------');
            console.log(data.substring(0, 400) + '...');
            console.log('-----------------------------------------');
        }
        if (options.shouldCreateGeometry) {
            return GeoJsonFeatureType.fromOhsome(data, options.shouldWriteHorizontalTimestamps, sourceFeatureCollection, options.transformToWebmercator);
        }
        return GeoJsonFeatureType.fromOhsome(data, options.shouldWriteHorizontalTimestamps);
    }

    /**
     * Creates indexes on the PostGIS target if `createIndexes` is set in the target config:
     * `id`, the timestamp column(s) (clustering the table on the first one), `value`,
     * optionally `value_per_area` and a gist index on `geom`, followed by an analyze.
     */
    private async createTargetIndexes(target: PgFeatureType, options: ConversionOptions): Promise<void> {
        const pgTargetConfig = this.config.target as TargetPostgisFeatureTypeConfig;
        if (!pgTargetConfig.createIndexes) {
            return;
        }

        await target.createIndex('id');

        if (options.shouldWriteHorizontalTimestamps) {
            // TODO:horizontal
            console.log('create indexes for horizontalTimestampColumns not yet implemented!');
        } else {
            // vertical: one timestamp column
            if (this.isContributionView) {
                await target.createIndex('from_timestamp');
                await target.clusterTable('from_timestamp');
                await target.createIndex('to_timestamp');
            } else {
                await target.createIndex('timestamp');
                await target.clusterTable('timestamp');
            }
            await target.createIndex('value');
            if (this.computeValuePerArea) {
                await target.createIndex('value_per_area');
            }
        }

        // use the same default (true) that decided whether geometries were written
        if (options.shouldCreateGeometry) {
            await target.createIndex('geom', 'gist');
        }

        await target.analyzeTable();
    }

    /**
     * Finalizes source and target feature type if they exist. Best effort: a failing
     * `finalize()` is logged instead of thrown so it can never hide the error that led here.
     */
    private async finalizeFeatureTypes(): Promise<void> {
        for (const featureType of [this.sourceFeatureType, this.targetFeatureType]) {
            if (featureType == null) {
                continue;
            }
            try {
                await featureType.finalize();
            } catch (e) {
                console.log('Could not finalize feature type.', e);
            }
        }
    }

    /**
     * Posts the query as form-urlencoded data to `<ohsomeApiUrl>/<queryType>/groupBy/boundary`.
     * Empty `filter`/`format` values and null/undefined parameters are not sent.
     *
     * @param ohsomeQuery query configuration; `bpolys` must be set (object or string)
     * @returns the axios response of the ohsome API
     * @throws Error if `bpolys` is missing; otherwise the error raised by axios is propagated
     */
    async getOhsomeResults(ohsomeQuery: OhsomeQueryConfig): Promise<AxiosResponse> {
        // set empty strings to undefined
        const filter = (ohsomeQuery.filter != null && ohsomeQuery.filter.trim() != '') ? ohsomeQuery.filter.trim() : undefined;
        const format: OhsomeQueryConfigFormat = (ohsomeQuery.format != null && ohsomeQuery.format.trim() != '') ? ohsomeQuery.format.trim() as 'csv' | 'json' : undefined;

        // a missing bpolys is a configuration problem, not a failed API request
        if (ohsomeQuery.bpolys == null) {
            throw Error('bpolys undefined in OhsomeQueryConfig');
        }

        try {
            const bpolyString = (typeof ohsomeQuery.bpolys === 'object') ? JSON.stringify(ohsomeQuery.bpolys) : ohsomeQuery.bpolys.trim();

            //default query object, empty values will be removed due to issue with ohsomeAPI: https://gitlab.gistools.geog.uni-heidelberg.de/giscience/big-data/ohsome/ohsome-api/issues/72
            const dataObject: {} = {
                bpolys: bpolyString,
                filter: filter,
                time: ohsomeQuery.time,
                showMetadata: true,
                format: format
            };

            // removes properties with null or undefined
            const cleanDataObject: {[p: string]: string} = {};
            for (const [key, value] of Object.entries(dataObject)) {
                if (value != null) {
                    cleanDataObject[key] = value;
                }
            }

            const dataString = querystring.stringify(cleanDataObject);

            // always make a group by boundary query for backward compatibility
            // complete ohsomeAPI resource path should be used
            const queryType = ohsomeQuery.queryType.replace(/(.*)(\/)?(\/groupBy\/boundary)(\/)?/, "$1");
            const url = `${this.ohsomeApiUrl}/${queryType}/groupBy/boundary`;

            // log the parameters, truncated so the bpolys do not flood the log
            const loggedParams: {[p: string]: unknown} = {};
            for (const [key, value] of Object.entries(cleanDataObject)) {
                loggedParams[key] = (typeof value == 'string') ? value.substring(0, 200) : value;
            }
            console.log(`Querying ohsome-API: ${url}`);
            console.log('with params:', loggedParams);

            const stats = await axios({
                url: url,
                method: 'post',
                headers: {'content-type': 'application/x-www-form-urlencoded'},
                maxContentLength: 1024 * 1024 * 1024 * 1024,
                data: dataString
            });

            //only for json requests with showMetadata=true, not csv
            if (typeof stats.data === 'object' && "metadata" in stats.data) {
                console.log('----------------------------------');
                console.log('Response Metadata', JSON.stringify(stats.data.metadata));
                console.log('----------------------------------');
            }

            return stats;
        } catch (e) {
            console.log('Ohsome API request failed.');
            this.axiosErrorPrinter(e);
            throw e; //Propagate the axios error
        }
    }

    /**
     * Tells whether a groupBy/boundary response carries `fromTimestamp` and `toTimestamp`.
     * For csv the first non-comment line (not starting with `#`) is read as the `;`-separated
     * header; for JSON the first result of the first group is inspected.
     *
     * @returns false when neither a header line nor a first result is present
     */
    checkIsContributionView(ohsomeGroupByBoundaryResponse: any): boolean {
        if (typeof ohsomeGroupByBoundaryResponse == 'string') {
            //find first non-comment line
            const headerLine = ohsomeGroupByBoundaryResponse
                .split("\n")
                .find((line: string) => !line.trimStart().startsWith("#"));
            if (headerLine === undefined) {
                return false;
            }
            // check columnheaders for contributionview column names
            const columns = headerLine.trim().split(";");
            return columns.includes("fromTimestamp") && columns.includes("toTimestamp");
        }

        //JSON Response
        const groupByResult = (ohsomeGroupByBoundaryResponse != null) ? ohsomeGroupByBoundaryResponse.groupByResult : undefined;
        const firstGroup = Array.isArray(groupByResult) ? groupByResult[0] : undefined;
        const firstResult = (firstGroup != null && Array.isArray(firstGroup.result)) ? firstGroup.result[0] : undefined;
        if (firstResult == null) {
            return false;
        }
        return firstResult.fromTimestamp !== undefined && firstResult.toTimestamp !== undefined;
    }

    /**
     * Logs status, headers and data of a failed response, or a note when no response arrived or
     * the request could not be set up, followed by the request config without its `data`.
     * Also safe to call with errors that carry no `config` (e.g. plain `Error`s).
     */
    axiosErrorPrinter(error: AxiosError) {
        if (error.response) {
            // The request was made and the server responded with a status code
            // that falls out of the range of 2xx
            console.log('Response error status: ' + error.response.status);
            console.log('Response error headers: ' + JSON.stringify(error.response.headers, null, 2));
            console.log('Response error data: ' + JSON.stringify(error.response.data, null, 2));
        } else if (error.request) {
            // The request was made but no response was received
            // `error.request` is an instance of XMLHttpRequest in the browser and an instance of
            // http.ClientRequest in node.js
            console.log('No response received, request.');
        } else {
            // Something happened in setting up the request that triggered an Error
            console.log('Error', error.message);
        }

        if (error.config) {
            delete error.config.data; //avoid bpoly logging
        }
        console.log('Request config:\n', error.config);
    }

    /** Logs the error and finalizes the feature types that have been created so far. */
    onError(error: any) {
        console.log(error);
        // fire and forget: finalizeFeatureTypes never rejects
        void this.finalizeFeatureTypes();
    }
}

export = Ohsome2X;
// export * from './config_types_interfaces';