diff --git a/.gitignore b/.gitignore index 42dc0c9..2f741d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ node_modules -docker-compose.override.yml \ No newline at end of file +docker-compose.override.yml +.DS_Store diff --git a/README.md b/README.md index 1a150e8..4d3172a 100644 --- a/README.md +++ b/README.md @@ -81,16 +81,13 @@ State of a particular vehicle at a particular time. Some fields may be null if n | Field Name | Type | Description | | --- | --- | --- | -| `vid` | `String` | ID of this vehicle. | -| `did` | `String` | ID of the direction the vehicle reported it was going. | -| `lat` | `Float` | Reported latitude of vehicle. | -| `lon` | `Float` | Reported longitude of vehicle. | -| `heading` | `Float` | Reported heading of vehicle in degrees. | +| `vehicleID` | `String` | ID of this vehicle. | +| `direction` | `String` | ID of the direction the vehicle reported it was going. | +| `latitude` | `Float` | Reported latitude of vehicle. | +| `longitude` | `Float` | Reported longitude of vehicle. | +| `bearing` | `Float` | Reported heading of vehicle in degrees. | | `secsSinceReport` | `Int` | Number of seconds old this observation was when it was retrieved (at `timestamp` of `RouteState`). | -| `numCars` | `Int` | Number of cars in this vehicle. 
| -| `stopIndex` | `Int` | The index of the current stop in the sequence (GTFS-realtime providers only) | -| `status` | `Int` | 0 if the vehicle is about to arrive at the stop referred to by `stopIndex`, 1 if the vehicle is stopped at this stop, 2 if the vehicle is in transit to this stop (GTFS-realtime providers only) | -| `tripId` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) | +| `tripID` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) | ## Sample Query @@ -98,7 +95,10 @@ Once you run it, go to http://localhost:4000/graphql in your browser and run thi ``` query { - state(agencyId: "muni", startTime: 1572105600, endTime: 1572112800, routes: ["14", "19", "49"]) { + state(agencyId: "trimet" + , startTime: 1642867201 + , endTime: 1642867500 + , routes: ["100"]) { agencyId startTime routes { @@ -106,10 +106,14 @@ query { states { timestamp vehicles { - vid - lat - lon - heading + vehicleID + tripID + latitude + longitude + direction + bearing + time + secsSinceReport } } } diff --git a/docker-compose.yml b/docker-compose.yml index cf0d599..bcfe810 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,3 +9,5 @@ services: - "4000:4000" volumes: - ./src:/usr/src/app/src + environment: + TRYNAPI_S3_BUCKET: "opentransit-pdx" diff --git a/examples/marin-query.graphql b/examples/marin-query.graphql index 5af0a7f..094be0d 100644 --- a/examples/marin-query.graphql +++ b/examples/marin-query.graphql @@ -1,7 +1,10 @@ -# replace "marin" with "muni" to use that as the agency +# replace "trimet" with your agency query { - state(agencyId: "marin", startTime: 1572127400, endTime: 1572127900) { + state(agencyId: "trimet" + , startTime: 1642867201 + , endTime: 1642867500 + , routes: ["100"]) { agencyId startTime routes { @@ -9,10 +12,14 @@ query { states { timestamp vehicles { - vid - lat - lon - heading + vehicleID + tripID + latitude + longitude + direction + bearing + time + 
secsSinceReport } } } diff --git a/src/helpers/s3Helper.js b/src/helpers/s3Helper.js index 8d415e9..d6e286b 100644 --- a/src/helpers/s3Helper.js +++ b/src/helpers/s3Helper.js @@ -7,20 +7,31 @@ const s3 = new AWS.S3(); const s3Bucket = process.env.TRYNAPI_S3_BUCKET || "orion-vehicles"; console.log(`Reading state from s3://${s3Bucket}`); +function convertTZ(date, tzString) { + return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString})); +} + /* - * Gets bucket prefix at the minute-level + * Gets bucket prefix at the hour-level (America/Los_Angeles wall-clock time, zero-padded month/day/hour) + * Note to Jesse - I changed the bucket structure + * when we switch to a new bucket (code for PDX owned) + * we could switch it back. For now, it's not great + * but I think it's not the source of the problem. + * see getVehiclePaths - I think the function is + * not as fast as it could be but it works * @param agencyId - String * @param currentTime - Number * @return prefix - String */ -function getBucketMinutePrefix(agencyId, currentTime) { +function getBucketHourPrefix(agencyId, currentTime) { const currentDateTime = new Date(Number(currentTime * 1000)); - const year = currentDateTime.getUTCFullYear(); - const month = currentDateTime.getUTCMonth()+1; - const day = currentDateTime.getUTCDate(); - const hour = currentDateTime.getUTCHours(); - const minute = currentDateTime.getUTCMinutes(); - return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`; + const pacificDateTime = convertTZ(currentDateTime, 'America/Los_Angeles'); // Pacific wall-clock time is held in this Date's local fields, so read it with the local getters only + const year = pacificDateTime.getFullYear(); + const month = String(pacificDateTime.getMonth()+1).padStart(2, '0'); + const day = String(pacificDateTime.getDate()).padStart(2, '0'); + const hour = String(pacificDateTime.getHours()).padStart(2, '0'); + // console.log('looking at bucket year %i, month %i, day %i, hour %i', year, month, day, hour); + return `${agencyId}/${year}/${month}/${day}/${hour}/`; } function getS3Paths(prefix) { @@ -49,11 +60,13 @@ 
async function getVehiclePaths(agencyId, startEpoch, endEpoch) { } // Idea: there are 1440 minutes in a day, and the API return at most 1-2 days, // so we can iterate every minute (as we have to get each file individually anyways) - let minutePrefixes = []; + let hourPrefixes = []; for (let time = startEpoch; time < endEpoch; time += 60) { - minutePrefixes.push(getBucketMinutePrefix(agencyId, time)); + hourPrefixes.push(getBucketHourPrefix(agencyId, time)); } - let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix)))); + let uniquehourPrefixes = [...new Set(hourPrefixes)]; + // console.log(uniquehourPrefixes) + let files = _.flatten(await Promise.all(uniquehourPrefixes.map(prefix => getS3Paths(prefix)))); let timestampsMap = {}; let res = []; @@ -65,13 +78,27 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { res.push(key); } }); + // console.log(res) return res; } + // unzip the gzip data function decompressData(data) { - return new Promise((resolve, _) => { - return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString()))); + return new Promise((resolve, reject) => { + return zlib.unzip(data, (err, decoded) => { + if (err) { + reject(err); + } else { + let parsedData; + try { + parsedData = JSON.parse(decoded.toString()); + } catch (e) { + return reject(e); // stop here so a parse failure never reaches resolve() + } + resolve(parsedData); + } + }); }); } @@ -88,36 +115,21 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { Key: key, }, (err, data) => { if (err) { - reject(err); + reject(err); } else { - const timestamp = getTimestamp(key); - decompressData(data.Body) - .then(decodedData => - resolve(insertTimestamp(timestamp, decodedData))); + resolve(decompressData(data.Body)); } }); + }).catch((err) => { + return Promise.reject(new Error(`Error loading s3://${s3Bucket}/${key}: ${err}`)); // reject with an Error (not a bare string) so callers get a message and stack }); }))); } function getTimestamp(key) { const keyParts = key.split('-'); - return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000); -} - -/* - * 
The API defines timestamp (epoch time in seconds) as a field for each vehicle, - * which was also a column in Cassandra. - * Since the timestamp is in the key in S3, that field does not exist, - * thus we have to add it in the S3Helper to maintain compatibility - */ -function insertTimestamp(timestamp, vehicles) { - return vehicles.map(vehicle => { - return { - ...vehicle, - timestamp: timestamp, - }; - }); + const raw_timestamp = Number(keyParts[keyParts.length - 1].split('.json')[0]) + return raw_timestamp; } module.exports = { diff --git a/src/resolvers.js b/src/resolvers.js index c933667..bcfab4f 100644 --- a/src/resolvers.js +++ b/src/resolvers.js @@ -18,66 +18,75 @@ const resolvers = { let { startTime, endTime } = params; - const vehicles = await s3Helper.getVehicles(agencyId, startTime, endTime); + console.log(agencyId, routes) + + const resultSets = await s3Helper.getVehicles(agencyId, startTime, endTime); + const vehiclesByTripByTime = {}; const vehiclesByRouteByTime = {}; + const UniqueVehicleTimeKeys = {}; // group the vehicles by route, and then by time + // below are fields in the vehicle api response + // 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID', +// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay', +// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID', +// 'delay', 'lastStopSeq', 'vehicleID', 'time' - vehicles.forEach(vehicle => { - const routeId = vehicle.rid; - const vtime = vehicle.timestamp; + - if (!vehiclesByRouteByTime[routeId]) { - vehiclesByRouteByTime[routeId] = {}; - } - if (!vehiclesByRouteByTime[routeId][vtime]) { - vehiclesByRouteByTime[routeId][vtime] = []; - } + resultSets.forEach(results => { + + const queryTime = results.resultSet.queryTime; + + if (!(results.resultSet['vehicle']===undefined)) { + + results.resultSet.vehicle.forEach(vehicle => { + + // console.log(vehicle) + const routeId = vehicle.routeNumber; + const vtime = Math.floor(Number(queryTime)/1000); + const 
vehicleID = vehicle.vehicleID; + const tempVehicleTime = vehicleID+'_'+vtime; + const vehicleTime = Math.floor(Number(vehicle.time)/1000); + + const secsSinceReport = (vtime-vehicleTime); + + vehicle.time = vehicleTime; + + vehicle.secsSinceReport = secsSinceReport; - // check for multiple vehicles with the same tripId, assume to be multiple-car trains if close to each other - const tripId = vehicle.tripId; - if (tripId) { - if (!vehiclesByTripByTime[tripId]) { - vehiclesByTripByTime[tripId] = {}; - } - const prevVehicle = vehiclesByTripByTime[tripId][vtime]; - if (!prevVehicle) { - vehiclesByTripByTime[tripId][vtime] = vehicle; - } else if (Math.abs(prevVehicle.lat - vehicle.lat) < 0.001 && Math.abs(prevVehicle.lon - vehicle.lon) < 0.001) { - // 0.001 degrees latitude = 111m, 0.001 degrees longitude typically between between ~50m and 111m - prevVehicle.numCars = (prevVehicle.numCars || 1) + 1; - if (prevVehicle.vid > vehicle.vid) { - prevVehicle.vid = vehicle.vid; + if (!vehiclesByRouteByTime[routeId]) { + vehiclesByRouteByTime[routeId] = {}; + } + if (!vehiclesByRouteByTime[routeId][vtime]) { + vehiclesByRouteByTime[routeId][vtime] = []; } - return; - } - } - vehiclesByRouteByTime[routeId][vtime].push(vehicle); - }); + if (!UniqueVehicleTimeKeys[tempVehicleTime]) { + UniqueVehicleTimeKeys[tempVehicleTime] = true; // record this vehicleID/query-time pair so a repeat is not pushed again (and the route map gets no stray keys) + vehiclesByRouteByTime[routeId][vtime].push(vehicle); + } + + }); + + - // remove duplicate Muni Metro vehicles - if (agencyId === 'muni') { - const affectedRouteIDs = ['KT', 'L', 'M', 'N', 'J']; - affectedRouteIDs.forEach(routeID => { - if (debug) { - console.log(routeID); - } - if (vehiclesByRouteByTime[routeID]) { - vehiclesByRouteByTime[routeID] = removeMuniMetroDuplicates( - vehiclesByRouteByTime[routeID], - ); - } - }); - } + + + + + } + + }); // get all the routes const routeIDs = routes ? 
_.intersection(routes, Object.keys(vehiclesByRouteByTime)) : Object.keys(vehiclesByRouteByTime); + return { agencyId, routeIDs, @@ -85,6 +94,7 @@ const resolvers = { endTime, vehiclesByRouteByTime }; + }, }, @@ -110,18 +120,23 @@ const resolvers = { } }, + // list all available fields + // 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID', +// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay', +// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID', +// 'delay', 'lastStopSeq', 'vehicleID', 'time' + VehicleState: { - vid: vehicle => vehicle.vid, - did: vehicle => vehicle.did, - lat: vehicle => vehicle.lat, - lon: vehicle => vehicle.lon, - heading: vehicle => vehicle.heading, + vehicleID: vehicle => vehicle.vehicleID, + direction: vehicle => vehicle.direction, + latitude: vehicle => vehicle.latitude, + longitude: vehicle => vehicle.longitude, + bearing: vehicle => vehicle.bearing, + tripID: vehicle => vehicle.tripID, + nextStopSeq: vehicle => vehicle.nextStopSeq, + lastStopSeq: vehicle => vehicle.lastStopSeq, + time: vehicle => vehicle.time, secsSinceReport: vehicle => vehicle.secsSinceReport, - numCars: vehicle => vehicle.numCars, - tripId: vehicle => vehicle.tripId, - stopId: vehicle => vehicle.stopId, - stopIndex: vehicle => vehicle.stopIndex, - status: vehicle => vehicle.status, } }; diff --git a/src/schema.graphql b/src/schema.graphql index e37e53d..62173f2 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -1,17 +1,16 @@ scalar BigInt type VehicleState { - vid: String - did: String - lat: Float - lon: Float - heading: Int - secsSinceReport: Int - numCars: Int - tripId: String - stopIndex: Int - stopId: String - status: Int + vehicleID: String + direction: String + latitude: Float + longitude: Float + bearing: Int + tripID: String + nextStopSeq: Int + lastStopSeq: Int + time: BigInt + secsSinceReport: BigInt } type RouteState {