From 44d306c6d52b7a3a35b936446d47f10f3236db9d Mon Sep 17 00:00:00 2001 From: sidetrackedmind Date: Tue, 25 Jan 2022 19:43:30 -0800 Subject: [PATCH 1/4] minor updates to adapt for restart --- docker-compose.yml | 4 ++ src/helpers/s3Helper.js | 60 ++++++++++---------- src/resolvers.js | 118 ++++++++++++++++++++++------------------ src/schema.graphql | 21 ++++--- 4 files changed, 108 insertions(+), 95 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index cf0d599..8910afa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,3 +9,7 @@ services: - "4000:4000" volumes: - ./src:/usr/src/app/src + environment: + TRYNAPI_S3_BUCKET: "opentransit-pdx" + AWS_ACCESS_KEY_ID: "" + AWS_SECRET_ACCESS_KEY: "" diff --git a/src/helpers/s3Helper.js b/src/helpers/s3Helper.js index 8d415e9..b460fb1 100644 --- a/src/helpers/s3Helper.js +++ b/src/helpers/s3Helper.js @@ -7,20 +7,31 @@ const s3 = new AWS.S3(); const s3Bucket = process.env.TRYNAPI_S3_BUCKET || "orion-vehicles"; console.log(`Reading state from s3://${s3Bucket}`); +function convertTZ(date, tzString) { + return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString})); +} + /* - * Gets bucket prefix at the minute-level + * Gets bucket prefix at the hour-level + * Note to Jesse - I changed the bucket structure + * when we switch to a new bucket (code for PDX owned) + * we could switch it back. For now, it's not great + * but I think it's not the source of the problem. + * see getVehiclePaths - I think the function is + * not as fast as it could be but it works * @param agencyId - String * @param currentTime - Number * @return prefix - String */ -function getBucketMinutePrefix(agencyId, currentTime) { +function getBucketHourPrefix(agencyId, currentTime) { const currentDateTime = new Date(Number(currentTime * 1000)); - const year = currentDateTime.getUTCFullYear(); - const month = currentDateTime.getUTCMonth()+1; - const day = currentDateTime.getUTCDate(); - const hour = currentDateTime.getUTCHours(); - const minute = currentDateTime.getUTCMinutes(); - return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`; + const pacificDateTime = convertTZ(currentDateTime, 'America/Los_Angeles'); + const year = pacificDateTime.getFullYear(); + const month = String(pacificDateTime.getMonth()+1).padStart(2, '0'); + const day = String(pacificDateTime.getUTCDate()).padStart(2, '0'); + const hour = String(pacificDateTime.getUTCHours()).padStart(2, '0'); + // console.log('looking at bucket year %i, month %i, day %i, hour %i', year, month, day, hour); + return `${agencyId}/${year}/${month}/${day}/${hour}/`; } function getS3Paths(prefix) { @@ -49,11 +60,13 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { } // Idea: there are 1440 minutes in a day, and the API return at most 1-2 days, // so we can iterate every minute (as we have to get each file individually anyways) - let minutePrefixes = []; + let hourPrefixes = []; for (let time = startEpoch; time < endEpoch; time += 60) { - minutePrefixes.push(getBucketMinutePrefix(agencyId, time)); + hourPrefixes.push(getBucketHourPrefix(agencyId, time)); } - let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix)))); + let uniquehourPrefixes = [...new Set(hourPrefixes)]; + // console.log(uniquehourPrefixes) + let files = _.flatten(await Promise.all(uniquehourPrefixes.map(prefix => getS3Paths(prefix)))); let timestampsMap = {}; let res = []; @@ -65,9 +78,11 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { res.push(key); } }); + // console.log(res) return res; } + // unzip the gzip data function decompressData(data) { return new Promise((resolve, _) => { @@ -90,10 +105,7 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { if (err) { reject(err); } else { - const timestamp = getTimestamp(key); - decompressData(data.Body) - .then(decodedData => - resolve(insertTimestamp(timestamp, decodedData))); + resolve(decompressData(data.Body)); } }); }); @@ -102,22 +114,8 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { function getTimestamp(key) { const keyParts = key.split('-'); - return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000); -} - -/* - * The API defines timestamp (epoch time in seconds) as a field for each vehicle, - * which was also a column in Cassandra. - * Since the timestamp is in the key in S3, that field does not exist, - * thus we have to add it in the S3Helper to maintain compatibility - */ -function insertTimestamp(timestamp, vehicles) { - return vehicles.map(vehicle => { - return { - ...vehicle, - timestamp: timestamp, - }; - }); + const raw_timestamp = Number(keyParts[keyParts.length - 1].split('.json')[0]) + return raw_timestamp; } module.exports = { diff --git a/src/resolvers.js b/src/resolvers.js index c933667..6ba8713 100644 --- a/src/resolvers.js +++ b/src/resolvers.js @@ -18,66 +18,72 @@ const resolvers = { let { startTime, endTime } = params; - const vehicles = await s3Helper.getVehicles(agencyId, startTime, endTime); + console.log(agencyId, routes) + + const resultSets = await s3Helper.getVehicles(agencyId, startTime, endTime); + const vehiclesByTripByTime = {}; const vehiclesByRouteByTime = {}; + const UniqueVehicleTimeKeys = {}; // group the vehicles by route, and then by time + // below are fields in the vehicle api response + // 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID', +// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay', +// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID', +// 'delay', 'lastStopSeq', 'vehicleID', 'time' - vehicles.forEach(vehicle => { - const routeId = vehicle.rid; - const vtime = vehicle.timestamp; + - if (!vehiclesByRouteByTime[routeId]) { - vehiclesByRouteByTime[routeId] = {}; - } - if (!vehiclesByRouteByTime[routeId][vtime]) { - vehiclesByRouteByTime[routeId][vtime] = []; - } + resultSets.forEach(results => { + + const queryTime = results.resultSet.queryTime; + + if (!(results.resultSet['vehicle']===undefined)) { - // check for multiple vehicles with the same tripId, assume to be multiple-car trains if close to each other - const tripId = vehicle.tripId; - if (tripId) { - if (!vehiclesByTripByTime[tripId]) { - vehiclesByTripByTime[tripId] = {}; - } - const prevVehicle = vehiclesByTripByTime[tripId][vtime]; - if (!prevVehicle) { - vehiclesByTripByTime[tripId][vtime] = vehicle; - } else if (Math.abs(prevVehicle.lat - vehicle.lat) < 0.001 && Math.abs(prevVehicle.lon - vehicle.lon) < 0.001) { - // 0.001 degrees latitude = 111m, 0.001 degrees longitude typically between between ~50m and 111m - prevVehicle.numCars = (prevVehicle.numCars || 1) + 1; - if (prevVehicle.vid > vehicle.vid) { - prevVehicle.vid = vehicle.vid; + results.resultSet.vehicle.forEach(vehicle => { + + // console.log(vehicle) + const routeId = vehicle.routeNumber; + const vtime = queryTime; + const vehicleID = vehicle.vehicleID; + const tempVehicleTime = vehicleID+'_'+vtime; + + const secsSinceReport = Math.floor((Number(vtime)-Number(vehicle.time))/1000); + + vehicle.secsSinceReport = secsSinceReport; + + if (!vehiclesByRouteByTime[routeId]) { + vehiclesByRouteByTime[routeId] = {}; + } + if (!vehiclesByRouteByTime[routeId][vtime]) { + vehiclesByRouteByTime[routeId][vtime] = []; } - return; - } - } - vehiclesByRouteByTime[routeId][vtime].push(vehicle); - }); + if (!UniqueVehicleTimeKeys[tempVehicleTime]) { + vehiclesByRouteByTime[tempVehicleTime] = []; + vehiclesByRouteByTime[routeId][vtime].push(vehicle); + } + + }); + + - // remove duplicate Muni Metro vehicles - if (agencyId === 'muni') { - const affectedRouteIDs = ['KT', 'L', 'M', 'N', 'J']; - affectedRouteIDs.forEach(routeID => { - if (debug) { - console.log(routeID); - } - if (vehiclesByRouteByTime[routeID]) { - vehiclesByRouteByTime[routeID] = removeMuniMetroDuplicates( - vehiclesByRouteByTime[routeID], - ); - } - }); - } + + + + + } + + }); // get all the routes const routeIDs = routes ? _.intersection(routes, Object.keys(vehiclesByRouteByTime)) : Object.keys(vehiclesByRouteByTime); + return { agencyId, routeIDs, @@ -85,6 +91,7 @@ const resolvers = { endTime, vehiclesByRouteByTime }; + }, }, @@ -110,18 +117,23 @@ const resolvers = { } }, + // list all available fields + // 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID', +// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay', +// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID', +// 'delay', 'lastStopSeq', 'vehicleID', 'time' + VehicleState: { - vid: vehicle => vehicle.vid, - did: vehicle => vehicle.did, - lat: vehicle => vehicle.lat, - lon: vehicle => vehicle.lon, - heading: vehicle => vehicle.heading, + vehicleID: vehicle => vehicle.vehicleID, + direction: vehicle => vehicle.direction, + latitude: vehicle => vehicle.latitude, + longitude: vehicle => vehicle.longitude, + bearing: vehicle => vehicle.bearing, + tripID: vehicle => vehicle.tripID, + nextStopSeq: vehicle => vehicle.nextStopSeq, + lastStopSeq: vehicle => vehicle.lastStopSeq, + time: vehicle => vehicle.time, secsSinceReport: vehicle => vehicle.secsSinceReport, - numCars: vehicle => vehicle.numCars, - tripId: vehicle => vehicle.tripId, - stopId: vehicle => vehicle.stopId, - stopIndex: vehicle => vehicle.stopIndex, - status: vehicle => vehicle.status, } }; diff --git a/src/schema.graphql b/src/schema.graphql index e37e53d..62173f2 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -1,17 +1,16 @@ scalar BigInt type VehicleState { - vid: String - did: String - lat: Float - lon: Float - heading: Int - secsSinceReport: Int - numCars: Int - tripId: String - stopIndex: Int - stopId: String - status: Int + vehicleID: String + direction: String + latitude: Float + longitude: Float + bearing: Int + tripID: String + nextStopSeq: Int + lastStopSeq: Int + time: BigInt + secsSinceReport: BigInt } type RouteState { From 40ef7141c2b13df657146e4133ec7dafbcb08450 Mon Sep 17 00:00:00 2001 From: sidetrackedmind Date: Wed, 26 Jan 2022 08:01:36 -0800 Subject: [PATCH 2/4] changed timestamps in resolver from millisecond to second --- docker-compose.yml | 2 -- src/resolvers.js | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8910afa..bcfe810 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,5 +11,3 @@ services: - ./src:/usr/src/app/src environment: TRYNAPI_S3_BUCKET: "opentransit-pdx" - AWS_ACCESS_KEY_ID: "" - AWS_SECRET_ACCESS_KEY: "" diff --git a/src/resolvers.js b/src/resolvers.js index 6ba8713..bcfab4f 100644 --- a/src/resolvers.js +++ b/src/resolvers.js @@ -46,11 +46,14 @@ const resolvers = { // console.log(vehicle) const routeId = vehicle.routeNumber; - const vtime = queryTime; + const vtime = Math.floor(Number(queryTime)/1000); const vehicleID = vehicle.vehicleID; const tempVehicleTime = vehicleID+'_'+vtime; + const vehicleTIme = Math.floor(Number(vehicle.time)/1000); - const secsSinceReport = Math.floor((Number(vtime)-Number(vehicle.time))/1000); + const secsSinceReport = (vtime-vehicleTIme); + + vehicle.time = vehicleTIme; vehicle.secsSinceReport = secsSinceReport; From 8ffbec93f110476f5f59a04321f6b51c5bd62e98 Mon Sep 17 00:00:00 2001 From: sidetrackedmind Date: Wed, 2 Feb 2022 22:45:25 -0800 Subject: [PATCH 3/4] update readme and query example for new schema --- .gitignore | 3 ++- README.md | 32 ++++++++++++++++++-------------- examples/marin-query.graphql | 19 +++++++++++++------ 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/.gitignore b/.gitignore index 42dc0c9..2f741d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ node_modules -docker-compose.override.yml \ No newline at end of file +docker-compose.override.yml +.DS_Store diff --git a/README.md b/README.md index 1a150e8..4d3172a 100644 --- a/README.md +++ b/README.md @@ -81,16 +81,13 @@ State of a particular vehicle at a particular time. Some fields may be null if n | Field Name | Type | Description | | --- | --- | --- | -| `vid` | `String` | ID of this vehicle. | -| `did` | `String` | ID of the direction the vehicle reported it was going. | -| `lat` | `Float` | Reported latitude of vehicle. | -| `lon` | `Float` | Reported longitude of vehicle. | -| `heading` | `Float` | Reported heading of vehicle in degrees. | +| `vehicleID` | `String` | ID of this vehicle. | +| `direction` | `String` | ID of the direction the vehicle reported it was going. | +| `latitude` | `Float` | Reported latitude of vehicle. | +| `longitude` | `Float` | Reported longitude of vehicle. | +| `bearing` | `Float` | Reported heading of vehicle in degrees. | | `secsSinceReport` | `Int` | Number of seconds old this observation was when it was retrieved (at `timestamp` of `RouteState`). | -| `numCars` | `Int` | Number of cars in this vehicle. | -| `stopIndex` | `Int` | The index of the current stop in the sequence (GTFS-realtime providers only) | -| `status` | `Int` | 0 if the vehicle is about to arrive at the stop referred to by `stopIndex`, 1 if the vehicle is stopped at this stop, 2 if the vehicle is in transit to this stop (GTFS-realtime providers only) | -| `tripId` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) | +| `tripID` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) | ## Sample Query @@ -98,7 +95,10 @@ Once you run it, go to http://localhost:4000/graphql in your browser and run thi ``` query { - state(agencyId: "muni", startTime: 1572105600, endTime: 1572112800, routes: ["14", "19", "49"]) { + state(agencyId: "trimet" + , startTime: 1642867201 + , endTime: 1642867500 + , routes: ["100"]) { agencyId startTime routes { @@ -106,10 +106,14 @@ query { states { timestamp vehicles { - vid - lat - lon - heading + vehicleID + tripID + latitude + longitude + direction + bearing + time + secsSinceReport } } } diff --git a/examples/marin-query.graphql b/examples/marin-query.graphql index 5af0a7f..094be0d 100644 --- a/examples/marin-query.graphql +++ b/examples/marin-query.graphql @@ -1,7 +1,10 @@ -# replace "marin" with "muni" to use that as the agency +# replace "trimet" with your agency query { - state(agencyId: "marin", startTime: 1572127400, endTime: 1572127900) { + state(agencyId: "trimet" + , startTime: 1642867201 + , endTime: 1642867500 + , routes: ["100"]) { agencyId startTime routes { @@ -9,10 +12,14 @@ query { states { timestamp vehicles { - vid - lat - lon - heading + vehicleID + tripID + latitude + longitude + direction + bearing + time + secsSinceReport } } } From f25028d688dead84f4cdd25a62e96e10e5c012b8 Mon Sep 17 00:00:00 2001 From: Jesse Young Date: Thu, 3 Feb 2022 20:04:03 -0800 Subject: [PATCH 4/4] avoid crashing on errors extracting data from s3; show s3 path that caused error --- src/helpers/s3Helper.js | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/helpers/s3Helper.js b/src/helpers/s3Helper.js index b460fb1..d6e286b 100644 --- a/src/helpers/s3Helper.js +++ b/src/helpers/s3Helper.js @@ -8,7 +8,7 @@ const s3Bucket = process.env.TRYNAPI_S3_BUCKET || "orion-vehicles"; console.log(`Reading state from s3://${s3Bucket}`); function convertTZ(date, tzString) { - return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString})); + return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString})); } /* @@ -17,7 +17,7 @@ function convertTZ(date, tzString) { * when we switch to a new bucket (code for PDX owned) * we could switch it back. For now, it's not great * but I think it's not the source of the problem. - * see getVehiclePaths - I think the function is + * see getVehiclePaths - I think the function is * not as fast as it could be but it works * @param agencyId - String * @param currentTime - Number @@ -85,8 +85,20 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { // unzip the gzip data function decompressData(data) { - return new Promise((resolve, _) => { - return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString()))); + return new Promise((resolve, reject) => { + return zlib.unzip(data, (err, decoded) => { + if (err) { + reject(err); + } else { + var parsedData; + try { + parsedData = JSON.parse(decoded.toString()); + } catch (e) { + reject(e); + } + resolve(parsedData); + } + }); }); } @@ -103,11 +115,13 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { Key: key, }, (err, data) => { if (err) { - reject(err); + reject(err); } else { - resolve(decompressData(data.Body)); + resolve(decompressData(data.Body)); } }); + }).catch((err) => { + return Promise.reject(`Error loading s3://${s3Bucket}/${key}: ${err}`); }); }))); }