Java Programming Guide

The Spark Java API exposes all the Spark features available in the Scala version to Java. To learn the basics of Spark, we recommend reading through the Scala programming guide first; it should be easy to follow even if you don’t know Scala. This guide will show how to use the Spark features described there in Java.

The Spark Java API is defined in the org.apache.spark.api.java package, and includes a JavaSparkContext for initializing Spark and JavaRDD classes, which support the same methods as their Scala counterparts but take Java functions and return Java data and collection types. The main differences have to do with passing functions to RDD operations (e.g. map) and handling RDDs of different types, as discussed next.

Key Differences in the Java API

There are a few key differences between the Java and Scala APIs:

RDD Classes

Spark defines additional operations on RDDs of key-value pairs and doubles, such as reduceByKey, join, and stdev.

In the Scala API, these methods are automatically added using Scala’s implicit conversions mechanism.

In the Java API, the extra methods are defined in the JavaPairRDD and JavaDoubleRDD classes. RDD methods like map are overloaded by specialized PairFunction and DoubleFunction classes, allowing them to return RDDs of the appropriate types. Common methods like filter and sample are implemented by each specialized RDD class, so filtering a PairRDD returns a new PairRDD, etc (this acheives the “same-result-type” principle used by the Scala collections framework).

Function Classes

The following table lists the function classes used by the Java API. Each class has a single abstract method, call(), that must be implemented.

ClassFunction Type
Function<T, R>T => R
DoubleFunction<T>T => Double
PairFunction<T, K, V>T => Tuple2<K, V>
FlatMapFunction<T, R>T => Iterable<R>
DoubleFlatMapFunction<T>T => Iterable<Double>
PairFlatMapFunction<T, K, V>T => Iterable<Tuple2<K, V>>
Function2<T1, T2, R>T1, T2 => R (function of two arguments)

Storage Levels

RDD storage level constants, such as MEMORY_AND_DISK, are declared in the org.apache.spark.api.java.StorageLevels class. To define your own storage level, you can use StorageLevels.create(…).

Other Features

The Java API supports other Spark features, including accumulators, broadcast variables, and caching.

Example

As an example, we will implement word count using the Java API.

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

JavaSparkContext sc = new JavaSparkContext(...);
JavaRDD<String> lines = ctx.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }
);

The word count program starts by creating a JavaSparkContext, which accepts the same parameters as its Scala counterpart. JavaSparkContext supports the same data loading methods as the regular SparkContext; here, textFile loads lines from text files stored in HDFS.

To split the lines into words, we use flatMap to split each line on whitespace. flatMap is passed a FlatMapFunction that accepts a string and returns an java.lang.Iterable of strings.

Here, the FlatMapFunction was created inline; another option is to subclass FlatMapFunction and pass an instance to flatMap:

class Split extends FlatMapFunction<String, String> {
  public Iterable<String> call(String s) {
    return Arrays.asList(s.split(" "));
  }
);
JavaRDD<String> words = lines.flatMap(new Split());

Continuing with the word count example, we map each word to a (word, 1) pair:

import scala.Tuple2;
JavaPairRDD<String, Integer> ones = words.map(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2(s, 1);
    }
  }
);

Note that map was passed a PairFunction<String, String, Integer> and returned a JavaPairRDD<String, Integer>.

To finish the word count program, we will use reduceByKey to count the occurrences of each word:

JavaPairRDD<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  }
);

Here, reduceByKey is passed a Function2, which implements a function with two arguments. The resulting JavaPairRDD contains (word, count) pairs.

In this example, we explicitly showed each intermediate RDD. It is also possible to chain the RDD transformations, so the word count example could also be written as:

JavaPairRDD<String, Integer> counts = lines.flatMap(
    ...
  ).map(
    ...
  ).reduceByKey(
    ...
  );

There is no performance difference between these approaches; the choice is just a matter of style.

Javadoc

We currently provide documentation for the Java API as Scaladoc, in the org.apache.spark.api.java package, because some of the classes are implemented in Scala. The main downside is that the types and function definitions show Scala syntax (for example, def reduce(func: Function2[T, T]): T instead of T reduce(Function2<T, T> func)). We hope to generate documentation with Java-style syntax in the future.

Where to Go from Here

Spark includes several sample programs using the Java API in examples/src/main/java. You can run them by passing the class name to the run-example script included in Spark; for example:

./run-example org.apache.spark.examples.JavaWordCount

Each example program prints usage help when run without any arguments.