
Using the Java Mapper Framework for App Engine

The recently released Mapper framework is the first part of App Engine’s mapreduce offering. In this post, we’ll be discussing some of the types of operations we can perform using this framework and how easily they can be done.

Introduction to Map Reduce

If you aren’t familiar with Map Reduce, you can read a high-level overview on Wikipedia here. The official paper can be downloaded from this site if you’re interested in a more technical discussion.

The simplest breakdown of MapReduce is as follows:

  1. Take a large dataset and break it into pieces, mapping individual pieces of data
  2. Work on those mapped datasets and reduce them into the form you need

A simple example here is full text indexing. Suppose we wanted to create indexes from existing text documents. We would use the Map step to iterate over every document and “map” each phrase or term to a document, then we would “reduce” the mappings by writing them to an index. Map/reduce problems have the advantage of being easy to conceptualize as work that can be distributed and parallelized, and there are frameworks that take care of many of the administrative functions of map-reduce: failure recovery, distribution of work, tracking the status of jobs, reporting and so forth. The appengine-mapreduce project seeks to provide as many of these features as possible while making it as easy as possible for developers to write large batch processing jobs without having to think about the plumbing details.
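To make the indexing example concrete, here is a tiny conceptual sketch in plain Java. This is not the App Engine or Hadoop API – the class and method signatures are purely illustrative of the two steps:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Conceptual sketch only: "map" emits a (term, documentId) pair for every
 * term in a document, and "reduce" collapses each term's document list into
 * a single index entry.
 */
public class TinyIndexSketch {

    // Map step: break one document into (term -> documentId) pairs.
    static void map(String documentId, String text, Map<String, List<String>> emitted) {
        for (String term : text.toLowerCase().split("\\s+")) {
            List<String> docs = emitted.get(term);
            if (docs == null) {
                docs = new ArrayList<String>();
                emitted.put(term, docs);
            }
            docs.add(documentId);
        }
    }

    // Reduce step: write one index entry per term.
    static void reduce(String term, List<String> documentIds, Map<String, List<String>> index) {
        index.put(term, new ArrayList<String>(documentIds));
    }
}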

But I only have Map available!

Yes, this is true – as of the writing of this post, only the “map” step exists, which is why it’s currently referred to as the “Mapper API”. That doesn’t mean it’s not useful. For starters, it is a very easy way to perform some operation on every single Entity of a given Kind in your datastore in parallel. What would you have to build for yourself if Mapper weren’t available?

  1. Begin querying over every Entity in chained Task Queues
  2. Store beginning and end cursors (introduced in 1.3.5)
  3. Create tasks to work with chunks of your datastore
  4. Write the code to manipulate your data
  5. Build an interface to control your batch jobs
  6. Build a callback system for your multitudes of parallelized workers to call when the entire task has completed

It’s certainly not a trivial amount of work. Some things you can do very easily with the Mapper library include:

  • Modify some property or set of properties for every Entity of a given Kind
  • Delete all entities of a single Kind – the functional equivalent of a “DROP TABLE” if you were using a relational database
  • Count the occurrences of some property across every single Entity of a given Kind in your datastore

We’ll go through a few of these examples in this post.

Our sample application

Our sample application will be a modified version of the Guestbook demo. We’ll add a few additional properties. For simplicity, we’ll use the low-level API, since the Mapper API also uses the low-level API. You can see this application here:
The code is also available to clone via Github if you’d like to follow along.
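For orientation, the demo stores each guestbook entry with the low-level API along these lines (a sketch of the comment-posting handler; commentText stands in for whatever was read from the form, and the property names match what the Mappers below expect):

// Sketch: how the sample guestbook saves a comment using the low-level API.
// (commentText is a placeholder for the value read from the request.)
Entity entry = new Entity("Comment");
entry.setProperty("comment", commentText); // the property our Mappers will modify
entry.setProperty("createdAt", new Date());

DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
datastore.put(entry);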

How to define a Mapper

There are four steps to defining a Mapper:

  1. Download, build and place the appengine-mapreduce JAR files in your WEB-INF/lib directory and add them to your build path. You only need to do this once per project. The steps for doing this are on the “Getting Started” page for Java. You’ll need all the JAR files that are built.
  2. Make sure that we have a DESCENDING index created on Key. This is important! If we run our Mapper locally, this index will automatically be added to our datastore-indexes.xml file and created when we deploy our application. One trick to ensure that indexes get built before they are needed, at least in a live application, is to create and deploy an application with the new index configuration to a non-default version. Because all versions use the same datastore and the same set of indexes, this will schedule the index to be built before we need it in the live version. When it has completed, we simply switch the default version over, and we’re ready to roll. (Sketches of the servlet mapping from step 1 and of this index entry appear just after this list.)
  3. Create your Mapper class
  4. Configure your Mapper class in mapreduce.xml

We’ll go over steps 3 and 4 in each example.
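Before we get to those, two quick sketches for steps 1 and 2. The Getting Started page also walks through mapping the framework’s servlet, which serves both the Mapper console and the worker endpoints; a minimal web.xml entry, assuming the servlet class from that page and the /mapper path this demo uses, might look like:

<servlet>
  <servlet-name>mapreduce</servlet-name>
  <servlet-class>com.google.appengine.tools.mapreduce.MapReduceServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>mapreduce</servlet-name>
  <url-pattern>/mapper/*</url-pattern>
</servlet-mapping>

And for step 2, the descending index on Key for our Comment kind would show up in datastore-indexes.xml as an entry along these lines (a sketch – the local development server generates the real entry for you):

<datastore-indexes autoGenerate="true">
  <datastore-index kind="Comment" ancestor="false">
    <property name="__key__" direction="desc" />
  </datastore-index>
</datastore-indexes>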

Example 1: Changing a property on every Entity (Naive way)

(You can even use this technique if you just need to change a property on a large set of Entities).

Assuming you’ve already set up your environment for the Mapper servlet, you can dive right in. Let’s create a Mapper class that goes through every Entity of a given Kind and converts the “comment” property to use all lowercase letters. We’ll also add a timestamp for when we modified this Entity. In this first example, we’ll do this the naive way. This is a very good way to introduce you to very simple mutations on all your Entities using Mapper.

Note that this requires some familiarity with the Low-Level API. Don’t worry – entities edited or saved using the low-level API are accessible via managed persistence interfaces such as JDO/JPA (and vice versa). If you aren’t familiar with the low-level API, you can read more about it here on the Javadocs.

The first thing we’ll have to do is define a Mapper. We tried as much as possible to mimic Hadoop’s Mapper class. We’ll be subclassing AppEngineMapper, which is itself a subclass of Hadoop’s Mapper. The meat of this class is the map() method, which we’ll be overriding. We’ll also override the taskSetup() lifecycle callback. We’ll be using this to initialize our DatastoreService, though we could probably initialize it in the body of the map() method itself. The other methods are taskCleanup(), setup() and cleanup() – examples here. Let’s have a look at our code below:

package com.ikai.mapperdemo.mappers;

import java.util.Date;
import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;

/**
 *
 * This mapper changes all Strings to lowercase Strings, sets
 * a timestamp, and reputs them into the Datastore. The reason
 * this is a "Naive" Mapper is because it doesn't make use of
 * Mutation Pools, which can do these operations in batch instead
 * of individually.
 *
 * @author Ikai Lan
 *
 */
public class NaiveToLowercaseMapper extends
		AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
	private static final Logger log = Logger
			.getLogger(NaiveToLowercaseMapper.class.getName());

	private DatastoreService datastore;

	@Override
	public void taskSetup(Context context) {
		this.datastore = DatastoreServiceFactory.getDatastoreService();
	}

	@Override
	public void map(Key key, Entity value, Context context) {
		log.info("Mapping key: " + key);

		if (value.hasProperty("comment")) {
			String comment = (String) value.getProperty("comment");
			comment = comment.toLowerCase();
			value.setProperty("comment", comment);
			value.setProperty("updatedAt", new Date());

			datastore.put(value);

		}
	}
}

Notice that this map method takes 3 parameters:

Key key – this is the datastore Key for the Entity we are about to perform an operation on. It exists mostly for API compatibility with Hadoop; we don’t strictly need it when iterating over datastore Entities, because we *could* use it to look up the Entity, but we don’t have to because …

Entity value – … because we actually get the Entity already. If we did a lookup for the Entity, we’d double the amount of lookups we do per Entity. We can certainly use the Key to do a lookup using a PersistenceManager or EntityManager and have a populated, typesafe Entity object, but from an efficiency standpoint we’d be doubling our work for some JDO/JPA sugar.

Context context – We don’t need this in our example, but it’s easy to think of the Context as giving us access to “global” values such as temporary variables and configuration files. For a later example in this post, we’ll be using the Context to store a global value in a counter and increment it. For this example, it’s unused.

If you’re familiar at all with the low-level API, this will look very straightforward (again, I highly encourage you to read the docs). We take an entity, add 2 properties to it, then re-put() the Entity back into the datastore.

Now let’s add this job to mapreduce.xml:

<configurations>
  <configuration name="Naive Mass toLowercase()">
    <property>
      <name>mapreduce.map.class</name>

      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.NaiveToLowercaseMapper</value>
    </property>

    <!--  This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>

    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Comment</value>
    </property>
  </configuration>
</configurations>

It looks complex, but it’s really not. We define a configuration element and name the job. The name of the job is also the name we’ll see in the GUI when we fire off the job. We need 3 sets of property elements under this element, which are just name/value pairs. Let’s go over each one we used:

Name: mapreduce.map.class
Value: com.ikai.mapperdemo.mappers.NaiveToLowercaseMapper
This one is straightforward – we provide the name of an AppEngineMapper subclass with the map() method we want run.

Name: mapreduce.inputformat.class
Value: com.google.appengine.tools.mapreduce.DatastoreInputFormat
This is the class that provides the input we map over. DatastoreInputFormat is provided by appengine-mapreduce, but it is possible for us to define our own InputFormat. For guidance, check out the source of DatastoreInputFormat here.

In a more advanced example (ahem, future blog post), we’ll discuss building our own InputFormat to read from another source such as the Blobstore. For our examples in this post, we won’t need anything beyond DatastoreInputFormat.

Name: mapreduce.mapper.inputformat.datastoreinputformat.entitykind
Value: Comment
This input is specific to DatastoreInputFormat. It tells DatastoreInputFormat which Entity Kind to iterate over. Note that in the mapper console, a user can type in the name of a Kind or edit this Field to reflect the value they want. We can’t leave this blank, though, if we want this to work.

If we browse to the URI at which we’ve defined the Mapper console (in our case /mapper), we see the console listing our configured jobs.

“Running jobs” appears when we click “Run”. We can click “Detail” to see the progress of our job, or we can “Abort” to quit the job. Note that aborting a job won’t revert our Entities! We’ll end up with a partially run job if we run a giant mutation, so we’ll have to be cognizant of this when we use this tool.

When the job completes, we’ll take a look at our Comments. Sure enough, they are now all lowercase.

Example 2: Changing a property on every Entity using Mutation Pools

There’s a reason the Mapper in Example 1 is called a Naive Mapper: it doesn’t take advantage of mutation pools. As we all know, App Engine’s datastore is capable of handling operations in parallel using batched calls. We’re already doing work in parallel by specifying shards, but we’ll want to use batched calls when possible. We do this by adding the mutations we want to a mutation pool; then, periodically as the pool hits a certain size, we flush all the writes to the datastore with a single call instead of individually. This has the advantage of making our map() call as fast as possible, since all we’re really doing is building a list of operations to perform all at once when the system is good and ready. Let’s define the XML file first, assuming we call the class PooledToLowercaseMapper:

  <configuration name="Mass toLowercase() with Mutation Pool">
    <property>
      <name>mapreduce.map.class</name>

      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.PooledToLowercaseMapper</value>
    </property>

    <!--  This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>

    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Comment</value>
    </property>

  </configuration>

It looks almost exactly the same. That’s because the meat is in what we do in the actual class itself:

package com.ikai.mapperdemo.mappers;

import java.util.Date;
import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;
import com.google.appengine.tools.mapreduce.DatastoreMutationPool;

/**
 *
 * The functionality of this is exactly the same as in {@link NaiveToLowercaseMapper}.
 * The advantage here is that since a {@link DatastoreMutationPool} is used, mutations
 * can be done in batch, saving API calls.
 *
 * @author Ikai Lan
 *
 */
public class PooledToLowercaseMapper extends
		AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
	private static final Logger log = Logger
			.getLogger(PooledToLowercaseMapper.class.getName());

	@Override
	public void map(Key key, Entity value, Context context) {
		log.info("Mapping key: " + key);

		if (value.hasProperty("comment")) {
			String comment = (String) value.getProperty("comment");
			comment = comment.toLowerCase();
			value.setProperty("comment", comment);
			value.setProperty("updatedAt", new Date());

			DatastoreMutationPool mutationPool = this.getAppEngineContext(
					context).getMutationPool();
			mutationPool.put(value);
		}
	}
}

Everything looks exactly the same until these lines:

DatastoreMutationPool mutationPool = this.getAppEngineContext(context).getMutationPool();
mutationPool.put(value);

Aha! So we finally put the context to use. Granted, we use the context as a parameter to another, more useful method, but at least we’re using it.  We acquire a DatastoreMutationPool using the getAppEngineContext(context).getMutationPool() method, then we just call put() and pass the changed entity. DatastoreMutationPool is defined here and is open source.

The interface is similar to that of DatastoreService. There’s not a lot of fancy stuff going on here. put(), as we’ve seen, is defined. get() isn’t, because, well, that method makes no sense in this context. delete() is defined, which brings me to my bonus section:

Bonus Example 2: Delete all Entities of a given Kind

One of the most common questions asked in the group is, “How do I drop table?” Usually, this question is asked by new App Engine developers who don’t yet understand that the datastore is a distributed key-value store and not a relational database. But it’s also a legitimate use case. What if you just wanted to nuke all Entities of a given Kind? Prior to Mapper, you would have had to write your own handler to take care of this. Mapper makes this very easy. Here’s what a generic “DeleteAllMapper” would look like. This will work with *any* Entity Kind:

package com.ikai.mapperdemo.mappers;

import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;
import com.google.appengine.tools.mapreduce.DatastoreMutationPool;

/**
 *
 * This Mapper deletes all Entities of a given kind. It simulates the
 * DROP TABLE functionality asked for by developers.
 *
 * @author Ikai Lan
 *
 */
public class DeleteAllMapper extends
		AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
	private static final Logger log = Logger.getLogger(DeleteAllMapper.class
			.getName());

	@Override
	public void map(Key key, Entity value, Context context) {
		log.info("Adding key to deletion pool: " + key);
		DatastoreMutationPool mutationPool = this.getAppEngineContext(context)
				.getMutationPool();
		mutationPool.delete(value.getKey());
	}
}

That’s it! We wire it up the same way we wire up other Mappers:


  <configuration name="Delete all Entities">
    <property>
      <name>mapreduce.map.class</name>

      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.DeleteAllMapper</value>
    </property>

    <!--  This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>

    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Comment</value>
    </property>
  </configuration>

I’ve shown each job’s configuration separately, but they don’t need to be separate – a given App Engine project’s mapreduce.xml can contain multiple configuration elements. That’s why there’s a dropdown list in the Mapreduce console GUI.
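For instance, a single mapreduce.xml holding all of the jobs from this post would look roughly like this (trimmed sketch – each configuration element is exactly as shown in its example):

<configurations>
  <configuration name="Naive Mass toLowercase()">
    <!-- properties as in Example 1 -->
  </configuration>

  <configuration name="Mass toLowercase() with Mutation Pool">
    <!-- properties as in Example 2 -->
  </configuration>

  <configuration name="Delete all Entities">
    <!-- properties as in the bonus example -->
  </configuration>
</configurations>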

Example 3: Taking more user input in the Mapper console and counting

Our next example covers using counters in the Context. Let’s say that we want to allow the user to enter a String, then iterate over every Entity searching for occurrences of that substring on the fly, without pre-built indexes. First, let’s discuss the XML configuration we use:

  <configuration name="Count words in all Comments">
    <property>
      <name>mapreduce.map.class</name>

      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.CountWordsMapper</value>
    </property>

    <property>
    	<!--  This is the URL to call after the entire Mapper has run -->
    	<name>mapreduce.appengine.donecallback.url</name>
    	<value>/callbacks/word_count_completed</value>
    </property>

    <!--  This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>

    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Comment</value>
    </property>

  </configuration>

There’s one new name/value pair:
Name: mapreduce.mapper.counter.substringtarget
Value: Substring
We can pick any name or value we want. We just pick this one because it makes sense. We’ll retrieve this value in the Mapper via the Context. This causes an extra text field to appear in the Mapper console.


The Mapper is below:

package com.ikai.mapperdemo.mappers;

import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;

/**
 *
 * This Mapper takes some input and counts the number of Comments which
 * contain that substring.
 *
 * @author Ikai Lan
 *
 */
public class SubstringMatcherMapper extends
		AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
	private static final Logger log = Logger.getLogger(SubstringMatcherMapper.class
			.getName());

	/*
	 * Get the target that we want to match on and count the number of Comments that
	 * match it
	 */
	@Override
	public void map(Key key, Entity value, Context context) {

	    String substringToMatch = context.getConfiguration().get("mapreduce.mapper.counter.substringtarget");

		String comment = (String) value.getProperty("comment");
		if (comment != null) {
			if(comment.contains(substringToMatch)) {
				log.info("Found match in: " + comment);
				context.getCounter("SubstringMatch", "count").increment(1);
			}
		}

	}
}

We retrieve the value entered by the user with this line of code:

context.getConfiguration().get("mapreduce.mapper.counter.substringtarget");

If the comment we’re currently working on contains the substring, we want to increment our count. The context object has a getCounter() method that returns a counter we can increment or decrement:

context.getCounter("SubstringMatch", "count").increment(1);

When our job completes, we can see the total count by clicking “Detail” on the completed job.

More likely than not, however, we’ll want to store this number back in the datastore or do something with it besides stick it on a status page. That brings us to our next example.

Example 4: Completion callbacks and JobContexts

Let’s modify Example 3 a bit. Suppose now we want to count the total number of words across all comments. We’ll need to use a counter. But suppose that instead of just displaying the result on a console page, we want that number stored back into the datastore when the job finishes. For this we use a completion callback. Much like Task Queues, incoming email and XMPP, the callback is event driven and is dispatched as an HTTP request to a URI in our application. That is – we’ll define a servlet with a doPost() handler and read the job information out of the request parameters.

The first thing we’ll need to do is configure our Mapper to fire off the callback when done. We do this in mapreduce.xml:

  <configuration name="Count substring matches in all Comments">
    <property>
      <name>mapreduce.map.class</name>

      <!--  Set this to be your Mapper class  -->
      <value>com.ikai.mapperdemo.mappers.SubstringMatcherMapper</value>
    </property>

    <!--  This is a default tool that lets us iterate over datastore entities -->
    <property>
      <name>mapreduce.inputformat.class</name>
      <value>com.google.appengine.tools.mapreduce.DatastoreInputFormat</value>
    </property>

    <property>
      <name human="Entity Kind to Map Over">mapreduce.mapper.inputformat.datastoreinputformat.entitykind</name>
      <value template="optional">Comment</value>
    </property>

    <property>
      <name human="Search for substring">mapreduce.mapper.counter.substringtarget</name>
      <value template="optional">Substring</value>
    </property>

  </configuration>

Here’s the property we care about:

Name: mapreduce.appengine.donecallback.url
Value: /callbacks/word_count_completed

The value of this can map to any URI in your application. Just be sure that URI points to the Servlet that will be handling your callback. Let’s define the Mapper class:

package com.ikai.mapperdemo.mappers;

import java.util.logging.Logger;

import org.apache.hadoop.io.NullWritable;

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.AppEngineMapper;

/**
 *
 * This mapper counts the number of total words across all comments. It cheats a
 * bit by just splitting on whitespace and just using the length. This mapper
 * demonstrates use of counters as well as using a completion callback.
 *
 * @author Ikai Lan
 *
 */
public class CountWordsMapper extends
		AppEngineMapper<Key, Entity, NullWritable, NullWritable> {
	private static final Logger log = Logger.getLogger(CountWordsMapper.class
			.getName());

	/*
	 * This is a bit of a lazy implementation more to prove a point than to
	 * actually be correct. Split on whitespace, count words
	 */
	@Override
	public void map(Key key, Entity value, Context context) {

		String comment = (String) value.getProperty("comment");
		if (comment != null) {
			String[] words = comment.split("\\s+");
			int wordCount = words.length;

			// Takes a "group" and a "counter"
			// We'll use these later to store the final count back in the
			// datastore
			context.getCounter("CommentWords", "count").increment(wordCount);
		}

	}
}

Not a lot that’s new here. We use the context again to store a counter. Note that we can increment by any value, not just 1.

Let’s take a look at what our servlet looks like that handles this callback:

package com.ikai.mapperdemo.servlets;

import java.io.IOException;
import java.util.Date;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.tools.mapreduce.MapReduceState;
import com.ikai.mapperdemo.mappers.CountWordsMapper;

/**
 * This is the servlet that takes care of any processing we have to do after we
 * have finished running {@link CountWordsMapper}.
 *
 * This is just a standard servlet - we can do anything we want here. We can use
 * any App Engine API such as email or XMPP, for instance, to notify an
 * administrator. We could also store a final state into the datastore - in
 * fact, that is what this example below does.
 *
 * @author Ikai Lan
 *
 */
@SuppressWarnings("serial")
public class WordCountCompletedCallbackServlet extends HttpServlet {

	public void doPost(HttpServletRequest req, HttpServletResponse resp)
			throws IOException {

		String jobIdName = req.getParameter("job_id");
		JobID jobId = JobID.forName(jobIdName);

		// A future iteration of this will likely contain a default
		// option if we don't care which DatastoreService instance we use.
		DatastoreService datastore = DatastoreServiceFactory
				.getDatastoreService();
		try {

			// We get the state back from the job_id parameter. The state is
			// serialized and stored in the datastore, so we pass an instance
			// of the datastore service.
			MapReduceState mrState = MapReduceState.getMapReduceStateFromJobID(
					datastore, jobId);

			// There's a bit of ceremony to get the actual counter. This
			// example is intentionally verbose for clarity. First get all the
			// Counters,
			// then we get the CounterGroup, then we get the Counter, then we
			// get the count.
			Counters counters = mrState.getCounters();
			CounterGroup counterGroup = counters.getGroup("CommentWords");
			Counter counter = counterGroup.findCounter("count");
			long wordCount = counter.getValue(); // Finally!

			// Let's create a special datastore Entity for this value so
			// we can reference it on the ViewComments page
			Entity totalCountEntity = new Entity("TotalWordCount",
					"total_word_count");
			totalCountEntity.setProperty("count", wordCount);

			// Now we timestamp this bad boy
			totalCountEntity.setProperty("updatedAt", new Date());
			datastore.put(totalCountEntity);

		} catch (EntityNotFoundException e) {
			throw new IOException("No datastore state");
		}

	}

}
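One wiring detail the servlet itself doesn’t show: the callback URI from mapreduce.xml has to actually route to this class. A minimal web.xml sketch (the servlet-name here is just an illustration):

<servlet>
  <servlet-name>wordCountCallback</servlet-name>
  <servlet-class>com.ikai.mapperdemo.servlets.WordCountCompletedCallbackServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>wordCountCallback</servlet-name>
  <url-pattern>/callbacks/word_count_completed</url-pattern>
</servlet-mapping>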

The JobID comes as a String parameter. We get it like so:

String jobIdName = req.getParameter("job_id");
JobID jobId = JobID.forName(jobIdName);

Be aware of the imports used. Your IDE may import the wrong class, as there is a deprecated JobID and a non-deprecated version – use org.apache.hadoop.mapreduce.JobID, as in the servlet’s imports above.

Once you have the JobID, you use it to retrieve the MapReduceState:

MapReduceState mrState = MapReduceState.getMapReduceStateFromJobID(datastore, jobId);

From the MapReduceState object, we have to perform a bit of a ceremony to get what we want. We need to:

1. Fetch the Counters from the MapReduceState
2. Fetch the appropriate CounterGroup from the Counters object
3. Fetch the named Counter from the CounterGroup
4. Fetch the value from the Counter

Here’s what it looks like in code:

Counters counters = mrState.getCounters();
CounterGroup counterGroup = counters.getGroup("CommentWords");
Counter counter = counterGroup.findCounter("count");
long wordCount = counter.getValue();

We can now do what we want with this count. In our servlet example, we save it to a datastore Entity and use it later on.
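As a sketch of the read side (the kind, key name and property match the Entity created in the servlet above), the ViewComments page could fetch the stored count back by key:

// Sketch: reading the stored count back out, e.g. on the ViewComments page.
// Uses KeyFactory and EntityNotFoundException from the low-level datastore API.
DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
Key countKey = KeyFactory.createKey("TotalWordCount", "total_word_count");
try {
    Entity totalCountEntity = datastore.get(countKey);
    long wordCount = (Long) totalCountEntity.getProperty("count");
    // ... render wordCount on the page
} catch (EntityNotFoundException e) {
    // The word count job hasn't completed yet
}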

Get the code

You’re undoubtedly ready to start playing with this thing. You’ve got everything you need to know. First, here’s the getting started page for appengine-mapreduce in Java:

Here’s my sample source code on GitHub.

Summary

So there you have it: an easy-to-use tool for mapping operations across entire Entity Kinds. There are still a lot of topics to cover, and we’ll likely explore them in a future article. For instance, I didn’t have a chance to cover building your own InputFormat class. We’re still hard at work extending this framework (adding the “Shuffle” and “Reduce” phases, for instance), so please post your feedback in the App Engine groups or file bugs in the issue tracker.

Written by Ikai Lan

July 9, 2010 at 3:35 pm

Posted in App Engine, Java

29 Responses

  1. Didn’t know a Java version of the mapper library was available. GREAT news!!!!

    George Moschovitis

    July 11, 2010 at 1:20 pm

  2. Running mappers from the admin servlet is nice, but I want to be able to programmatically start predefined actions from my code. I would love to see an article describing how this could be implemented.

    George Moschovitis

    July 12, 2010 at 11:58 am

  3. The library is open source and built on existing APIs. Until that article exists, feel free to poke around in the code and see how it’s done.

    Ikai Lan

    July 12, 2010 at 12:01 pm

  4. awesome, very well written and helpful. Can you write blog posts like this on all appengine new features :)

    @lepah

    July 13, 2010 at 6:46 pm

  5. I will most certainly try to =)

    Ikai Lan

    July 13, 2010 at 6:48 pm

  6. Thank you Ikai!

    Francesco Passantino

    July 24, 2010 at 8:11 am

  7. Thanks for the clean introduction to this framework!

    A small remark: I think the code for example 3 and example 4 is switched (there is no mapreduce.mapper.counter.substringtarget in example 3).

    Pieter Coucke

    July 26, 2010 at 1:37 am

  8. Hello Ikai,
    It seems that there is no simple way to start a job from a cron. I wonder why ? Have you an opinion about that ?
    Thanks in advance,
    Didier

    dgirard

    July 26, 2010 at 2:25 pm

  9. Programmatic access to jobs is coming, though our priorities right now are shuffle and reduce. The whole thing is built on top of existing APIs, so feel free to look through the source code and adapt whatever functionality you need.

    Ikai Lan

    July 26, 2010 at 2:28 pm

  10. Thanks, Didier

    dgirard

    July 27, 2010 at 5:18 am

  11. In the demos I see you use DatastoreService API to add “matching” entity.

    Entity entry = new Entity("Comment");
    entry.setProperty("comment", comment);
    entry.setProperty("createdAt", new Date());
    entry.setProperty("locale", req.getLocale().toString());

    DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
    datastore.put(entry);

    Can JPA be used instead?

    alesj

    July 31, 2010 at 3:32 am

  12. Ah, forgot what I read. :-)

    “Note that this requires some familiarity with the Low-Level API. Don’t worry – entities edited or saved using the low-level API are accessible via managed persistence interface such as JDO/JPA (and vice versa). If you aren’t familiar with the low-level API, you can read more about it here on the Javadocs.”

    alesj

    July 31, 2010 at 4:14 am

  13. Thank you for this great post. The deletion mapper example is working fine in my app.
    Before developing my own mapper, one crucial question for me : is it possible to use the mapper to iterate on a entity kind A, and meanwhile to create entities of an entity kind B ?

    My need : mass-creation of entities B based on the content of the entities A. Thank you in advance for your answer.

    cyrille

    August 7, 2010 at 7:42 pm

  14. […] Usando el framework Mapper para java en la appEngine […]

  15. […] a comment » Since my last post describing App Engine mapreduce, a new InputReader has been added to the Java project for reading […]

  16. One VITAL comment I would have loved not to discover by myself: the Map() function does NOT get around the 30 sec. limit – each atomic piece of work has to be under the fatal 30 sec duration.

    This means that any attempt to use the Mapper with massive entities creation embedded in a big transaction will fail, if the Mapper cannot divide the work into smaller pieces.

    cyrille

    August 11, 2010 at 11:45 pm

  17. Great article, Ikai.

    Does context.getCounter(“SubstringMatch”, “count”) retrieve/store data from memcache? If so, will it flush data, if quota exceeds?

    Mani Doraisamy

    August 13, 2010 at 8:09 am

  18. Hi Ikai,

    First, congrats to your very interesting blog.

    I got a curious question regarding PooledToLowercaseMapper and DatastoreMutationPool. Having had a look at the code of both classes, I haven’t found a location where pooled ops (puts and/or deletes) are flushed if the count or size limit is not accidentally reached on the last invocation of the map method of a given mapper instance. Isn’t there a call to DatastoreMutationPool.flush() missing somewhere in a clean-up hook of the mapper class?

    Cheers, Remigius.

    Remigius

    August 19, 2010 at 5:18 am

  19. Thanks for the great tutorial! Well written, awesome feature.

    Amos

    August 28, 2010 at 10:17 pm

  20. Thank you for this well-written article!

    I have one question: I have setup and started the deletion mapper for one of my classes. There have been approx. 1 million entries and I want to delete them all.

    But I see now that the deletion process is *very* cpu intensive so that the free cpu amount of 6.5 hours is fully consumed within a few minutes and deleting only about 200.000 entries. Therefore, I will need about 5 days to delete all entries. Is this a normal behaviour?

    I am not even able to delete the running process, because my app is now over quota the whole day.

    Delo

    October 1, 2010 at 12:23 am

  21. Must we use the mapper console to launch a job? Isn’t there a way to launch something programmatically?

    Thanks.

    Johnny

    October 27, 2010 at 5:48 pm

  22. Mr.Lan
    First, thank you for your blog about mapper API.
    Recently, I want to use GAE for numerical computation.
    could you tell me whether the mapper API can do that or not?
    I am sorry,I am a beginner about GAE,maybe my problem is very stupid. waiting for your reply. thank you.

    wu

    January 13, 2011 at 10:16 pm

  23. Hey Ikai,

    How can I get the value of an input parameter from the CallbackServlet?

    I don’t see any way to get the configuration object from MapReduceState.

    I know how to get it:
    MapReduceState mrState = MapReduceState.getMapReduceStateFromJobID(
    datastore, jobId);

    But once I have it, what do I do? I want to get the value of an input parameter passed to it.

    Scott Murphy

    July 14, 2011 at 5:15 pm

  24. […] find remotely close to this is MapReduce and most of the samples are based on Python. There is this blog entry which is very helpful but somewhat outdated since it predated the reduce portion. Then there is the […]

  25. Hi Ikai,

    Can you update your post ? It wasn’t compatible anymore with new appengine-mapreduce.

    Thank you.

    nxhoaf

    October 16, 2012 at 4:14 am

  26. […] find remotely close to this is MapReduce and most of the samples are based on Python. There is this blog entry which is very helpful but somewhat outdated since it predated the reduce portion. Then there is the […]

  27. […] the best introduction to this whole area of App Engine and highly scalable datastore operations is Ikai’s post on the Mapper from a couple of years ago. It’d have been perfect for this situation except there’s recently been a new version […]

  28. it would be great if you could write a post on how to migrate this to the new api’s.

    mufumbo

    July 14, 2013 at 1:59 pm

