diff --git a/package.json b/package.json index bb11aec..66a7132 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "aws-sdk": "^2.334.0", "axios": "^0.18.1", "cors": "^2.8.4", + "luxon": "^2.3.0", "express": "~4.15.5", "express-graphql": "^0.9.0", "lodash": "^4.17.13", diff --git a/src/helpers/s3Helper.js b/src/helpers/s3Helper.js index 231fe7e..c59dc40 100644 --- a/src/helpers/s3Helper.js +++ b/src/helpers/s3Helper.js @@ -1,6 +1,7 @@ const _ = require('lodash'); const AWS = require('aws-sdk'); var zlib = require('zlib'); +const { DateTime } = require("luxon"); const s3 = new AWS.S3(); @@ -10,20 +11,18 @@ if (!s3Bucket) { } console.log(`Reading state from s3://${s3Bucket}`); +const stateVersion = 'v1'; + /* * Gets bucket prefix at the minute-level * @param agencyId - String - * @param currentTime - Number + * @param timestamp - Number (Unix timestamp in seconds) * @return prefix - String */ -function getBucketMinutePrefix(agencyId, currentTime) { - const currentDateTime = new Date(Number(currentTime * 1000)); - const year = currentDateTime.getUTCFullYear(); - const month = currentDateTime.getUTCMonth()+1; - const day = currentDateTime.getUTCDate(); - const hour = currentDateTime.getUTCHours(); - const minute = currentDateTime.getUTCMinutes(); - return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`; +function getBucketHourPrefix(agencyId, timestamp) { + const dateTime = DateTime.fromSeconds(timestamp, {zone:'UTC'}); + const dateTimePathSegment = dateTime.toFormat('yyyy/MM/dd/HH'); + return `state/${stateVersion}/${agencyId}/${dateTimePathSegment}/`; } function getS3Paths(prefix) { @@ -50,13 +49,20 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { if (!endEpoch) { endEpoch = startEpoch + 60; } - // Idea: there are 1440 minutes in a day, and the API return at most 1-2 days, - // so we can iterate every minute (as we have to get each file individually anyways) - let minutePrefixes = []; - for (let time = startEpoch; time < endEpoch; time += 60) { - minutePrefixes.push(getBucketMinutePrefix(agencyId, time)); + // There are typically about 4*60=240 state data files per hour, + // and the S3 API can return up to 1000 key names with a particular prefix + // (by default), so we can request all keys prefixed by each hour within + // the time range, then filter the resulting keys to make sure the timestamps + // are in the requested interval + let hourPrefixes = []; + + // UTC hours always start with a timestamp at multiples of 3600 seconds + const startHour = startEpoch - (startEpoch % 3600); + + for (let time = startHour; time < endEpoch; time += 3600) { + hourPrefixes.push(getBucketHourPrefix(agencyId, time)); } - let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix)))); + let files = _.flatten(await Promise.all(hourPrefixes.map(prefix => getS3Paths(prefix)))); let timestampsMap = {}; let res = []; @@ -73,8 +79,20 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { // unzip the gzip data function decompressData(data) { - return new Promise((resolve, _) => { - return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString()))); + return new Promise((resolve, reject) => { + return zlib.unzip(data, (err, decoded) => { + if (err) { + reject(err); + } else { + var parsedData; + try { + parsedData = JSON.parse(decoded.toString()); + } catch (e) { + reject(e); + } + resolve(parsedData); + } + }); }); } @@ -91,7 +109,7 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { Key: key, }, (err, data) => { if (err) { - reject(err); + reject(err); } else { const timestamp = getTimestamp(key); decompressData(data.Body) @@ -99,12 +117,14 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { resolve(insertTimestamp(timestamp, decodedData))); } }); + }).catch((err) => { + return Promise.reject(`Error loading s3://${s3Bucket}/${key}: ${err}`); }); }))); } function getTimestamp(key) { - const keyParts = key.split('-'); + const keyParts = key.split('_'); return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000); }