public class DataFrame extends Object implements RDDApi<org.apache.spark.sql.Row>, scala.Serializable
A DataFrame is equivalent to a relational table in Spark SQL. There are multiple ways to create a DataFrame:
```scala
// Create a DataFrame from Parquet files
val people = sqlContext.parquetFile("...")
// Create a DataFrame from data sources
val df = sqlContext.load("...", "json")
```
Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in DataFrame (this class), Column, and functions.
To select a column from the data frame, use the apply method in Scala and col in Java.
```scala
val ageCol = people("age") // in Scala
```
```java
Column ageCol = people.col("age"); // in Java
```
Note that the Column type can also be manipulated through its various functions.
```scala
// The following creates a new column that increases everybody's age by 10.
people("age") + 10 // in Scala
```
```java
people.col("age").plus(10); // in Java
```
A more concrete example in Scala:
```scala
import org.apache.spark.sql.functions._ // avg, max

// To create DataFrame using SQLContext
val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")

people.filter(people("age") > 30)
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), people("gender"))
  .agg(avg(people("salary")), max(people("age")))
```
and in Java:
```java
import static org.apache.spark.sql.functions.*; // avg, max

// To create DataFrame using SQLContext
DataFrame people = sqlContext.parquetFile("...");
DataFrame department = sqlContext.parquetFile("...");

people.filter(people.col("age").gt(30))
  .join(department, people.col("deptId").equalTo(department.col("id")))
  .groupBy(department.col("name"), people.col("gender"))
  .agg(avg(people.col("salary")), max(people.col("age")));
```
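The plain-Java model below shows what the pipeline above computes:

1. Keep people older than 30.
2. Pair each person with their department where deptId equals id.
3. Group by department name and gender.
4. Take the average salary and the maximum age per group.

It does not use Spark and says nothing about how Spark executes the query. The class PipelineModel and its row types Person and Department are hypothetical stand-ins for the two DataFrames.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Spark code) of: filter(age > 30) -> inner join on deptId == id
// -> group by (department name, gender) -> avg(salary), max(age).
final class PipelineModel {
    // Hypothetical row types standing in for the people and department DataFrames.
    static final class Person {
        final String name; final int age; final int deptId; final String gender; final double salary;
        Person(String name, int age, int deptId, String gender, double salary) {
            this.name = name; this.age = age; this.deptId = deptId; this.gender = gender; this.salary = salary;
        }
    }

    static final class Department {
        final int id; final String name;
        Department(int id, String name) { this.id = id; this.name = name; }
    }

    /** Returns "deptName|gender" -> {avg(salary), max(age)}, with keys in sorted order. */
    static Map<String, double[]> run(List<Person> people, List<Department> departments) {
        Map<String, List<Person>> groups = new TreeMap<>();
        for (Person p : people) {
            if (p.age <= 30) continue;                  // filter(age > 30)
            for (Department d : departments) {
                if (p.deptId != d.id) continue;         // join condition deptId === id
                groups.computeIfAbsent(d.name + "|" + p.gender, k -> new ArrayList<>()).add(p);
            }
        }
        Map<String, double[]> result = new TreeMap<>();
        for (Map.Entry<String, List<Person>> group : groups.entrySet()) {
            List<Person> rows = group.getValue();
            double avgSalary = rows.stream().mapToDouble(r -> r.salary).average().getAsDouble();
            int maxAge = rows.stream().mapToInt(r -> r.age).max().getAsInt();
            result.put(group.getKey(), new double[] {avgSalary, maxAge});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Ann", 41, 1, "F", 100.0),
            new Person("Bea", 35, 1, "F", 80.0),
            new Person("Cal", 25, 1, "M", 50.0),   // removed by the age filter
            new Person("Dan", 50, 2, "M", 120.0));
        List<Department> departments = Arrays.asList(new Department(1, "eng"), new Department(2, "ops"));
        run(people, departments).forEach((key, v) ->
            System.out.println(key + " avg(salary)=" + v[0] + " max(age)=" + (int) v[1]));
    }
}
```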
Constructor and Description |
---|
`DataFrame(SQLContext sqlContext, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan)` A constructor that automatically analyzes the logical plan. |
`DataFrame(SQLContext sqlContext, org.apache.spark.sql.SQLContext.QueryExecution queryExecution)` |
Modifier and Type | Method and Description |
---|---|
`DataFrame` | `agg(Column expr, Column... exprs)` Aggregates on the entire DataFrame without groups. |
`DataFrame` | `agg(Column expr, scala.collection.Seq<Column> exprs)` Aggregates on the entire DataFrame without groups. |
`DataFrame` | `agg(scala.collection.immutable.Map<String,String> exprs)` (Scala-specific) Aggregates on the entire DataFrame without groups. |
`DataFrame` | `agg(java.util.Map<String,String> exprs)` (Java-specific) Aggregates on the entire DataFrame without groups. |
`DataFrame` | `agg(scala.Tuple2<String,String> aggExpr, scala.collection.Seq<scala.Tuple2<String,String>> aggExprs)` (Scala-specific) Computes aggregates by specifying a map from column name to aggregate method. |
`Column` | `apply(String colName)` Selects a column based on the column name and returns it as a Column. |
`DataFrame` | `as(String alias)` Returns a new DataFrame with an alias set. |
`DataFrame` | `as(scala.Symbol alias)` (Scala-specific) Returns a new DataFrame with an alias set. |
`DataFrame` | `cache()` |
`Column` | `col(String colName)` Selects a column based on the column name and returns it as a Column. |
`org.apache.spark.sql.Row[]` | `collect()` Returns an array that contains all of the Rows in this DataFrame. |
`java.util.List<org.apache.spark.sql.Row>` | `collectAsList()` Returns a Java list that contains all of the Rows in this DataFrame. |
`String[]` | `columns()` Returns all column names as an array. |
`long` | `count()` Returns the number of rows in the DataFrame. |
`void` | `createJDBCTable(String url, String table, boolean allowExisting)` Saves the content of this DataFrame to a JDBC database at url under the table name table. |
`DataFrame` | `distinct()` Returns a new DataFrame that contains only the unique rows from this DataFrame. |
`scala.Tuple2<String,String>[]` | `dtypes()` Returns all column names and their data types as an array. |
`DataFrame` | `except(DataFrame other)` Returns a new DataFrame containing rows in this frame but not in another frame. |
`void` | `explain()` Prints only the physical plan to the console, for debugging purposes. |
`void` | `explain(boolean extended)` Prints the plans (logical and physical) to the console, for debugging purposes. |
`<A extends scala.Product> DataFrame` | `explode(scala.collection.Seq<Column> input, scala.Function1<org.apache.spark.sql.Row,scala.collection.TraversableOnce<A>> f, scala.reflect.api.TypeTags.TypeTag<A> evidence$1)` (Scala-specific) Returns a new DataFrame where each row has been expanded to zero or more rows by the provided function. |
`<A,B> DataFrame` | `explode(String inputColumn, String outputColumn, scala.Function1<A,scala.collection.TraversableOnce<B>> f, scala.reflect.api.TypeTags.TypeTag<B> evidence$2)` (Scala-specific) Returns a new DataFrame where a single column has been expanded to zero or more rows by the provided function. |
`DataFrame` | `filter(Column condition)` Filters rows using the given condition. |
`DataFrame` | `filter(String conditionExpr)` Filters rows using the given SQL expression. |
`org.apache.spark.sql.Row` | `first()` Returns the first row. |
`<R> RDD<R>` | `flatMap(scala.Function1<org.apache.spark.sql.Row,scala.collection.TraversableOnce<R>> f, scala.reflect.ClassTag<R> evidence$4)` Returns a new RDD by first applying a function to all rows of this DataFrame, and then flattening the results. |
`void` | `foreach(scala.Function1<org.apache.spark.sql.Row,scala.runtime.BoxedUnit> f)` Applies a function f to all rows. |
`void` | `foreachPartition(scala.Function1<scala.collection.Iterator<org.apache.spark.sql.Row>,scala.runtime.BoxedUnit> f)` Applies a function f to each partition of this DataFrame. |
`GroupedData` | `groupBy(Column... cols)` Groups the DataFrame using the specified columns, so that aggregations can be run on them. |
`GroupedData` | `groupBy(scala.collection.Seq<Column> cols)` Groups the DataFrame using the specified columns, so that aggregations can be run on them. |
`GroupedData` | `groupBy(String col1, scala.collection.Seq<String> cols)` Groups the DataFrame using the specified columns, so that aggregations can be run on them. |
`GroupedData` | `groupBy(String col1, String... cols)` Groups the DataFrame using the specified columns, so that aggregations can be run on them. |
`org.apache.spark.sql.Row` | `head()` Returns the first row. |
`org.apache.spark.sql.Row[]` | `head(int n)` Returns the first n rows. |
`void` | `insertInto(String tableName)` :: Experimental :: Adds the rows from this DataFrame to the specified table. |
`void` | `insertInto(String tableName, boolean overwrite)` :: Experimental :: Adds the rows from this DataFrame to the specified table, optionally overwriting the existing data. |
`void` | `insertIntoJDBC(String url, String table, boolean overwrite)` Saves the content of this DataFrame to a JDBC database at url under the table name table. |
`DataFrame` | `intersect(DataFrame other)` Returns a new DataFrame containing rows only in both this frame and another frame. |
`boolean` | `isLocal()` Returns true if the collect and take methods can be run locally (without any Spark executors). |
`JavaRDD<org.apache.spark.sql.Row>` | `javaRDD()` Returns the content of the DataFrame as a JavaRDD of Rows. |
`DataFrame` | `join(DataFrame right)` Cartesian join with another DataFrame. |
`DataFrame` | `join(DataFrame right, Column joinExprs)` Inner join with another DataFrame, using the given join expression. |
`DataFrame` | `join(DataFrame right, Column joinExprs, String joinType)` Join with another DataFrame, using the given join expression and join type. |
`DataFrame` | `limit(int n)` Returns a new DataFrame by taking the first n rows. |
`<R> RDD<R>` | `map(scala.Function1<org.apache.spark.sql.Row,R> f, scala.reflect.ClassTag<R> evidence$3)` Returns a new RDD by applying a function to all rows of this DataFrame. |
`<R> RDD<R>` | `mapPartitions(scala.Function1<scala.collection.Iterator<org.apache.spark.sql.Row>,scala.collection.Iterator<R>> f, scala.reflect.ClassTag<R> evidence$5)` Returns a new RDD by applying a function to each partition of this DataFrame. |
`DataFrame` | `orderBy(Column... sortExprs)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `orderBy(scala.collection.Seq<Column> sortExprs)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `orderBy(String sortCol, scala.collection.Seq<String> sortCols)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `orderBy(String sortCol, String... sortCols)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `persist()` |
`DataFrame` | `persist(StorageLevel newLevel)` |
`void` | `printSchema()` Prints the schema to the console in a nice tree format. |
`org.apache.spark.sql.SQLContext.QueryExecution` | `queryExecution()` |
`RDD<org.apache.spark.sql.Row>` | `rdd()` Returns the content of the DataFrame as an RDD of Rows. |
`void` | `registerTempTable(String tableName)` Registers this DataFrame as a temporary table using the given name. |
`DataFrame` | `repartition(int numPartitions)` Returns a new DataFrame that has exactly numPartitions partitions. |
`DataFrame` | `sample(boolean withReplacement, double fraction)` Returns a new DataFrame by sampling a fraction of rows, using a random seed. |
`DataFrame` | `sample(boolean withReplacement, double fraction, long seed)` Returns a new DataFrame by sampling a fraction of rows, using the given seed. |
`void` | `save(String path)` :: Experimental :: Saves the contents of this DataFrame to the given path, using the default data source configured by spark.sql.sources.default and SaveMode.ErrorIfExists as the save mode. |
`void` | `save(String path, SaveMode mode)` :: Experimental :: Saves the contents of this DataFrame to the given path with the SaveMode specified by mode, using the default data source configured by spark.sql.sources.default. |
`void` | `save(String source, SaveMode mode, java.util.Map<String,String> options)` :: Experimental :: Saves the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options. |
`void` | `save(String source, SaveMode mode, scala.collection.immutable.Map<String,String> options)` :: Experimental :: (Scala-specific) Saves the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options. |
`void` | `save(String path, String source)` :: Experimental :: Saves the contents of this DataFrame to the given path based on the given data source, using SaveMode.ErrorIfExists as the save mode. |
`void` | `save(String path, String source, SaveMode mode)` :: Experimental :: Saves the contents of this DataFrame to the given path based on the given data source and the SaveMode specified by mode. |
`void` | `saveAsParquetFile(String path)` Saves the contents of this DataFrame as a Parquet file, preserving the schema. |
`void` | `saveAsTable(String tableName)` :: Experimental :: Creates a table from the contents of this DataFrame. |
`void` | `saveAsTable(String tableName, SaveMode mode)` :: Experimental :: Creates a table from the contents of this DataFrame, using the default data source configured by spark.sql.sources.default and the SaveMode specified by mode. |
`void` | `saveAsTable(String tableName, String source)` :: Experimental :: Creates a table from the contents of this DataFrame based on the given data source, using SaveMode.ErrorIfExists as the save mode. |
`void` | `saveAsTable(String tableName, String source, SaveMode mode)` :: Experimental :: Creates a table from the contents of this DataFrame based on the given data source and the SaveMode specified by mode. |
`void` | `saveAsTable(String tableName, String source, SaveMode mode, java.util.Map<String,String> options)` :: Experimental :: Creates a table from the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options. |
`void` | `saveAsTable(String tableName, String source, SaveMode mode, scala.collection.immutable.Map<String,String> options)` :: Experimental :: (Scala-specific) Creates a table from the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options. |
`org.apache.spark.sql.types.StructType` | `schema()` Returns the schema of this DataFrame. |
`DataFrame` | `select(Column... cols)` Selects a set of expressions. |
`DataFrame` | `select(scala.collection.Seq<Column> cols)` Selects a set of expressions. |
`DataFrame` | `select(String col, scala.collection.Seq<String> cols)` Selects a set of columns. |
`DataFrame` | `select(String col, String... cols)` Selects a set of columns. |
`DataFrame` | `selectExpr(scala.collection.Seq<String> exprs)` Selects a set of SQL expressions. |
`DataFrame` | `selectExpr(String... exprs)` Selects a set of SQL expressions. |
`void` | `show()` Displays the top 20 rows of the DataFrame in a tabular form. |
`void` | `show(int numRows)` Displays the DataFrame in a tabular form. |
`String` | `showString(int numRows)` Internal API for Python. |
`DataFrame` | `sort(Column... sortExprs)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `sort(scala.collection.Seq<Column> sortExprs)` Returns a new DataFrame sorted by the given expressions. |
`DataFrame` | `sort(String sortCol, scala.collection.Seq<String> sortCols)` Returns a new DataFrame sorted by the specified columns, all in ascending order. |
`DataFrame` | `sort(String sortCol, String... sortCols)` Returns a new DataFrame sorted by the specified columns, all in ascending order. |
`SQLContext` | `sqlContext()` |
`org.apache.spark.sql.Row[]` | `take(int n)` Returns the first n rows in the DataFrame. |
`DataFrame` | `toDF()` Returns the object itself. |
`DataFrame` | `toDF(scala.collection.Seq<String> colNames)` Returns a new DataFrame with columns renamed. |
`DataFrame` | `toDF(String... colNames)` Returns a new DataFrame with columns renamed. |
`JavaRDD<org.apache.spark.sql.Row>` | `toJavaRDD()` Returns the content of the DataFrame as a JavaRDD of Rows. |
`RDD<String>` | `toJSON()` Returns the content of the DataFrame as an RDD of JSON strings. |
`DataFrame` | `toSchemaRDD()` Left here for backward compatibility. |
`String` | `toString()` |
`DataFrame` | `unionAll(DataFrame other)` Returns a new DataFrame containing the union of rows in this frame and another frame. |
`DataFrame` | `unpersist()` |
`DataFrame` | `unpersist(boolean blocking)` |
`DataFrame` | `where(Column condition)` Filters rows using the given condition. |
`DataFrame` | `withColumn(String colName, Column col)` Returns a new DataFrame by adding a column. |
`DataFrame` | `withColumnRenamed(String existingName, String newName)` Returns a new DataFrame with a column renamed. |
public DataFrame(SQLContext sqlContext, org.apache.spark.sql.SQLContext.QueryExecution queryExecution)
public DataFrame(SQLContext sqlContext, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan)
A constructor that automatically analyzes the logical plan. This reports errors eagerly as the DataFrame is constructed, unless SQLConf.dataFrameEagerAnalysis is turned off.
public DataFrame toDF(String... colNames)
Returns a new DataFrame with columns renamed. This can be quite convenient when converting an RDD of tuples into a DataFrame with meaningful names. For example:
```scala
import sqlContext.implicits._ // enables the rdd.toDF conversion

val rdd: RDD[(Int, String)] = ...
rdd.toDF()             // this implicit conversion creates a DataFrame with column names _1 and _2
rdd.toDF("id", "name") // this creates a DataFrame with column names "id" and "name"
```
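The plain-Java sketch below models only the column names, not the data. It illustrates the renaming rule described above under two assumptions:

- Columns derived from tuples are named _1, _2, and so on.
- toDF(names...) must receive exactly one name per column. The sketch treats a count mismatch as an error.

ToDFModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of column naming for tuple-derived DataFrames (not Spark code).
final class ToDFModel {
    // Positional names a tuple-derived DataFrame would get: _1, _2, ..., _arity.
    static List<String> defaultNames(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 1; i <= arity; i++) names.add("_" + i);
        return names;
    }

    // Returns the new column names; assumes one name must be given per existing column.
    static List<String> rename(List<String> current, String... newNames) {
        if (newNames.length != current.size()) {
            throw new IllegalArgumentException(
                "expected " + current.size() + " names, got " + newNames.length);
        }
        return Collections.unmodifiableList(Arrays.asList(newNames.clone()));
    }

    public static void main(String[] args) {
        List<String> cols = defaultNames(2);
        System.out.println(cols);                       // [_1, _2]
        System.out.println(rename(cols, "id", "name")); // [id, name]
    }
}
```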
public DataFrame sort(String sortCol, String... sortCols)
Returns a new DataFrame sorted by the specified columns, all in ascending order.
```scala
// The following 3 are equivalent
df.sort("sortcol")
df.sort($"sortcol")
df.sort($"sortcol".asc)
```
public DataFrame sort(Column... sortExprs)
Returns a new DataFrame sorted by the given expressions. For example:
```scala
df.sort($"col1", $"col2".desc)
```
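The ordering produced by df.sort($"col1", $"col2".desc) can be modeled with a chained Comparator: compare by col1 in ascending order, then break ties by col2 in descending order. This is a plain-Java analogy, not Spark code. SortModel and its Row type are hypothetical, and the model ignores null ordering and distributed execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of df.sort($"col1", $"col2".desc) on an in-memory list (not Spark code).
final class SortModel {
    static final class Row {
        final String col1; final int col2;
        Row(String col1, int col2) { this.col1 = col1; this.col2 = col2; }
        @Override public String toString() { return "(" + col1 + ", " + col2 + ")"; }
    }

    // col1 ascending, then col2 descending for rows with equal col1.
    static final Comparator<Row> COL1_ASC_COL2_DESC =
        Comparator.comparing((Row r) -> r.col1)
                  .thenComparing(Comparator.comparingInt((Row r) -> r.col2).reversed());

    // Returns a sorted copy and leaves the input untouched, as sort returns a new DataFrame.
    static List<Row> sort(List<Row> rows) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(COL1_ASC_COL2_DESC);
        return copy;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row("b", 1), new Row("a", 1), new Row("a", 3));
        System.out.println(sort(rows)); // [(a, 3), (a, 1), (b, 1)]
    }
}
```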
public DataFrame orderBy(String sortCol, String... sortCols)
Returns a new DataFrame sorted by the given expressions. This is an alias of the sort function.
public DataFrame orderBy(Column... sortExprs)
Returns a new DataFrame sorted by the given expressions. This is an alias of the sort function.
public DataFrame select(Column... cols)
Selects a set of expressions.
```scala
df.select($"colA", $"colB" + 1)
```
public DataFrame select(String col, String... cols)
Selects a set of columns. This is a variant of select that can only select existing columns using column names (i.e. it cannot construct expressions).
```scala
// The following two are equivalent:
df.select("colA", "colB")
df.select($"colA", $"colB")
```
public DataFrame selectExpr(String... exprs)
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
```scala
df.selectExpr("colA", "colB as newName", "abs(colC)")
```
public GroupedData groupBy(Column... cols)
Groups the DataFrame using the specified columns, so that aggregations can be run on them. See GroupedData for all the available aggregate functions.
```scala
// Compute the average for all numeric columns grouped by department.
df.groupBy($"department").avg()

// Compute the max age and average salary, grouped by department and gender.
df.groupBy($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
```
public GroupedData groupBy(String col1, String... cols)
Groups the DataFrame using the specified columns, so that aggregations can be run on them. See GroupedData for all the available aggregate functions.
This is a variant of groupBy that can only group by existing columns using column names (i.e. it cannot construct expressions).
```scala
// Compute the average for all numeric columns grouped by department.
df.groupBy("department").avg()

// Compute the max age and average salary, grouped by department and gender.
df.groupBy("department", "gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
```
public DataFrame agg(Column expr, Column... exprs)
Aggregates on the entire DataFrame without groups.
```scala
// df.agg(...) is a shorthand for df.groupBy().agg(...)
df.agg(max($"age"), avg($"salary"))
df.groupBy().agg(max($"age"), avg($"salary"))
```
public SQLContext sqlContext()
public org.apache.spark.sql.SQLContext.QueryExecution queryExecution()
public String showString(int numRows)
Internal API for Python.
numRows - Number of rows to show
public String toString()
Overrides: toString in class Object
public DataFrame toSchemaRDD()
public DataFrame toDF()
public DataFrame toDF(scala.collection.Seq<String> colNames)
Returns a new DataFrame with columns renamed. This can be quite convenient when converting an RDD of tuples into a DataFrame with meaningful names. For example:
```scala
import sqlContext.implicits._ // enables the rdd.toDF conversion

val rdd: RDD[(Int, String)] = ...
rdd.toDF()             // this implicit conversion creates a DataFrame with column names _1 and _2
rdd.toDF("id", "name") // this creates a DataFrame with column names "id" and "name"
```
public org.apache.spark.sql.types.StructType schema()
Returns the schema of this DataFrame.
public scala.Tuple2<String,String>[] dtypes()
public String[] columns()
public void printSchema()
public void explain(boolean extended)
public void explain()
public boolean isLocal()
Returns true if the collect and take methods can be run locally (without any Spark executors).
public void show(int numRows)
Displays the DataFrame in a tabular form. For example:
```
year  month AVG('Adj Close) MAX('Adj Close)
1980  12    0.503218        0.595103
1981  01    0.523289        0.570307
1982  02    0.436504        0.475256
1983  03    0.410516        0.442194
1984  04    0.450090        0.483521
```
numRows - Number of rows to show
public void show()
Displays the top 20 rows of the DataFrame in a tabular form.
public DataFrame join(DataFrame right)
Cartesian join with another DataFrame.
Note that cartesian joins are very expensive without an extra filter that can be pushed down.
right - Right side of the join operation.
public DataFrame join(DataFrame right, Column joinExprs)
Inner join with another DataFrame, using the given join expression.
```scala
// The following two are equivalent:
df1.join(df2, $"df1Key" === $"df2Key")
df1.join(df2).where($"df1Key" === $"df2Key")
```
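The two forms above are equivalent because an inner join with a condition keeps exactly those pairs of the cartesian product that satisfy the condition. The plain-Java sketch below checks that join(left, right, cond) and cartesian(left, right) filtered by cond produce the same pairs. It uses nested loops purely for illustration, and this is not Spark's join strategy. JoinModel and its methods are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical nested-loop model of cartesian join, filter, and inner join (not Spark code).
final class JoinModel {
    // Every (left, right) pair: |left| * |right| results.
    static <L, R> List<Map.Entry<L, R>> cartesian(List<L> left, List<R> right) {
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) for (R r : right) out.add(new AbstractMap.SimpleImmutableEntry<>(l, r));
        return out;
    }

    // Keeps only the pairs satisfying the condition (the "where" step).
    static <L, R> List<Map.Entry<L, R>> where(List<Map.Entry<L, R>> pairs, BiPredicate<L, R> cond) {
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (Map.Entry<L, R> p : pairs) if (cond.test(p.getKey(), p.getValue())) out.add(p);
        return out;
    }

    // Inner join: tests the condition while pairing, without materializing the full product.
    static <L, R> List<Map.Entry<L, R>> join(List<L> left, List<R> right, BiPredicate<L, R> cond) {
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) {
            for (R r : right) {
                if (cond.test(l, r)) out.add(new AbstractMap.SimpleImmutableEntry<>(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> df1Keys = Arrays.asList(1, 2, 3);
        List<Integer> df2Keys = Arrays.asList(2, 3, 3, 4);
        BiPredicate<Integer, Integer> sameKey = (a, b) -> a.equals(b);
        System.out.println(cartesian(df1Keys, df2Keys).size()); // 12
        System.out.println(join(df1Keys, df2Keys, sameKey));    // [2=2, 3=3, 3=3]
        System.out.println(join(df1Keys, df2Keys, sameKey)
            .equals(where(cartesian(df1Keys, df2Keys), sameKey))); // true
    }
}
```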
public DataFrame join(DataFrame right, Column joinExprs, String joinType)
Join with another DataFrame, using the given join expression. The following performs a full outer join between df1 and df2.
```scala
// Scala:
import org.apache.spark.sql.functions._
df1.join(df2, $"df1Key" === $"df2Key", "outer")
```
```java
// Java:
import static org.apache.spark.sql.functions.*;
df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
```
right - Right side of the join.
joinExprs - Join expression.
joinType - One of: inner, outer, left_outer, right_outer, leftsemi.
public DataFrame sort(String sortCol, scala.collection.Seq<String> sortCols)
Returns a new DataFrame sorted by the specified columns, all in ascending order.
```scala
// The following 3 are equivalent
df.sort("sortcol")
df.sort($"sortcol")
df.sort($"sortcol".asc)
```
public DataFrame sort(scala.collection.Seq<Column> sortExprs)
Returns a new DataFrame sorted by the given expressions. For example:
```scala
df.sort($"col1", $"col2".desc)
```
public DataFrame orderBy(String sortCol, scala.collection.Seq<String> sortCols)
Returns a new DataFrame sorted by the given expressions. This is an alias of the sort function.
public DataFrame orderBy(scala.collection.Seq<Column> sortExprs)
Returns a new DataFrame sorted by the given expressions. This is an alias of the sort function.
public Column apply(String colName)
Selects a column based on the column name and returns it as a Column.
public Column col(String colName)
Selects a column based on the column name and returns it as a Column.
public DataFrame as(scala.Symbol alias)
(Scala-specific) Returns a new DataFrame with an alias set.
public DataFrame select(scala.collection.Seq<Column> cols)
Selects a set of expressions.
```scala
df.select($"colA", $"colB" + 1)
```
public DataFrame select(String col, scala.collection.Seq<String> cols)
Selects a set of columns. This is a variant of select that can only select existing columns using column names (i.e. it cannot construct expressions).
```scala
// The following two are equivalent:
df.select("colA", "colB")
df.select($"colA", $"colB")
```
public DataFrame selectExpr(scala.collection.Seq<String> exprs)
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
```scala
df.selectExpr("colA", "colB as newName", "abs(colC)")
```
public DataFrame filter(Column condition)
Filters rows using the given condition.
```scala
// The following are equivalent:
peopleDf.filter($"age" > 15)
peopleDf.where($"age" > 15)
```
public DataFrame filter(String conditionExpr)
Filters rows using the given SQL expression.
```scala
peopleDf.filter("age > 15")
```
public DataFrame where(Column condition)
Filters rows using the given condition. This is an alias for filter.
```scala
// The following are equivalent:
peopleDf.filter($"age" > 15)
peopleDf.where($"age" > 15)
```
public GroupedData groupBy(scala.collection.Seq<Column> cols)
Groups the DataFrame using the specified columns, so that aggregations can be run on them. See GroupedData for all the available aggregate functions.
```scala
// Compute the average for all numeric columns grouped by department.
df.groupBy($"department").avg()

// Compute the max age and average salary, grouped by department and gender.
df.groupBy($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
```
public GroupedData groupBy(String col1, scala.collection.Seq<String> cols)
Groups the DataFrame using the specified columns, so that aggregations can be run on them. See GroupedData for all the available aggregate functions.
This is a variant of groupBy that can only group by existing columns using column names (i.e. it cannot construct expressions).
```scala
// Compute the average for all numeric columns grouped by department.
df.groupBy("department").avg()

// Compute the max age and average salary, grouped by department and gender.
df.groupBy("department", "gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
```
public DataFrame agg(scala.Tuple2<String,String> aggExpr, scala.collection.Seq<scala.Tuple2<String,String>> aggExprs)
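(Scala-specific) Aggregates on the entire DataFrame without groups, specifying a map from column name to aggregate method. The available aggregate methods are avg, max, min, sum, and count.
```scala
// Selects the age of the oldest employee and the total expense across the whole DataFrame
df.agg("age" -> "max", "expense" -> "sum")

// The same aggregation per department; the result then also contains the grouping column
df.groupBy("department").agg("age" -> "max", "expense" -> "sum")
```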
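String-named aggregates such as "age" -> "max" can be sketched as a lookup from method name to computation. The plain-Java model below supports the five methods listed above and produces a single result row, as agg without groups does. For an empty column it returns null for every method except count, in the manner of SQL. The names AggByNameModel, aggregate and agg are hypothetical, and so is the method(column) key format of its output. This is not Spark's implementation.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of string-named aggregates (not Spark code).
final class AggByNameModel {
    // Applies one of avg, max, min, sum, count to a column's values.
    static Double aggregate(String method, List<Double> values) {
        if (method.equals("count")) return (double) values.size();
        if (values.isEmpty()) return null;  // SQL-style: avg/max/min/sum over no rows is NULL
        DoubleSummaryStatistics stats =
            values.stream().mapToDouble(Double::doubleValue).summaryStatistics();
        switch (method) {
            case "avg": return stats.getAverage();
            case "max": return stats.getMax();
            case "min": return stats.getMin();
            case "sum": return stats.getSum();
            default: throw new IllegalArgumentException("unsupported aggregate method: " + method);
        }
    }

    // Models agg(column -> method, ...) over the whole dataset: the result is a single row.
    static Map<String, Double> agg(Map<String, String> spec, Map<String, List<Double>> columns) {
        Map<String, Double> row = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : spec.entrySet()) {
            List<Double> values = columns.get(e.getKey());
            if (values == null) throw new IllegalArgumentException("no such column: " + e.getKey());
            row.put(e.getValue() + "(" + e.getKey() + ")", aggregate(e.getValue(), values));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, List<Double>> columns = new HashMap<>();
        columns.put("age", Arrays.asList(31.0, 45.0, 28.0));
        columns.put("expense", Arrays.asList(10.0, 2.5, 7.5));
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("age", "max");
        spec.put("expense", "sum");
        System.out.println(agg(spec, columns)); // {max(age)=45.0, sum(expense)=20.0}
    }
}
```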
public DataFrame agg(scala.collection.immutable.Map<String,String> exprs)
(Scala-specific) Aggregates on the entire DataFrame without groups.
```scala
// df.agg(...) is a shorthand for df.groupBy().agg(...)
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
```
public DataFrame agg(java.util.Map<String,String> exprs)
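(Java-specific) Aggregates on the entire DataFrame without groups.
```java
import java.util.HashMap;
import java.util.Map;

// df.agg(...) is a shorthand for df.groupBy().agg(...)
Map<String, String> exprs = new HashMap<>();
exprs.put("age", "max");
exprs.put("salary", "avg");
df.agg(exprs);
df.groupBy().agg(exprs);
```
public DataFrame agg(Column expr, scala.collection.Seq<Column> exprs)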
Aggregates on the entire DataFrame without groups.
```scala
// df.agg(...) is a shorthand for df.groupBy().agg(...)
df.agg(max($"age"), avg($"salary"))
df.groupBy().agg(max($"age"), avg($"salary"))
```
public DataFrame limit(int n)
Returns a new DataFrame by taking the first n rows.
public DataFrame unionAll(DataFrame other)
Returns a new DataFrame containing the union of rows in this frame and another frame. This is equivalent to UNION ALL in SQL.
public DataFrame intersect(DataFrame other)
Returns a new DataFrame containing rows only in both this frame and another frame. This is equivalent to INTERSECT in SQL.
public DataFrame except(DataFrame other)
Returns a new DataFrame containing rows in this frame but not in another frame. This is equivalent to EXCEPT in SQL.
public DataFrame sample(boolean withReplacement, double fraction, long seed)
Returns a new DataFrame by sampling a fraction of rows.
withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
seed - Seed for sampling.
public DataFrame sample(boolean withReplacement, double fraction)
Returns a new DataFrame by sampling a fraction of rows, using a random seed.
withReplacement - Sample with replacement or not.
fraction - Fraction of rows to generate.
public <A extends scala.Product> DataFrame explode(scala.collection.Seq<Column> input, scala.Function1<org.apache.spark.sql.Row,scala.collection.TraversableOnce<A>> f, scala.reflect.api.TypeTags.TypeTag<A> evidence$1)
(Scala-specific) Returns a new DataFrame where each row has been expanded to zero or more rows by the provided function. This is similar to a LATERAL VIEW in HiveQL. The columns of the input row are implicitly joined with each row that is output by the function.
The following example uses this function to count the number of books that contain a given word:
```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.countDistinct
import sqlContext.implicits._ // for the 'words symbol-to-Column conversion

case class Book(title: String, words: String)
case class Word(word: String)

val df: DataFrame = ... // a DataFrame of Books
val allWords = df.explode('words) {
  case Row(words: String) => words.split(" ").map(Word(_))
}
val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
```
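The row expansion and implicit join described above can be modeled in plain Java. Each Book yields one (title, word) row per word, and counting the distinct titles for each word reproduces bookCountPerWord. This is a hypothetical sketch: ExplodeModel is not part of Spark. It mirrors the Scala example's use of split(" ").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of explode + groupBy("word").agg(countDistinct("title")) (not Spark code).
final class ExplodeModel {
    static final class Book {
        final String title; final String words;
        Book(String title, String words) { this.title = title; this.words = words; }
    }

    // One output row per word; the input row's title is carried along (the implicit join).
    static List<String[]> explode(List<Book> books) {
        List<String[]> out = new ArrayList<>();
        for (Book b : books) {
            for (String word : b.words.split(" ")) {
                out.add(new String[] {b.title, word});
            }
        }
        return out;
    }

    // Number of distinct titles containing each word, with words in sorted order.
    static Map<String, Integer> bookCountPerWord(List<Book> books) {
        Map<String, Set<String>> titlesByWord = new TreeMap<>();
        for (String[] row : explode(books)) {
            titlesByWord.computeIfAbsent(row[1], k -> new HashSet<>()).add(row[0]);
        }
        Map<String, Integer> counts = new TreeMap<>();
        titlesByWord.forEach((word, titles) -> counts.put(word, titles.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Book> books = Arrays.asList(new Book("A", "spark sql spark"), new Book("B", "sql data"));
        System.out.println(explode(books).size());   // 5
        System.out.println(bookCountPerWord(books)); // {data=1, spark=1, sql=2}
    }
}
```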
public <A,B> DataFrame explode(String inputColumn, String outputColumn, scala.Function1<A,scala.collection.TraversableOnce<B>> f, scala.reflect.api.TypeTags.TypeTag<B> evidence$2)
(Scala-specific) Returns a new DataFrame where a single column has been expanded to zero or more rows by the provided function. This is similar to a LATERAL VIEW in HiveQL. All columns of the input row are implicitly joined with each value that is output by the function.
```scala
df.explode("words", "word") { words: String => words.split(" ") }
```
public DataFrame withColumn(String colName, Column col)
Returns a new DataFrame by adding a column.
public DataFrame withColumnRenamed(String existingName, String newName)
Returns a new DataFrame with a column renamed.
public org.apache.spark.sql.Row[] head(int n)
Returns the first n rows.
public org.apache.spark.sql.Row head()
Returns the first row.
public org.apache.spark.sql.Row first()
public <R> RDD<R> map(scala.Function1<org.apache.spark.sql.Row,R> f, scala.reflect.ClassTag<R> evidence$3)
public <R> RDD<R> flatMap(scala.Function1<org.apache.spark.sql.Row,scala.collection.TraversableOnce<R>> f, scala.reflect.ClassTag<R> evidence$4)
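Returns a new RDD by first applying a function to all rows of this DataFrame, and then flattening the results.
public <R> RDD<R> mapPartitions(scala.Function1<scala.collection.Iterator<org.apache.spark.sql.Row>,scala.collection.Iterator<R>> f, scala.reflect.ClassTag<R> evidence$5)
Returns a new RDD by applying a function to each partition of this DataFrame.
Specified by: mapPartitions in interface RDDApi<org.apache.spark.sql.Row>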
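The difference between map and mapPartitions is how often the function runs:

- map calls it once per row.
- mapPartitions calls it once per partition, and the function consumes that partition's iterator.

The sketch below models a dataset as a plain Java list of partitions. PartitionModel and sumPartition are hypothetical names, and nothing here reflects how Spark actually partitions or schedules work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model (not Spark code): a dataset is a list of partitions.
final class PartitionModel {
    // map: f is applied to every row independently.
    static <T, R> List<List<R>> map(List<List<T>> partitions, Function<T, R> f) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<R> mapped = new ArrayList<>();
            for (T row : partition) mapped.add(f.apply(row));
            out.add(mapped);
        }
        return out;
    }

    // mapPartitions: f is called once per partition and consumes that partition's iterator.
    static <T, R> List<List<R>> mapPartitions(List<List<T>> partitions,
                                              Function<Iterator<T>, Iterator<R>> f) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            List<R> results = new ArrayList<>();
            f.apply(partition.iterator()).forEachRemaining(results::add);
            out.add(results);
        }
        return out;
    }

    // Example partition function: emits a single sum per partition.
    static Iterator<Integer> sumPartition(Iterator<Integer> rows) {
        int sum = 0;
        while (rows.hasNext()) sum += rows.next();
        return Collections.singletonList(sum).iterator();
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        System.out.println(map(partitions, x -> x * 10));                            // [[10, 20, 30], [40, 50]]
        System.out.println(mapPartitions(partitions, PartitionModel::sumPartition)); // [[6], [9]]
    }
}
```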
public void foreach(scala.Function1<org.apache.spark.sql.Row,scala.runtime.BoxedUnit> f)
Applies a function f to all rows.
public void foreachPartition(scala.Function1<scala.collection.Iterator<org.apache.spark.sql.Row>,scala.runtime.BoxedUnit> f)
Applies a function f to each partition of this DataFrame.
Specified by: foreachPartition in interface RDDApi<org.apache.spark.sql.Row>
public org.apache.spark.sql.Row[] take(int n)
Returns the first n rows in the DataFrame.
public org.apache.spark.sql.Row[] collect()
Returns an array that contains all of the Rows in this DataFrame.
public java.util.List<org.apache.spark.sql.Row> collectAsList()
Returns a Java list that contains all of the Rows in this DataFrame.
Specified by: collectAsList in interface RDDApi<org.apache.spark.sql.Row>
public long count()
Returns the number of rows in the DataFrame.
public DataFrame repartition(int numPartitions)
Returns a new DataFrame that has exactly numPartitions partitions.
Specified by: repartition in interface RDDApi<org.apache.spark.sql.Row>
public DataFrame distinct()
public DataFrame persist()
public DataFrame persist(StorageLevel newLevel)
public DataFrame unpersist(boolean blocking)
public DataFrame unpersist()
public RDD<org.apache.spark.sql.Row> rdd()
public JavaRDD<org.apache.spark.sql.Row> toJavaRDD()
public JavaRDD<org.apache.spark.sql.Row> javaRDD()
public void registerTempTable(String tableName)
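Registers this DataFrame as a temporary table using the given name. The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame.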
public void saveAsParquetFile(String path)
Saves the contents of this DataFrame as a Parquet file, preserving the schema. Files that are written out using this method can be read back in as a DataFrame using the parquetFile function in SQLContext.
public void saveAsTable(String tableName)
:: Experimental :: Creates a table from the contents of this DataFrame.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
public void saveAsTable(String tableName, SaveMode mode)
:: Experimental :: Creates a table from the contents of this DataFrame, using the default data source configured by spark.sql.sources.default and the SaveMode specified by mode.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
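The overloads that take a SaveMode differ only in what happens when the target already exists. The plain-Java model below, which is not Spark code, applies the four SaveMode values (Append, Overwrite, ErrorIfExists and Ignore) to a hypothetical in-memory catalog. Only the enum constant names mirror org.apache.spark.sql.SaveMode. SaveModeModel and save are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of save-mode semantics against an in-memory catalog (not Spark code).
final class SaveModeModel {
    enum SaveMode { Append, Overwrite, ErrorIfExists, Ignore } // names mirror Spark's SaveMode

    static void save(Map<String, List<String>> catalog, String table, List<String> rows, SaveMode mode) {
        List<String> existing = catalog.get(table);
        if (existing == null) {                        // target absent: every mode writes the rows
            catalog.put(table, new ArrayList<>(rows));
            return;
        }
        switch (mode) {
            case Append:        existing.addAll(rows); break;                  // add to existing data
            case Overwrite:     catalog.put(table, new ArrayList<>(rows)); break; // replace existing data
            case ErrorIfExists: throw new IllegalStateException("table " + table + " already exists");
            case Ignore:        break;                                          // leave existing data untouched
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog = new HashMap<>();
        save(catalog, "t", Arrays.asList("a"), SaveMode.ErrorIfExists);
        save(catalog, "t", Arrays.asList("b"), SaveMode.Append);
        save(catalog, "t", Arrays.asList("c"), SaveMode.Ignore);
        System.out.println(catalog.get("t")); // [a, b]
        save(catalog, "t", Arrays.asList("d"), SaveMode.Overwrite);
        System.out.println(catalog.get("t")); // [d]
        try {
            save(catalog, "t", Arrays.asList("e"), SaveMode.ErrorIfExists);
        } catch (IllegalStateException e) {
            System.out.println("ErrorIfExists: " + e.getMessage());
        }
    }
}
```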
public void saveAsTable(String tableName, String source)
:: Experimental :: Creates a table from the contents of this DataFrame based on the given data source, using SaveMode.ErrorIfExists as the save mode.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
public void saveAsTable(String tableName, String source, SaveMode mode)
:: Experimental :: Creates a table from the contents of this DataFrame based on the given data source and the SaveMode specified by mode.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
public void saveAsTable(String tableName, String source, SaveMode mode, java.util.Map<String,String> options)
:: Experimental :: Creates a table from the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
public void saveAsTable(String tableName, String source, SaveMode mode, scala.collection.immutable.Map<String,String> options)
:: Experimental :: (Scala-specific) Creates a table from the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options.
Note that this currently only works with DataFrames that are created from a HiveContext, as there is no notion of a persisted catalog in a standard SQL context. Instead, you can write a DataFrame out to a Parquet file and then register that file as a table. This "table" can then be the target of an insertInto.
public void save(String path)
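:: Experimental :: Saves the contents of this DataFrame to the given path, using the default data source configured by spark.sql.sources.default and SaveMode.ErrorIfExists as the save mode.
public void save(String path, SaveMode mode)
:: Experimental :: Saves the contents of this DataFrame to the given path with the SaveMode specified by mode, using the default data source configured by spark.sql.sources.default.
public void save(String path, String source)
:: Experimental :: Saves the contents of this DataFrame to the given path based on the given data source, using SaveMode.ErrorIfExists as the save mode.
public void save(String path, String source, SaveMode mode)
:: Experimental :: Saves the contents of this DataFrame to the given path based on the given data source and the SaveMode specified by mode.
public void save(String source, SaveMode mode, java.util.Map<String,String> options)
:: Experimental :: Saves the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options.
public void save(String source, SaveMode mode, scala.collection.immutable.Map<String,String> options)
:: Experimental :: (Scala-specific) Saves the contents of this DataFrame based on the given data source, the SaveMode specified by mode, and a set of options.
public void insertInto(String tableName, boolean overwrite)
:: Experimental :: Adds the rows from this DataFrame to the specified table, optionally overwriting the existing data.
public void insertInto(String tableName)
:: Experimental :: Adds the rows from this DataFrame to the specified table.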
public void createJDBCTable(String url, String table, boolean allowExisting)
Saves the content of this DataFrame to a JDBC database at url under the table name table. This will run a CREATE TABLE and a bunch of INSERT INTO statements. If you pass true for allowExisting, it will drop any table with the given name; if you pass false, it will throw if the table already exists.
public void insertIntoJDBC(String url, String table, boolean overwrite)
Saves the content of this DataFrame to a JDBC database at url under the table name table. If you pass true for overwrite, it will TRUNCATE the table before performing the INSERTs.
The table must already exist on the database. It must have a schema that is compatible with the schema of this DataFrame; inserting the rows of the DataFrame in order via the simple statement INSERT INTO table VALUES (?, ?, ..., ?) should not fail.
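The plain-Java sketch below illustrates the statement shape quoted above. It builds INSERT INTO table VALUES (?, ?, ..., ?) with one placeholder per column and, when overwrite is true, puts a truncate step first. JdbcStatementsModel is hypothetical and opens no JDBC connection. The exact TRUNCATE TABLE text is an assumption, not something this document says Spark sends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch of the SQL text an insertIntoJDBC-style write would issue (no JDBC used).
final class JdbcStatementsModel {
    // "INSERT INTO table VALUES (?, ?, ..., ?)" with one placeholder per column.
    static String insertSql(String table, int columnCount) {
        StringJoiner placeholders = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < columnCount; i++) placeholders.add("?");
        return "INSERT INTO " + table + " VALUES " + placeholders;
    }

    // Statement sequence for writing rowCount rows in order, truncating first when overwrite is true.
    static List<String> insertIntoJdbcPlan(String table, int columnCount, int rowCount, boolean overwrite) {
        List<String> plan = new ArrayList<>();
        if (overwrite) plan.add("TRUNCATE TABLE " + table);   // assumed statement text
        String insert = insertSql(table, columnCount);
        for (int i = 0; i < rowCount; i++) plan.add(insert);  // one parameterized INSERT per row
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(insertSql("people", 3));                          // INSERT INTO people VALUES (?, ?, ?)
        System.out.println(insertIntoJdbcPlan("people", 2, 2, true).size()); // 3
    }
}
```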