object --+
         |
        RDD
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
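A minimal sketch of the typical workflow, assuming a running SparkContext bound to the name sc (as the doctests on this page assume): an RDD is created from a driver-side collection, transformed lazily, and only materialized when an action is called.

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)   # distribute a local list across 2 partitions
>>> doubled = rdd.map(lambda x: x * 2)      # transformations are lazy
>>> doubled.collect()                       # collect() is an action that runs the job
[2, 4, 6, 8]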
Instance Methods

Properties
Method Details

__init__(...)
    x.__init__(...) initializes x; see help(type(x)) for signature

__repr__
    Returns repr(x).

context
    The SparkContext that this RDD was created on.
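    For example (a trivial sketch, assuming the same sc used by the doctests on this page):

    >>> rdd = sc.parallelize([1, 2, 3])
    >>> rdd.context is sc
    True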
persist
    Set this RDD's storage level to persist its values across operations after the first
    time it is computed. This can only be used to assign a new storage level if the RDD
    does not have a storage level set yet.
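    A brief sketch (the storage level shown is just one example; any constant from
    pyspark.StorageLevel can be used):

    >>> from pyspark import StorageLevel
    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> rdd.persist(StorageLevel.MEMORY_AND_DISK)  #doctest: +SKIP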
checkpoint
    Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
    directory set with SparkContext.setCheckpointDir().
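    A rough sketch of the call sequence (the directory path is only a placeholder, and the
    checkpoint is actually written once an action runs):

    >>> sc.setCheckpointDir("/tmp/spark-checkpoints")  #doctest: +SKIP
    >>> rdd = sc.parallelize([1, 2, 3]).map(lambda x: x + 1)
    >>> rdd.checkpoint()  #doctest: +SKIP
    >>> rdd.count()  #doctest: +SKIP
    3
    >>> rdd.isCheckpointed()  #doctest: +SKIP
    True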
map
    Return a new RDD by applying a function to each element of this RDD.

    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 1)]

flatMap
    Return a new RDD by first applying a function to all elements of this RDD, and then
    flattening the results.

    >>> rdd = sc.parallelize([2, 3, 4])
    >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
    [1, 1, 1, 2, 2, 3]
    >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
    [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

mapPartitions
    Return a new RDD by applying a function to each partition of this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]

mapPartitionsWithIndex
    Return a new RDD by applying a function to each partition of this RDD, while tracking
    the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6

mapPartitionsWithSplit
    Deprecated: use mapPartitionsWithIndex instead.

    Return a new RDD by applying a function to each partition of this RDD, while tracking
    the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithSplit(f).sum()
    6

filter
    Return a new RDD containing only the elements that satisfy a predicate.

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]

distinct
    Return a new RDD containing the distinct elements in this RDD.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
sample
    Return a sampled subset of this RDD (relies on numpy and falls back on the default
    random generator if numpy is unavailable).

    >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
    [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]

takeSample
    Return a fixed-size sampled subset of this RDD (currently requires numpy).

    >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
    [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]

union
    Return the union of this RDD and another one.

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> rdd.union(rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]

intersection
    Return the intersection of this RDD and another one. The output will not contain any
    duplicate elements, even if the input RDDs did. Note that this method performs a
    shuffle internally.

    >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]
__add__
    Return the union of this RDD and another one (the + operator).

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> (rdd + rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]
sortByKey
    Sorts this RDD, which is assumed to consist of (key, value) pairs.

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]

glom
    Return an RDD created by coalescing all elements within each partition into a list.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> sorted(rdd.glom().collect())
    [[1, 2], [3, 4]]
cartesian
    Return the Cartesian product of this RDD and another one, that is, the RDD of all
    pairs of elements (a, b) where a is in this RDD and b is in the other RDD.

    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]
groupBy
    Return an RDD of grouped items.

    >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    >>> result = rdd.groupBy(lambda x: x % 2).collect()
    >>> sorted([(x, sorted(y)) for (x, y) in result])
    [(0, [2, 8]), (1, [1, 1, 3, 5])]

pipe
    Return an RDD created by piping elements to a forked external process.

    >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
    ['1', '2', '', '3']

foreach
    Applies a function to all elements of this RDD.

    >>> def f(x): print x
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

foreachPartition
    Applies a function to each partition of this RDD.

    >>> def f(iterator):
    ...     for x in iterator:
    ...         print x
    ...     yield None
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

reduce
    Reduces the elements of this RDD using the specified commutative and associative
    binary operator. Currently reduces partitions locally.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
fold
    Aggregate the elements of each partition, and then the results for all the
    partitions, using a given associative function and a neutral "zero value." The
    function op(t1, t2) is allowed to modify t1 and return it as its result value to
    avoid object allocation; however, it should not modify t2.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15

aggregate
    Aggregate the elements of each partition, and then the results for all the
    partitions, using the given combine functions and a neutral "zero value." The
    functions op(t1, t2) are allowed to modify t1 and return it as the result value to
    avoid object allocation; however, they should not modify t2.

    The first function (seqOp) can return a different result type, U, than the type of
    this RDD. Thus, we need one operation for merging a T into a U and one operation for
    merging two U's.

    >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
    >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
    (10, 4)
    >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
    (0, 0)
max
    Find the maximum item in this RDD.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0

min
    Find the minimum item in this RDD.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
sum
    Add up the elements in this RDD.

    >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
    6.0

count
    Return the number of elements in this RDD.

    >>> sc.parallelize([2, 3, 4]).count()
    3

mean
    Compute the mean of this RDD's elements.

    >>> sc.parallelize([1, 2, 3]).mean()
    2.0

variance
    Compute the variance of this RDD's elements.

    >>> sc.parallelize([1, 2, 3]).variance()
    0.666...

stdev
    Compute the standard deviation of this RDD's elements.

    >>> sc.parallelize([1, 2, 3]).stdev()
    0.816...

sampleStdev
    Compute the sample standard deviation of this RDD's elements (which corrects for bias
    in estimating the standard deviation by dividing by N-1 instead of N).

    >>> sc.parallelize([1, 2, 3]).sampleStdev()
    1.0

sampleVariance
    Compute the sample variance of this RDD's elements (which corrects for bias in
    estimating the variance by dividing by N-1 instead of N).

    >>> sc.parallelize([1, 2, 3]).sampleVariance()
    1.0

countByValue
    Return the count of each unique value in this RDD as a dictionary of (value, count)
    pairs.

    >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
    [(1, 2), (2, 3)]
top
    Get the top N elements from an RDD.

    Note: It returns the list sorted in descending order.

    >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
    [12]
    >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
    [6, 5]

takeOrdered
    Get the N elements from an RDD ordered in ascending order or as specified by the
    optional key function.

    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
    [1, 2, 3, 4, 5, 6]
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
    [10, 9, 7, 6, 5, 4]

take
    Take the first num elements of the RDD.

    This currently scans the partitions *one by one*, so it will be slow if a lot of
    partitions are required. In that case, use collect to get the whole RDD instead.

    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]

first
    Return the first element in this RDD.

    >>> sc.parallelize([2, 3, 4]).first()
    2
saveAsTextFile
    Save this RDD as a text file, using string representations of elements.

    >>> tempFile = NamedTemporaryFile(delete=True)
    >>> tempFile.close()
    >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
    >>> from fileinput import input
    >>> from glob import glob
    >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
    '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'

    Empty lines are tolerated when saving to text files.

    >>> tempFile2 = NamedTemporaryFile(delete=True)
    >>> tempFile2.close()
    >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
    >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
    '\n\n\nbar\nfoo\n'
collectAsMap
    Return the key-value pairs in this RDD to the master as a dictionary.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    >>> m[1]
    2
    >>> m[3]
    4

keys
    Return an RDD with the keys of each tuple.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
    >>> m.collect()
    [1, 3]

values
    Return an RDD with the values of each tuple.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
    >>> m.collect()
    [2, 4]
reduceByKey
    Merge the values for each key using an associative reduce function.

    This will also perform the merging locally on each mapper before sending results to a
    reducer, similarly to a "combiner" in MapReduce.

    Output will be hash-partitioned with numPartitions partitions, or the default
    parallelism level if numPartitions is not specified.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]
reduceByKeyLocally
    Merge the values for each key using an associative reduce function, but return the
    results immediately to the master as a dictionary.

    This will also perform the merging locally on each mapper before sending results to a
    reducer, similarly to a "combiner" in MapReduce.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKeyLocally(add).items())
    [('a', 2), ('b', 1)]

countByKey
    Count the number of elements for each key, and return the result to the master as a
    dictionary.

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]
join
    Return an RDD containing all pairs of elements with matching keys in this RDD and
    other.

    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in
    this RDD and (k, v2) is in other.

    Performs a hash join across the cluster.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]
leftOuterJoin
    Perform a left outer join of this RDD and other.

    For each element (k, v) in this RDD, the resulting RDD will either contain all pairs
    (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have
    key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]
rightOuterJoin
    Perform a right outer join of this RDD and other.

    For each element (k, w) in other, the resulting RDD will either contain all pairs
    (k, (v, w)) for v in this RDD, or the pair (k, (None, w)) if no elements in this RDD
    have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(y.rightOuterJoin(x).collect())
    [('a', (2, 1)), ('b', (None, 4))]
partitionBy
    Return a copy of the RDD partitioned using the specified partitioner.

    >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
    >>> sets = pairs.partitionBy(2).glom().collect()
    >>> set(sets[0]).intersection(set(sets[1]))
    set([])
combineByKey
    Generic function to combine the elements for each key using a custom set of
    aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note
    that V and C can be different -- for example, one might group an RDD of type
    (Int, Int) into an RDD of type (Int, List[Int]).

    Users provide three functions:

    - createCombiner, which turns a V into a C (e.g., creates a one-element list)
    - mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    - mergeCombiners, to combine two C's into a single one.

    In addition, users can control the partitioning of the output RDD.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> def f(x): return x
    >>> def add(a, b): return a + str(b)
    >>> sorted(x.combineByKey(str, add, add).collect())
    [('a', '11'), ('b', '1')]
foldByKey
    Merge the values for each key using an associative function "func" and a neutral
    "zeroValue" which may be added to the result an arbitrary number of times, and must
    not change the result (e.g., 0 for addition, or 1 for multiplication).

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> from operator import add
    >>> rdd.foldByKey(0, add).collect()
    [('a', 2), ('b', 1)]

groupByKey
    Group the values for each key in the RDD into a single sequence. Hash-partitions the
    resulting RDD into numPartitions partitions.

    Note: If you are grouping in order to perform an aggregation (such as a sum or
    average) over each key, using reduceByKey will provide much better performance.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
    [('a', [1, 1]), ('b', [1])]
flatMapValues
    Pass each value in the key-value pair RDD through a flatMap function without changing
    the keys; this also retains the original RDD's partitioning.

    >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    >>> def f(x): return x
    >>> x.flatMapValues(f).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

mapValues
    Pass each value in the key-value pair RDD through a map function without changing the
    keys; this also retains the original RDD's partitioning.

    >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    >>> def f(x): return len(x)
    >>> x.mapValues(f).collect()
    [('a', 3), ('b', 1)]
cogroup
    For each key k in this RDD or other, return a resulting RDD that contains a tuple
    with the list of values for that key in this RDD as well as other.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
    [('a', ([1], [2])), ('b', ([4], []))]
subtractByKey
    Return each (key, value) pair in this RDD that has no pair with a matching key in
    other.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]

subtract
    Return each value in this RDD that is not contained in other.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtract(y).collect())
    [('a', 1), ('b', 4), ('b', 5)]
keyBy
    Creates tuples of the elements in this RDD by applying f.

    >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
    >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
    >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
    [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
repartition
    Return a new RDD that has exactly numPartitions partitions.

    Can increase or decrease the level of parallelism in this RDD. Internally, this uses
    a shuffle to redistribute data. If you are decreasing the number of partitions in
    this RDD, consider using `coalesce`, which can avoid performing a shuffle.

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10

coalesce
    Return a new RDD that is reduced into `numPartitions` partitions.

    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]
zip
    Zips this RDD with another one, returning key-value pairs with the first element in
    each RDD, the second element in each RDD, etc. Assumes that the two RDDs have the
    same number of partitions and the same number of elements in each partition (e.g.,
    one was made through a map on the other).

    >>> x = sc.parallelize(range(0,5))
    >>> y = sc.parallelize(range(1000, 1005))
    >>> x.zip(y).collect()
    [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
setName
    Assign a name to this RDD.

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.setName('RDD1')
    >>> rdd1.name()
    'RDD1'

getStorageLevel
    Get the RDD's current storage level.

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.getStorageLevel()
    StorageLevel(False, False, False, False, 1)
Generated by Epydoc 3.0.1 on Fri Jul 4 18:52:26 2014 | http://epydoc.sourceforge.net |