Using the App Engine Mapper for bulk data import

Since my last post describing App Engine mapreduce, a new InputReader has been added to the Java project for reading from the Blobstore. Nick Johnson wrote a great demo in which code uploaded to the Blobstore was indexed for search, and this was demoed at Google I/O. Now that the library is officially part of the project, it’s become much easier for developers to build Mappers that map across some large, contiguous piece of data, as opposed to Entities in the datastore. The most obvious use case is data import. A developer looking to import large amounts of data would take the following steps:

  1. Create a CSV file containing the data you want to import. The assumption here is that each line of the file corresponds to a datastore entity you want to create.
  2. Upload the CSV file to the blobstore. You’ll need billing to be enabled for this to work.
  3. Create your Mapper, push it live, and run the job to import your data.

This isn’t meant to be a replacement for the bulk uploader tool; it’s merely an alternative. This method requires more programmatic work for custom data transforms. The advantage is that the work is done on the server side, whereas the bulk uploader does its work through the remote API. Let’s get started on each of the steps.

Step 1: Create a CSV file with the data you want to upload

We’re going to go through an example of uploading City and State information. MaxMind.com provides a free GeoIP CSV file. The free version isn’t as full featured as the paid version, but it’ll do fine for our demo. If you use this file in any kind of production application, be sure to read and understand the license first! For simplicity, we’re going to parse out only the cities in the United States using grep (a Java sketch that does the same filtering follows the sample lines below). The file should now contain lines that look like this:

605,"US","NY","Valhalla","10595",41.0877,-73.7768,501,914
606,"US","PA","Pittsburgh","15222",40.4495,-79.9880,508,412
607,"US","MO","Bridgeton","63044",38.7667,-90.4201,609,314
608,"US","CA","San Francisco","94124",37.7312,-122.3826,807,415
609,"US","NY","New York","10017",40.7528,-73.9725,501,212
610,"US","PA","Bear Lake","16402",41.9491,-79.4448,516,814
611,"US","NJ","Piscataway","08854",40.5516,-74.4637,501,732
612,"US","NY","Keuka Park","14478",42.5669,-77.1325,555,315
613,"US","VT","Brattleboro","05302",42.8496,-72.6645,506,802

Step 2: Create an upload handler for your CSV file and upload it

We’re going to create a basic handler for uploading a CSV file and displaying the key. We’ll need to pass this key to our mapper later. There isn’t too much magic here; it’s very similar to the sample code available for the basic blobstore example.

We’ll do a quick overview of the code we need here, but a full walkthrough of the Blobstore API is out of scope for this post. We’ll need these files:

upload.jsp

<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
    pageEncoding="ISO-8859-1"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">

<%@page import="com.google.appengine.api.blobstore.BlobstoreService"%>
<%@page import="com.google.appengine.api.blobstore.BlobstoreServiceFactory"%>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Upload your CSV file here</title>
</head>
<body>
    <% BlobstoreService blobstoreService = BlobstoreServiceFactory.getBlobstoreService(); %>
    <form action="<%= blobstoreService.createUploadUrl("/upload") %>" method="post" enctype="multipart/form-data">
        <input type="file" name="data">
        <input type="submit" value="Submit">
    </form>
</body>
</html>

UploadBlobServlet.java

package com.ikai.mapperdemo.servlets;

import java.io.IOException;
import java.util.Map;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.blobstore.BlobKey;
import com.google.appengine.api.blobstore.BlobstoreService;
import com.google.appengine.api.blobstore.BlobstoreServiceFactory;

@SuppressWarnings("serial")
public class UploadBlobServlet extends HttpServlet {
	public void doPost(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		BlobstoreService blobstoreService = BlobstoreServiceFactory.getBlobstoreService();
		Map<String, BlobKey> blobs = blobstoreService.getUploadedBlobs(req);
		BlobKey blobKey = blobs.get("data");

		if (blobKey == null) {
			resp.sendRedirect("/");
		} else {
			resp.sendRedirect("/upload-success?blob-key=" + blobKey.getKeyString());
		}
	}

}

SuccessfulUploadServlet.java

package com.ikai.mapperdemo.servlets;

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@SuppressWarnings("serial")
public class SuccessfulUploadServlet extends HttpServlet {
	public void doGet(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		String blobKey = req.getParameter("blob-key");

		resp.setContentType("text/html");
		resp.getWriter().println("Successfully uploaded. Download file: <br/>");
		resp.getWriter().println(
				"<a href='/serve?blob-key=" + blobKey
						+ "'>Click to download</a>");
	}

}

Source code for this and other helper functions should be available in the Github repository.
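
The download link above points at a /serve handler that isn’t shown here. A minimal sketch of it might look like the following; the class name and URL mapping are assumptions, so check the repository for the actual version.

ServeBlobServlet.java

package com.ikai.mapperdemo.servlets;

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.blobstore.BlobKey;
import com.google.appengine.api.blobstore.BlobstoreService;
import com.google.appengine.api.blobstore.BlobstoreServiceFactory;

@SuppressWarnings("serial")
public class ServeBlobServlet extends HttpServlet {
	public void doGet(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		BlobstoreService blobstoreService = BlobstoreServiceFactory.getBlobstoreService();

		// Stream the blob identified by the blob-key parameter back to the client.
		BlobKey blobKey = new BlobKey(req.getParameter("blob-key"));
		blobstoreService.serve(blobKey, resp);
	}
}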

Step 3: Create your Mapper

Now we get to the fun part: creating our Mapper. A prerequisite for understanding what’s coming next is reading my last post about the Mapper API, so check that out before proceeding if you aren’t familiar with Mapper basics. Our Mapper class looks like this:

ImportFromBlobstoreMapper.java

package com.ikai.mapperdemo.mappers;

import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.AppEngineMapper;
import com.google.appengine.tools.mapreduce.BlobstoreRecordKey;
import com.google.appengine.tools.mapreduce.DatastoreMutationPool;

/**
 * 
 * This Mapper imports data from a CSV file in the Blobstore. It assumes
 * the CSV is in the MaxMind format for cities, states, zipcodes and
 * lat/long.
 * 
 * @author Ikai Lan
 *
 */
public class ImportFromBlobstoreMapper extends
		AppEngineMapper<BlobstoreRecordKey, byte[], NullWritable, NullWritable> {
	private static final Logger log = Logger.getLogger(ImportFromBlobstoreMapper.class
			.getName());

	@Override
	public void map(BlobstoreRecordKey key, byte[] segment, Context context) {
		
		String line = new String(segment);
		
		log.info("At offset: " + key.getOffset());
		log.info("Got value: " + line);
		
		// Line format looks like this:
		// 10644,"US","VA","Tazewell","24651",37.0595,-81.5220,559,276
		// We're also assuming no errant commas in this simple example
		
		String[] values = line.split(",");
		String state = values[2];
		String cityName = values[3];		
		String zipcode = values[4];
		Double latitude = Double.parseDouble(values[5]);
		Double longitude = Double.parseDouble(values[6]);		
		
		state = state.replaceAll("\"", "");
		cityName = cityName.replaceAll("\"", "");
		zipcode = zipcode.replaceAll("\"", "");
		
		if(!zipcode.isEmpty()) {
			Entity zip = new Entity("Zip", zipcode);
			zip.setProperty("state", state);
			zip.setProperty("city", cityName);
			zip.setProperty("latitude", latitude);
			zip.setProperty("longitute", longitude);
			
			Entity city = new Entity("City", cityName);
			city.setProperty("state", state);
			city.setUnindexedProperty("zip", zipcode);
			
			DatastoreMutationPool mutationPool = this.getAppEngineContext(context)
					.getMutationPool();
			mutationPool.put(zip);
			mutationPool.put(city);
		}

	}
}

Let’s explain the things in this Mapper that are new:

public class ImportFromBlobstoreMapper extends
AppEngineMapper<BlobstoreRecordKey, byte[], NullWritable, NullWritable>

Note this line. It’s different from our previous Mappers in that the type arguments are no longer Key and Entity, but BlobstoreRecordKey and byte[]. The source for BlobstoreRecordKey is here. Remember that map-reduce is about taking some large body of data and breaking it into smaller pieces to operate on. A BlobstoreRecordKey is a pointer to a range of data in our Blobstore, and the byte[] is the byte array actually containing that data.

public void map(BlobstoreRecordKey key, byte[] segment, Context context)

Again, notice the new types. By default, we are splitting on a newline, so segment represents a single line. We can change what we split on by specifying a terminator in mapreduce.xml.

		String line = new String(segment);
		
		// Line format looks like this:
		// 10644,"US","VA","Tazewell","24651",37.0595,-81.5220,559,276
		// We're also assuming no errant commas in this simple example
		
		String[] values = line.split(",");
		String state = values[2];
		String cityName = values[3];		
		String zipcode = values[4];
		Double latitude = Double.parseDouble(values[5]);
		Double longitude = Double.parseDouble(values[6]);		
		
		state = state.replaceAll("\"", "");
		cityName = cityName.replaceAll("\"", "");
		zipcode = zipcode.replaceAll("\"", "");

This is very naive String parsing. Nothing fancy here.
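
The split on "," will break if a field ever contains an embedded comma inside quotes. If your data needs that, here is a hypothetical quote-aware splitter you could use in place of line.split(",") inside the Mapper; it’s only a sketch, not a full CSV parser, and since it also strips the quotes, the replaceAll calls above would no longer be needed.

	// Needs java.util.ArrayList and java.util.List imports.
	private static String[] splitCsvLine(String line) {
		List<String> fields = new ArrayList<String>();
		StringBuilder current = new StringBuilder();
		boolean inQuotes = false;
		for (int i = 0; i < line.length(); i++) {
			char c = line.charAt(i);
			if (c == '"') {
				inQuotes = !inQuotes; // toggle quoted state and drop the quote itself
			} else if (c == ',' && !inQuotes) {
				fields.add(current.toString()); // field boundary outside quotes
				current.setLength(0);
			} else {
				current.append(c);
			}
		}
		fields.add(current.toString()); // last field
		return fields.toArray(new String[fields.size()]);
	}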

		if(!zipcode.isEmpty()) {
			Entity zip = new Entity("Zip", zipcode);
			zip.setProperty("state", state);
			zip.setProperty("city", cityName);
			zip.setProperty("latitude", latitude);
			zip.setProperty("longitute", longitude);
			
			Entity city = new Entity("City", cityName);
			city.setProperty("state", state);
			city.setUnindexedProperty("zip", zipcode);
			
			DatastoreMutationPool mutationPool = this.getAppEngineContext(context)
					.getMutationPool();
			mutationPool.put(zip);
			mutationPool.put(city);
		}

Again, very straightforward if you’ve seen this before. Some zipcodes in our CSV subset are empty, so we check for that and simply don’t create an Entity. We’re adding two entities to the mutation pool here: a Zip and a City. Because we use the zipcode and the city name as key names, we can later fetch these entities by key with a datastore get. Remember that fetches by key are always faster than fetches with a query, since a query requires an index scan followed by a batch get, whereas the datastore can perform a get in a single operation.
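
For example, once the import has run, a lookup like this hits the Zip entity directly by key. It’s just a sketch using the low-level datastore API (the classes below come from com.google.appengine.api.datastore), and the zipcode is one of the values from our sample lines:

		DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
		Key zipKey = KeyFactory.createKey("Zip", "94124");
		try {
			Entity zip = datastore.get(zipKey); // single get by key: no query, no index scan
			String city = (String) zip.getProperty("city"); // "San Francisco" in our sample data
		} catch (EntityNotFoundException e) {
			// Nothing was imported for this zipcode.
		}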

That’s it for our Mapper. Let’s add a configuration:

  <configuration name="Import all data from the Blobstore">
    <property>
      <name>mapreduce.map.class</name>
      
      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.ImportFromBlobstoreMapper</value>
    </property>
        
    <!--  This is a default tool that lets us iterate over blobstore data -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.BlobstoreInputFormat</value>
    </property>
    
    <property>
      <name human="Blob Keys to Map Over">mapreduce.mapper.inputformat.blobstoreinputformat.blobkeys</name>
      <value template="optional">blobkeyhere</value>      
    </property>        
    
    <property>
      <name human="Number of shards to use">mapreduce.mapper.shardcount</name>
      <value template="optional">10</value>      
    </property>        
    
  </configuration>  

We’ve changed two properties here: the input format class, and a property containing the key of the blob whose data we want to iterate over.

Step 4: Deploy!

We can now package our application and deploy it. Make sure you build a new JAR from the current appengine-mapreduce source! An older JAR won’t include the BlobstoreInputFormat class we need to do our work.

Step 5: Using the Mapper

Let’s browse to our upload handler at /upload.jsp. The page should be pretty bare.

Once the upload has finished, we’ll land on the success page, which shows a download link and includes the blob-key in its URL.

Let’s copy the blob-key from the URL. It’s not the most streamlined approach, but it works. We’ll use it on the next screen when we browse to the Mapper UI.

We’ll paste the key in place of “blobkeyhere” and hit “Run”. And now we play the waiting game: we can check on the status of our Mapper in the UI, check on the tasks, or look in the datastore to see if the data has been imported correctly.
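
One quick way to sanity-check the import from code is to pull back a few of the entities we just wrote. This is only a sketch using the low-level datastore API (classes from com.google.appengine.api.datastore, plus java.util.List):

		DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();

		// Fetch a handful of City entities as a rough check that the import ran.
		List<Entity> cities = datastore.prepare(new Query("City"))
				.asList(FetchOptions.Builder.withLimit(10));
		for (Entity city : cities) {
			System.out.println(city.getKey().getName() + ", " + city.getProperty("state"));
		}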

Get the code

The code is here on Github:

http://github.com/ikai/App-Engine-Java-Mapper-API-demos

It’s been updated with the new examples.

Summary

So there you have it: another way of importing data into the datastore. This isn’t a replacement for the bulk uploader, just another option. Here are some useful links for additional information:

App Engine Mapreduce issues tracker – report issues here

Nick Johnson’s post explaining how he built the code search example

One last tip: the best place for questions or discussion is probably the App Engine Discussion Groups, not the comments.

Happy hacking.

Written by Ikai Lan

August 11, 2010 at 10:33 am

Posted in App Engine, Java

16 Responses

  1. This is very cool. Finally we have another way to achieve bulk loads. The client-side bulk loader using the RemoteApi used to take quite a while when the number of entities involved was very large.

    Rahul

    August 11, 2010 at 4:53 pm

  2. Hey Rahul,

    Can you give it a try? I’m trying to work out any gotchas before porting this into an article on the main code.google.com site.

    Ikai Lan

    August 11, 2010 at 5:08 pm

  3. [...] blogstore, creating a datastore entity for every line in the file. This is the Python version of this blog post (which uses [...]

  4. Thanks for the post! I’ve created a sample for Python using the GAE Django helper:
    http://www.carlosble.com/?p=697

    carlosble

    September 7, 2010 at 11:28 am

  5. This is a great tutorial. The Mapper API will allow me to get rid of a lot of code. Great project.
    I have one question about this CSV example. I was testing with a file of about 900k lines. I wrote a simple mapper that just uses counters. What is strange is that the number of shards does not change the overall time. I mean, there is a certain overhead, but I would still expect it to go faster the more mappers I use?

    Toby

    December 15, 2010 at 7:05 am

  6. This is really great! But I still have a problem with my algorithm: I have to run a specific number of shards, and it must be determined at runtime (1 shard for 1 object). My algorithm starts with 2 entities for each shard, which makes it run really slowly; it must run one on one! Do you know how to make it work?

    Alessio

    June 13, 2011 at 9:31 am

  7. Hi,
    Thanks Ikai, this is a great example. I just need to ask one thing: in order to launch a job, is it necessary to deploy the application, or can we not just run it locally?

    Asjad Hassan

    August 18, 2011 at 10:38 pm

  8. It should run locally, but obviously won’t scale up as fast or as far.

    Ikai Lan

    August 20, 2011 at 4:35 pm

  9. Thanks Ikai for the reply, but when I’m running the application locally it displays the following:
    Running jobs
    No job records found.
    Launch job
    No job configurations found.
    Please help me if you understand the issue.

    Asjad Hassan

    August 23, 2011 at 10:16 pm

  10. Hi Ikai Lan,
    This is a great example of uploading a file on Google App Engine. I am using your sample code: the sample file uploads successfully and shows up in the App Engine blob viewer, but when I try to import data from the Blobstore to the datastore as in your sample, it gives me the error mentioned below.

    Error — java.lang.NoClassDefFoundError: Full stack trace is available in the server logs. Message: com/google/common/base/Preconditions

    Kindly reply me on this issue.

    sajidbandayy

    August 25, 2011 at 8:43 am

  11. Have you asked this question in the groups yet?

    Ikai Lan

    October 7, 2011 at 9:57 am

  12. I figured out this problem myself :) . The main reason for this error is that one jar file is missing from your sample code: charts4j-1.2.jar. When I added this jar to my project, the sample application ran successfully. Thanks for the reply.

    Sajid Banday

    October 9, 2011 at 10:36 pm

  13. Hi Ikai Lan,

    Do you know about a BlobstoreInputFormat implementation which allows us to have multiple lines per mapper instead of only one?

    cheers
    Fábio

    Fábio Franco Uechi

    October 17, 2011 at 5:15 pm

  14. This feature is very nice, but as we need to upload large CSV files, would it be possible to directly process a gzipped blob?

    Fabien

    February 9, 2012 at 7:13 am

  15. Hi lkai Lan,

    Can I give a service as input? I mean, instead of selecting a file from the file system, I’ll hard-code the location of the file and it should take that file as input. Is it possible?

    Vasu

    December 7, 2012 at 7:09 am

