Getting Started
- Starting Point: SparkSession
- Creating DataFrames
- Untyped Dataset Operations (aka DataFrame Operations)
- Running SQL Queries Programmatically
- Global Temporary View
- Creating Datasets
- Interoperating with RDDs
  - Inferring the Schema Using Reflection
  - Programmatically Specifying the Schema
- Scalar Functions
- Aggregations
  - Untyped User-Defined Aggregate Functions
  - Type-Safe User-Defined Aggregate Functions
Starting Point: SparkSession
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
The entry point into all functionality in Spark is the SparkSession class. To initialize a basic SparkSession, just call sparkR.session():
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
Note that when invoked for the first time, sparkR.session() initializes a global SparkSession singleton instance, and always returns a reference to this instance for successive invocations. In this way, users only need to initialize the SparkSession once, and then SparkR functions like read.df will be able to access this global instance implicitly, so users don't need to pass the SparkSession instance around.
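The builder APIs in the other languages behave similarly: getOrCreate() returns the already-running session rather than constructing a new one. A minimal Python sketch (not part of the original guide) illustrating the reuse:

from pyspark.sql import SparkSession

# getOrCreate() hands back the existing session if one is already running,
# so repeated builder calls return the same object.
spark1 = SparkSession.builder.appName("example").getOrCreate()
spark2 = SparkSession.builder.getOrCreate()
assert spark1 is spark2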
SparkSession in Spark 2.0 provides builtin support for Hive features, including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. To use these features, you do not need to have an existing Hive setup.
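These features are switched on through the builder's enableHiveSupport() method. A minimal Python sketch, assuming the same builder pattern shown above:

from pyspark.sql import SparkSession

# enableHiveSupport() adds HiveQL, Hive SerDes, and Hive UDF support to the
# session; no separate Hive installation is required.
spark = SparkSession.builder \
    .appName("Hive-enabled example") \
    .enableHiveSupport() \
    .getOrCreate()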
Creating DataFrames
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources. As an example, the following creates a DataFrame based on the content of a JSON file:
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources. As an example, the following creates a DataFrame based on the content of a JSON file:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources. As an example, the following creates a DataFrame based on the content of a JSON file:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
With a SparkSession, applications can create DataFrames from a local R data.frame, from a Hive table, or from Spark data sources. As an example, the following creates a DataFrame based on the content of a JSON file:
df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
Untyped Dataset Operations (aka DataFrame Operations)
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.
As mentioned above, in Spark 2.0, DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.
Here we include some basic examples of structured data processing using Datasets:
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
For a complete list of the types of operations that can be performed on a Dataset, refer to the API Documentation.
In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
For a complete list of the types of operations that can be performed on a Dataset refer to the API Documentation.
In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
In Python, it’s possible to access a DataFrame’s columns either by attribute
(df.age
) or by indexing (df['age']
). While the former is convenient for
interactive data exploration, users are highly encouraged to use the
latter form, which is future proof and won’t break with column names that
are also attributes on the DataFrame class.
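For example (a short sketch, not from the original guide), both forms build the same column expression; the indexing form keeps working even for column names such as "count" that collide with methods on the DataFrame class:

# Attribute access and indexing refer to the same column.
df.select(df.age).show()
df.select(df["age"]).show()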
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
For a complete list of the types of operations that can be performed on a DataFrame refer to the API Documentation.
In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
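As a small illustration (a sketch, not part of the original example set), a couple of these built-in functions from pyspark.sql.functions applied to the people DataFrame:

from pyspark.sql import functions as F

# upper() is a string function, round() a math function; both return Columns
# that compose with ordinary column expressions.
df.select(F.upper(df["name"]).alias("upper_name"),
          F.round(df["age"] / 10).alias("age_decades")).show()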
# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
##      name
## 1 Michael
## 2    Andy
## 3  Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
##      name (age + 1.0)
## 1 Michael          NA
## 2    Andy          31
## 3  Justin          20

# Select people older than 21
head(where(df, df$age > 21))
##   age name
## 1  30 Andy

# Count people by age
head(count(groupBy(df, "age")))
##   age count
## 1  19     1
## 2  NA     1
## 3  30     1

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
For a complete list of the types of operations that can be performed on a DataFrame refer to the API Documentation.
In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
Running SQL Queries Programmatically
The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a Dataset<Row>.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
The sql function enables applications to run SQL queries programmatically and returns the result as a SparkDataFrame.
df <- sql("SELECT * FROM table")

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to a system-preserved database global_temp, and we must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
Creating Datasets
Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
Inferring the Schema Using Reflection
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain Map field(s). Nested JavaBeans and List or Array fields are supported though. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# rdd returns the content as a pyspark.RDD of Row.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
Programmatically Specifying the Schema
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
For example:
import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps.

- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
For example:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

- Create an RDD of tuples or lists from the original RDD;
- Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in Step 1.
- Apply the schema to the RDD via the createDataFrame method provided by SparkSession.
For example:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
Scalar Functions
(to be filled soon)
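Until this section is filled in, here is a small sketch (not part of the guide itself) of what built-in scalar functions look like, reusing the people temporary view registered above:

# Scalar functions such as upper() and length() produce one output value per
# input row, in contrast to the aggregate functions described in the next section.
spark.sql("SELECT name, upper(name) AS upper_name, length(name) AS name_length FROM people").show()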
Aggregations
The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
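For instance, a brief Python sketch (not part of the original examples) of the built-in aggregations applied to the people DataFrame from earlier:

from pyspark.sql import functions as F

# Built-in aggregate functions collapse many rows into one value per group.
df.agg(F.count("*").alias("rows"),
       F.avg("age").alias("avg_age"),
       F.max("age").alias("max_age")).show()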
Untyped User-Defined Aggregate Functions
Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify buffer
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
}
public static class MyAverage extends Aggregator<Employee, Average, Double> {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
public Average zero() {
return new Average(0L, 0L);
}
// Combine two values to produce a new value. For performance, the function may modify buffer
// and return it instead of constructing a new object
public Average reduce(Average buffer, Employee employee) {
long newSum = buffer.getSum() + employee.getSalary();
long newCount = buffer.getCount() + 1;
buffer.setSum(newSum);
buffer.setCount(newCount);
return buffer;
}
// Merge two intermediate values
public Average merge(Average b1, Average b2) {
long mergedSum = b1.getSum() + b2.getSum();
long mergedCount = b1.getCount() + b2.getCount();
b1.setSum(mergedSum);
b1.setCount(mergedCount);
return b1;
}
// Transform the output of the reduction
public Double finish(Average reduction) {
return ((double) reduction.getSum()) / reduction.getCount();
}
// Specifies the Encoder for the intermediate value type
public Encoder<Average> bufferEncoder() {
return Encoders.bean(Average.class);
}
// Specifies the Encoder for the final output value type
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+