Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"aws-sdk": "^2.334.0",
"axios": "^0.18.1",
"cors": "^2.8.4",
"luxon": "^2.3.0",
"express": "~4.15.5",
"express-graphql": "^0.9.0",
"lodash": "^4.17.13",
Expand Down
58 changes: 39 additions & 19 deletions src/helpers/s3Helper.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const _ = require('lodash');
const AWS = require('aws-sdk');
var zlib = require('zlib');
const { DateTime } = require("luxon");

const s3 = new AWS.S3();

Expand All @@ -10,20 +11,18 @@ if (!s3Bucket) {
}
console.log(`Reading state from s3://${s3Bucket}`);

const stateVersion = 'v1';

/*
* Gets bucket prefix at the minute-level
* @param agencyId - String
* @param currentTime - Number
* @param timestamp - Number (Unix timestamp in seconds)
* @return prefix - String
*/
function getBucketMinutePrefix(agencyId, currentTime) {
const currentDateTime = new Date(Number(currentTime * 1000));
const year = currentDateTime.getUTCFullYear();
const month = currentDateTime.getUTCMonth()+1;
const day = currentDateTime.getUTCDate();
const hour = currentDateTime.getUTCHours();
const minute = currentDateTime.getUTCMinutes();
return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`;
function getBucketHourPrefix(agencyId, timestamp) {
const dateTime = DateTime.fromSeconds(timestamp, {zone:'UTC'});
const dateTimePathSegment = dateTime.toFormat('yyyy/MM/dd/HH');
return `state/${stateVersion}/${agencyId}/${dateTimePathSegment}/`;
}

function getS3Paths(prefix) {
Expand All @@ -50,13 +49,20 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) {
if (!endEpoch) {
endEpoch = startEpoch + 60;
}
// Idea: there are 1440 minutes in a day, and the API return at most 1-2 days,
// so we can iterate every minute (as we have to get each file individually anyways)
let minutePrefixes = [];
for (let time = startEpoch; time < endEpoch; time += 60) {
minutePrefixes.push(getBucketMinutePrefix(agencyId, time));
// There are typically about 4*60=240 state data files per hour,
// and the S3 API can return up to 1000 key names with a particular prefix
// (by default), so we can request all keys prefixed by each hour within
// the time range, then filter the resulting keys to make sure the timestamps
// are in the requested interval
let hourPrefixes = [];

// UTC hours always start with a timestamp at multiples of 3600 seconds
const startHour = startEpoch - (startEpoch % 3600);

for (let time = startHour; time < endEpoch; time += 3600) {
hourPrefixes.push(getBucketHourPrefix(agencyId, time));
}
let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix))));
let files = _.flatten(await Promise.all(hourPrefixes.map(prefix => getS3Paths(prefix))));

let timestampsMap = {};
let res = [];
Expand All @@ -73,8 +79,20 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) {

// unzip the gzip data
function decompressData(data) {
return new Promise((resolve, _) => {
return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString())));
return new Promise((resolve, reject) => {
return zlib.unzip(data, (err, decoded) => {
if (err) {
reject(err);
} else {
var parsedData;
try {
parsedData = JSON.parse(decoded.toString());
} catch (e) {
reject(e);
}
resolve(parsedData);
}
});
});
}

Expand All @@ -91,20 +109,22 @@ async function getVehicles(agencyId, startEpoch, endEpoch) {
Key: key,
}, (err, data) => {
if (err) {
reject(err);
reject(err);
} else {
const timestamp = getTimestamp(key);
decompressData(data.Body)
.then(decodedData =>
resolve(insertTimestamp(timestamp, decodedData)));
}
});
}).catch((err) => {
return Promise.reject(`Error loading s3://${s3Bucket}/${key}: ${err}`);
});
})));
}

function getTimestamp(key) {
const keyParts = key.split('-');
const keyParts = key.split('_');
return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000);
}

Expand Down