Clustering - spark.ml
In this section, we introduce the pipeline API for clustering in MLlib.
K-means
K-means is one of the most commonly used clustering algorithms. It partitions the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called k-means||.
KMeans is implemented as an Estimator and generates a KMeansModel as the base model.
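To make the clustering step concrete, here is a minimal plain-Java sketch of the basic k-means iteration (assign each point to its nearest center, then move each center to the mean of its points). It is not the MLlib implementation: it runs on a single machine, uses a fixed number of rounds, and seeds the centers with the first two points rather than with k-means||. The class name LloydSketch and its method names are hypothetical, and the data is the same six points used in the Scala example below.

```java
import java.util.Arrays;

public class LloydSketch {
    // Squared Euclidean distance between two points of equal dimension.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Index of the center closest to the given point.
    static int nearest(double[] point, double[][] centers) {
        int best = 0;
        for (int c = 1; c < centers.length; c++) {
            if (squaredDistance(point, centers[c]) < squaredDistance(point, centers[best])) {
                best = c;
            }
        }
        return best;
    }

    // Runs a fixed number of assign-then-update rounds and returns the final centers.
    static double[][] fit(double[][] points, double[][] initialCenters, int iterations) {
        int k = initialCenters.length;
        int dim = points[0].length;
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = initialCenters[c].clone();
        }
        for (int iter = 0; iter < iterations; iter++) {
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            // Assignment step: attach each point to its nearest center.
            for (double[] p : points) {
                int c = nearest(p, centers);
                counts[c]++;
                for (int i = 0; i < dim; i++) {
                    sums[c][i] += p[i];
                }
            }
            // Update step: move each non-empty cluster's center to the mean of its points.
            for (int c = 0; c < k; c++) {
                if (counts[c] == 0) {
                    continue;
                }
                for (int i = 0; i < dim; i++) {
                    centers[c][i] = sums[c][i] / counts[c];
                }
            }
        }
        return centers;
    }

    public static void main(String[] args) {
        double[][] points = {
            {0.0, 0.0, 0.0}, {0.1, 0.1, 0.1}, {0.2, 0.2, 0.2},
            {9.0, 9.0, 9.0}, {9.1, 9.1, 9.1}, {9.2, 9.2, 9.2}
        };
        // Assumption: seed with the first two points instead of k-means||.
        double[][] initial = {points[0], points[1]};
        for (double[] center : fit(points, initial, 5)) {
            System.out.println(Arrays.toString(center));
        }
    }
}
```

Even with this deliberately poor seeding, the two centers settle near (0.1, 0.1, 0.1) and (9.1, 9.1, 9.1) after a few rounds. On less well-separated data the result depends much more on the starting centers, which is the problem that k-means++ style seeding addresses.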
Input Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
featuresCol | Vector | "features" | Feature vector |
Output Columns
Param name | Type(s) | Default | Description |
---|---|---|---|
predictionCol | Int | "prediction" | Index of the predicted cluster center |
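Because the prediction column is an Int, each value is the index of a cluster center rather than the center vector itself. The following plain-Java sketch shows how such an index can be derived from a feature vector, assuming squared Euclidean distance. The class PredictionSketch, the method predict, and the two centers are hypothetical illustrations, not part of the Spark API.

```java
public class PredictionSketch {
    // Returns the index of the center with the smallest squared Euclidean
    // distance to the given feature vector.
    static int predict(double[] features, double[][] centers) {
        int best = 0;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centers.length; c++) {
            double distance = 0.0;
            for (int i = 0; i < features.length; i++) {
                double d = features[i] - centers[c][i];
                distance += d * d;
            }
            if (distance < bestDistance) {
                bestDistance = distance;
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical centers; a fitted model may list its centers in any order.
        double[][] centers = {{0.1, 0.1, 0.1}, {9.1, 9.1, 9.1}};
        System.out.println(predict(new double[] {0.2, 0.2, 0.2}, centers)); // prints 0
        System.out.println(predict(new double[] {9.0, 9.0, 9.0}, centers)); // prints 1
    }
}
```

The index is only meaningful relative to the list of cluster centers of the model that produced it, so predictions from two separately trained models should not be compared by index alone.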
Example
Refer to the Scala API docs for more details.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.DataFrame

// Creates a DataFrame (sqlContext is assumed to be an existing SQLContext)
val dataset: DataFrame = sqlContext.createDataFrame(Seq(
  (1, Vectors.dense(0.0, 0.0, 0.0)),
  (2, Vectors.dense(0.1, 0.1, 0.1)),
  (3, Vectors.dense(0.2, 0.2, 0.2)),
  (4, Vectors.dense(9.0, 9.0, 9.0)),
  (5, Vectors.dense(9.1, 9.1, 9.1)),
  (6, Vectors.dense(9.2, 9.2, 9.2))
)).toDF("id", "features")

// Trains a k-means model
val kmeans = new KMeans()
  .setK(2)
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
val model = kmeans.fit(dataset)

// Shows the result
println("Final Centers: ")
model.clusterCenters.foreach(println)
Refer to the Java API docs for more details.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Loads data. jsc, sqlContext, inputFile, and k are assumed to be defined by the
// surrounding application. ParsePoint is not shown here; it is assumed to be a
// Function<String, Row> that parses one line of text into a Row holding one Vector.
JavaRDD<Row> points = jsc.textFile(inputFile).map(new ParsePoint());
StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())};
StructType schema = new StructType(fields);
DataFrame dataset = sqlContext.createDataFrame(points, schema);

// Trains a k-means model
KMeans kmeans = new KMeans()
  .setK(k);
KMeansModel model = kmeans.fit(dataset);

// Shows the result
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center : centers) {
  System.out.println(center);
}
Latent Dirichlet allocation (LDA)
LDA is implemented as an Estimator that supports both EMLDAOptimizer and OnlineLDAOptimizer, and generates an LDAModel as the base model. Expert users may cast an LDAModel generated by EMLDAOptimizer to a DistributedLDAModel if needed.
Refer to the Scala API docs for more details.
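import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}

// Assumed values, chosen to match the Java example; sc and sqlContext are
// assumed to be an existing SparkContext and SQLContext.
val input = "data/mllib/sample_lda_data.txt"
val FEATURES_COL = "features"

// Loads data: one space-separated vector of term counts per non-empty line
val rowRDD = sc.textFile(input).filter(_.nonEmpty)
  .map(_.split(" ").map(_.toDouble)).map(Vectors.dense).map(Row(_))
val schema = StructType(Array(StructField(FEATURES_COL, new VectorUDT, false)))
val dataset = sqlContext.createDataFrame(rowRDD, schema)

// Trains an LDA model
val lda = new LDA()
  .setK(10)
  .setMaxIter(10)
  .setFeaturesCol(FEATURES_COL)
val model = lda.fit(dataset)
val transformed = model.transform(dataset)

// Evaluates the model on the training data
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)

// Describes each topic by its 3 highest-weighted terms
val topics = model.describeTopics(3)

// Shows the result
topics.show(false)
transformed.show(false)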
Refer to the Java API docs for more details.
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class JavaLDAExample {

  // Parses one space-separated line of numbers into a single-column Row holding a dense Vector.
  private static class ParseVector implements Function<String, Row> {
    private static final Pattern separator = Pattern.compile(" ");

    @Override
    public Row call(String line) {
      String[] tok = separator.split(line);
      double[] point = new double[tok.length];
      for (int i = 0; i < tok.length; ++i) {
        point[i] = Double.parseDouble(tok[i]);
      }
      // RowFactory is the public API for building Rows; it replaces the internal GenericRow.
      return RowFactory.create(Vectors.dense(point));
    }
  }

  public static void main(String[] args) {
    String inputFile = "data/mllib/sample_lda_data.txt";

    // Creates the Spark contexts
    SparkConf conf = new SparkConf().setAppName("JavaLDAExample");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    // Loads data
    JavaRDD<Row> points = jsc.textFile(inputFile).map(new ParseVector());
    StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())};
    StructType schema = new StructType(fields);
    DataFrame dataset = sqlContext.createDataFrame(points, schema);

    // Trains an LDA model
    LDA lda = new LDA()
      .setK(10)
      .setMaxIter(10);
    LDAModel model = lda.fit(dataset);

    // Evaluates the model on the training data
    System.out.println(model.logLikelihood(dataset));
    System.out.println(model.logPerplexity(dataset));

    // Shows the result
    DataFrame topics = model.describeTopics(3);
    topics.show(false);
    model.transform(dataset).show(false);

    jsc.stop();
  }
}
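Both LDA examples above call describeTopics(3), which describes each topic by its three highest-weighted terms. The plain-Java sketch below illustrates that kind of top-k selection for a single topic. It is an illustration of the idea only, not Spark's implementation, and the class TopTermsSketch, the method topTermIndices, and the weights are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;

public class TopTermsSketch {
    // Returns the indices of the maxTerms largest weights, in decreasing weight order.
    static int[] topTermIndices(double[] termWeights, int maxTerms) {
        Integer[] order = new Integer[termWeights.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // Sort term indices by descending weight (negated key gives descending order).
        Arrays.sort(order, Comparator.comparingDouble((Integer i) -> -termWeights[i]));
        int n = Math.min(maxTerms, order.length);
        int[] top = new int[n];
        for (int i = 0; i < n; i++) {
            top[i] = order[i];
        }
        return top;
    }

    public static void main(String[] args) {
        // Hypothetical weights of five vocabulary terms within one topic.
        double[] weights = {0.05, 0.40, 0.10, 0.30, 0.15};
        System.out.println(Arrays.toString(topTermIndices(weights, 3))); // prints [1, 3, 4]
    }
}
```

The returned values are indices into the vocabulary, so mapping them back to words requires whatever vocabulary was used to build the term-count vectors in the first place.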