
Treasury Yield Example


Example 1 - Treasury Yield Calculation

### Running

This sample can be run from the root directory with `./gradlew historicalYield`. If you have just checked out the repository, you will need to run `./gradlew jar` first.

Source code is in `examples/treasury_yield`. The data used in this example can be found in `examples/treasury_yield/src/main/resources/yield_historical_in.json`.

We end up with a test collection containing documents that look like this:

{
  "_id": ISODate("1990-01-25T19:00:00-0500"),
  "dayOfWeek": "FRIDAY",
  "bc3Year": 8.38,
  "bc10Year": 8.49,
  …
}

### Map/Reduce with Java

The goal is to find the average of the `bc10Year` field for each year that exists in the dataset. First we define a mapper, which is executed against each document in the collection. We extract the year from the `_id` field and use it as the output key, along with the value we want to average, `bc10Year`.

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class TreasuryYieldMapper extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {
    @Override
    public void map(final Object pKey, final BSONObject pValue, final Context pContext) throws IOException, InterruptedException {
        // Date.getYear() returns years since 1900, so add 1900 to recover the calendar year.
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        final double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}

Then we write a reducer, which takes the values collected for each key (the year) and performs an aggregate computation on them to get a result: here, the average of the 10-year yields for that year.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import com.mongodb.hadoop.io.BSONWritable;

public class TreasuryYieldReducer
        extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {
    @Override
    public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        // Sum every 10-year yield emitted for this year.
        for (final DoubleWritable value : pValues) {
            sum += value.get();
            count++;
        }

        final double avg = sum / count;

        // Wrap the average in a BSON document: { "avg": <average> }.
        final BasicBSONObject output = new BasicBSONObject();
        output.put("avg", avg);
        pContext.write(pKey, new BSONWritable(output));
    }
}
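
To run, the mapper and reducer need to be wired into a Hadoop job that reads from and writes to MongoDB via mongo-hadoop's `MongoInputFormat` and `MongoOutputFormat`. The driver below is a minimal sketch, not necessarily the exact driver the `historicalYield` gradle task invokes; the input/output URIs and the output collection name `demo.yield_historical.out` are assumptions based on the collections used elsewhere on this page.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class TreasuryYieldJob {
    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // Read documents from the input collection; write results back to MongoDB.
        // Both URIs are assumptions; adjust them to your deployment.
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/demo.yield_historical.in");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/demo.yield_historical.out");

        final Job job = new Job(conf, "treasury yield");
        job.setJarByClass(TreasuryYieldJob.class);
        job.setMapperClass(TreasuryYieldMapper.class);
        job.setReducerClass(TreasuryYieldReducer.class);
        // The mapper's output types differ from the reducer's, so set both pairs.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Each document written to the output collection then has the year as its key and a single `avg` field as its value.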

### Pig

We can accomplish the same task with just a few lines of Pig. The script also uses some external UDFs for date and string handling provided by the Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730

-- UDFs used for date parsing
REGISTER /tmp/piggybank-0.3-amzn.jar;
-- MongoDB Java driver
REGISTER /tmp/mongo-2.10.1.jar;
-- Core mongo-hadoop library
REGISTER ../core/target/mongo-hadoop-core_1.0.3-1.1.0-SNAPSHOT.jar;
-- mongo-hadoop Pig support
REGISTER ../pig/target/mongo-hadoop-pig_1.0.3-1.1.0-SNAPSHOT.jar;

raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' USING com.mongodb.hadoop.pig.MongoLoader;
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();

-- Convert the _id date to an ISO string and keep the 10-year yield.
date_tenyear = FOREACH raw GENERATE UnixToISO($0#'_id'), $0#'bc10Year';
-- Pull the four-digit year out of the ISO date string.
parsed_year = FOREACH date_tenyear GENERATE
    FLATTEN(EXTRACT($0, '(\\d{4})')) AS year, (double)$1 AS bc;

by_year = GROUP parsed_year BY (chararray)year;
year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) AS tenyear_avg;

-- Args to MongoInsertStorage are: schema for the output doc, field to use as '_id'.
STORE year_10yearavg
  INTO 'mongodb://localhost:27017/demo.asfkjabfa'
  USING com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
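
When the script finishes, each year group becomes one document in the target collection. The shape follows from the `MongoInsertStorage` arguments above ('group' is used as `_id`); the numeric value shown here is illustrative, not actual output:

{
  "_id": "1990",
  "tenyear_avg": 8.55
}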
