public class SparkUtils
extends java.lang.Object
Modifier and Type | Method and Description |
---|---|
static <T,U> org.apache.spark.api.java.JavaPairRDD<T,U>[] | balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaPairRDD<T,U> data) Equivalent to balancedRandomSplit(int, int, JavaRDD), but for pair RDDs |
static <T,U> org.apache.spark.api.java.JavaPairRDD<T,U>[] | balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaPairRDD<T,U> data, long rngSeed) Equivalent to balancedRandomSplit(int, int, JavaRDD), but for pair RDDs and with control over the RNG seed |
static <T> org.apache.spark.api.java.JavaRDD<T>[] | balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaRDD<T> data) Randomly split the specified RDD into a number of RDDs, each containing numObjectsPerSplit objects |
static <T> org.apache.spark.api.java.JavaRDD<T>[] | balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaRDD<T> data, long rngSeed) Equivalent to balancedRandomSplit(int, int, JavaRDD), but with control over the RNG seed |
static boolean | checkKryoConfiguration(org.apache.spark.api.java.JavaSparkContext javaSparkContext, org.slf4j.Logger log) Check the Spark configuration for an incorrect Kryo configuration, logging a warning message if necessary |
static org.apache.spark.api.java.JavaRDD<java.lang.String> | listPaths(org.apache.spark.api.java.JavaSparkContext sc, java.lang.String path) List the files in the given directory (path) as a JavaRDD<String> |
static <T> T | readObjectFromFile(java.lang.String path, java.lang.Class<T> type, org.apache.spark.api.java.JavaSparkContext sc) Read an object from HDFS (or local) using default Java object serialization |
static <T> T | readObjectFromFile(java.lang.String path, java.lang.Class<T> type, org.apache.spark.SparkContext sc) Read an object from HDFS (or local) using default Java object serialization |
static java.lang.String | readStringFromFile(java.lang.String path, org.apache.spark.api.java.JavaSparkContext sc) Read a UTF-8 format String from HDFS (or local) |
static java.lang.String | readStringFromFile(java.lang.String path, org.apache.spark.SparkContext sc) Read a UTF-8 format String from HDFS (or local) |
static <T> org.apache.spark.api.java.JavaRDD<T> | repartition(org.apache.spark.api.java.JavaRDD<T> rdd, Repartition repartition, RepartitionStrategy repartitionStrategy, int objectsPerPartition, int numPartitions) Repartition the specified RDD (or not) using the given Repartition and RepartitionStrategy settings |
static <T> org.apache.spark.api.java.JavaRDD<T> | repartitionBalanceIfRequired(org.apache.spark.api.java.JavaRDD<T> rdd, Repartition repartition, int objectsPerPartition, int numPartitions) Repartition an RDD (given the Repartition setting) such that there are approximately numPartitions partitions, each containing objectsPerPartition objects |
static org.apache.spark.api.java.JavaRDD<org.nd4j.linalg.dataset.DataSet> | shuffleExamples(org.apache.spark.api.java.JavaRDD<org.nd4j.linalg.dataset.DataSet> rdd, int newBatchSize, int numPartitions) Randomly shuffle the examples in each DataSet object, and recombine them into new DataSet objects with the specified batch size |
static void | writeObjectToFile(java.lang.String path, java.lang.Object toWrite, org.apache.spark.api.java.JavaSparkContext sc) Write an object to HDFS (or local) using default Java object serialization |
static void | writeObjectToFile(java.lang.String path, java.lang.Object toWrite, org.apache.spark.SparkContext sc) Write an object to HDFS (or local) using default Java object serialization |
static void | writeStringToFile(java.lang.String path, java.lang.String toWrite, org.apache.spark.api.java.JavaSparkContext sc) Write a String to a file (on HDFS or local) in UTF-8 format |
static void | writeStringToFile(java.lang.String path, java.lang.String toWrite, org.apache.spark.SparkContext sc) Write a String to a file (on HDFS or local) in UTF-8 format |
public static boolean checkKryoConfiguration(org.apache.spark.api.java.JavaSparkContext javaSparkContext, org.slf4j.Logger log)

Check the Spark configuration for an incorrect Kryo configuration, logging a warning message if necessary.

Parameters:
- javaSparkContext - Spark context
- log - Logger to log messages to
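Example usage (a minimal sketch, not from the original Javadoc): the `org.deeplearning4j.spark.util.SparkUtils` import path and the `org.nd4j.Nd4jRegistrator` registrator class are assumptions based on the usual DL4J/ND4J Kryo setup; verify both against your version before relying on them.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KryoCheckExample {
    private static final Logger log = LoggerFactory.getLogger(KryoCheckExample.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kryo-check")
                .setMaster("local[*]")
                // Kryo needs an ND4J-aware registrator; the class name below is an assumption
                // based on the standard nd4j-kryo module
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Logs a warning if Kryo is enabled without a suitable registrator
            boolean ok = SparkUtils.checkKryoConfiguration(sc, log);
            log.info("Kryo configuration check result: {}", ok);
        }
    }
}
```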
public static void writeStringToFile(java.lang.String path, java.lang.String toWrite, org.apache.spark.api.java.JavaSparkContext sc) throws java.io.IOException

Write a String to a file (on HDFS or local) in UTF-8 format.

Parameters:
- path - Path to write to
- toWrite - String to write
- sc - Spark context

Throws:
- java.io.IOException
public static void writeStringToFile(java.lang.String path, java.lang.String toWrite, org.apache.spark.SparkContext sc) throws java.io.IOException

Write a String to a file (on HDFS or local) in UTF-8 format.

Parameters:
- path - Path to write to
- toWrite - String to write
- sc - Spark context

Throws:
- java.io.IOException
public static java.lang.String readStringFromFile(java.lang.String path, org.apache.spark.api.java.JavaSparkContext sc) throws java.io.IOException

Read a UTF-8 format String from HDFS (or local).

Parameters:
- path - Path to read the String from
- sc - Spark context

Throws:
- java.io.IOException
public static java.lang.String readStringFromFile(java.lang.String path, org.apache.spark.SparkContext sc) throws java.io.IOException
path
- Path to write the stringsc
- Spark contextjava.io.IOException
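A minimal round-trip sketch for the String helpers above (not from the original Javadoc; the `SparkUtils` import path is an assumption):

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class

import java.io.IOException;

public class StringFileExample {
    // Round-trip a UTF-8 String through HDFS or the local filesystem
    static void roundTrip(JavaSparkContext sc) throws IOException {
        String path = "file:///tmp/sparkutils-example.txt";     // an hdfs:// URI works the same way
        SparkUtils.writeStringToFile(path, "hello from SparkUtils", sc);
        String restored = SparkUtils.readStringFromFile(path, sc);
        System.out.println(restored);                           // prints "hello from SparkUtils"
    }
}
```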
public static void writeObjectToFile(java.lang.String path, java.lang.Object toWrite, org.apache.spark.api.java.JavaSparkContext sc) throws java.io.IOException

Write an object to HDFS (or local) using default Java object serialization.

Parameters:
- path - Path to write the object to
- toWrite - Object to write
- sc - Spark context

Throws:
- java.io.IOException
public static void writeObjectToFile(java.lang.String path, java.lang.Object toWrite, org.apache.spark.SparkContext sc) throws java.io.IOException

Write an object to HDFS (or local) using default Java object serialization.

Parameters:
- path - Path to write the object to
- toWrite - Object to write
- sc - Spark context

Throws:
- java.io.IOException
public static <T> T readObjectFromFile(java.lang.String path, java.lang.Class<T> type, org.apache.spark.api.java.JavaSparkContext sc) throws java.io.IOException

Read an object from HDFS (or local) using default Java object serialization.

Type Parameters:
- T - Type of the object to read

Parameters:
- path - File to read
- type - Class of the object to read
- sc - Spark context

Throws:
- java.io.IOException
public static <T> T readObjectFromFile(java.lang.String path, java.lang.Class<T> type, org.apache.spark.SparkContext sc) throws java.io.IOException

Read an object from HDFS (or local) using default Java object serialization.

Type Parameters:
- T - Type of the object to read

Parameters:
- path - File to read
- type - Class of the object to read
- sc - Spark context

Throws:
- java.io.IOException
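A minimal round-trip sketch for the object serialization helpers above (not from the original Javadoc; the `SparkUtils` import path is an assumption):

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class

import java.io.IOException;
import java.util.HashMap;

public class ObjectFileExample {
    // Persist a Serializable object with default Java serialization and read it back
    static void roundTrip(JavaSparkContext sc) throws IOException {
        HashMap<String, Integer> counts = new HashMap<>();
        counts.put("examples", 1000);

        String path = "file:///tmp/counts.obj";                 // an hdfs:// URI works the same way
        SparkUtils.writeObjectToFile(path, counts, sc);

        HashMap restored = SparkUtils.readObjectFromFile(path, HashMap.class, sc);
        System.out.println(restored);                           // prints {examples=1000}
    }
}
```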
public static <T> org.apache.spark.api.java.JavaRDD<T> repartition(org.apache.spark.api.java.JavaRDD<T> rdd, Repartition repartition, RepartitionStrategy repartitionStrategy, int objectsPerPartition, int numPartitions)

Repartition the specified RDD (or not) using the given Repartition and RepartitionStrategy settings.

Type Parameters:
- T - Type of the RDD

Parameters:
- rdd - RDD to repartition
- repartition - Setting for when repartitioning is to be conducted
- repartitionStrategy - Setting for how repartitioning is to be conducted
- objectsPerPartition - Desired number of objects per partition
- numPartitions - Total number of partitions
public static <T> org.apache.spark.api.java.JavaRDD<T> repartitionBalanceIfRequired(org.apache.spark.api.java.JavaRDD<T> rdd, Repartition repartition, int objectsPerPartition, int numPartitions)

Repartition an RDD (given the Repartition setting) such that we end up with approximately numPartitions partitions, each containing objectsPerPartition objects.

Type Parameters:
- T - Type of RDD

Parameters:
- rdd - RDD to repartition
- repartition - Repartitioning setting
- objectsPerPartition - Number of objects we want in each partition
- numPartitions - Number of partitions to have
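A sketch of how the two repartitioning helpers above might be called (not from the original Javadoc). The `Repartition.Always` and `RepartitionStrategy.Balanced` constants, and all import paths, are assumptions; check the enums shipped with your DL4J version.

```java
import org.apache.spark.api.java.JavaRDD;
import org.deeplearning4j.spark.api.Repartition;           // assumed package and constant names
import org.deeplearning4j.spark.api.RepartitionStrategy;   // assumed package and constant names
import org.deeplearning4j.spark.util.SparkUtils;           // assumed package for this class

public class RepartitionExample {
    // Aim for ~16 partitions of ~32 objects each, forcing the repartition and balancing partition sizes
    static <T> JavaRDD<T> rebalance(JavaRDD<T> rdd) {
        return SparkUtils.repartition(rdd, Repartition.Always, RepartitionStrategy.Balanced, 32, 16);
    }

    // Same target sizes; whether any repartitioning happens at all is decided by the Repartition setting
    static <T> JavaRDD<T> rebalanceIfRequired(JavaRDD<T> rdd) {
        return SparkUtils.repartitionBalanceIfRequired(rdd, Repartition.Always, 32, 16);
    }
}
```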
public static <T> org.apache.spark.api.java.JavaRDD<T>[] balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaRDD<T> data)

Randomly split the specified RDD into a number of RDDs, each containing numObjectsPerSplit objects.
This is similar to how RDD.randomSplit works (i.e., split via filtering), but it should result in more equal splits, instead of the independent binomial sampling (based on weighting) used there.
This balanced splitting approach is important when the number of DataSet objects we want in each split is small, as the random sampling variance of JavaRDD.randomSplit(double[]) is quite large relative to the number of examples in each split. Note, however, that this method doesn't guarantee that partitions will be balanced.
The downside is that we need the total object count (whereas JavaRDD.randomSplit(double[]) does not). However, randomSplit requires a full pass over the data anyway (in order to do the filtering), so this should not add much overhead in practice.

Type Parameters:
- T - Generic type for the RDD

Parameters:
- totalObjectCount - Total number of objects in the RDD to split
- numObjectsPerSplit - Number of objects in each split
- data - Data to split
public static <T> org.apache.spark.api.java.JavaRDD<T>[] balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaRDD<T> data, long rngSeed)

Equivalent to balancedRandomSplit(int, int, JavaRDD), but with control over the RNG seed.
public static <T,U> org.apache.spark.api.java.JavaPairRDD<T,U>[] balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaPairRDD<T,U> data)

Equivalent to balancedRandomSplit(int, int, JavaRDD), but for pair RDDs.
public static <T,U> org.apache.spark.api.java.JavaPairRDD<T,U>[] balancedRandomSplit(int totalObjectCount, int numObjectsPerSplit, org.apache.spark.api.java.JavaPairRDD<T,U> data, long rngSeed)

Equivalent to balancedRandomSplit(int, int, JavaRDD), but for pair RDDs and with control over the RNG seed.
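A sketch of splitting an RDD of DataSets with the balancedRandomSplit overloads above (not from the original Javadoc; the `SparkUtils` import path is an assumption). Note that totalObjectCount must match the actual number of objects in the RDD.

```java
import org.apache.spark.api.java.JavaRDD;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class
import org.nd4j.linalg.dataset.DataSet;

public class BalancedSplitExample {
    // Split the data into RDDs of roughly 1,000 DataSets each, with a fixed seed for reproducibility
    static JavaRDD<DataSet>[] split(JavaRDD<DataSet> data) {
        int totalObjectCount = (int) data.count();   // balancedRandomSplit needs the total count up front
        int numObjectsPerSplit = 1000;
        return SparkUtils.balancedRandomSplit(totalObjectCount, numObjectsPerSplit, data, 12345L);
    }
}
```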
public static org.apache.spark.api.java.JavaRDD<java.lang.String> listPaths(org.apache.spark.api.java.JavaSparkContext sc, java.lang.String path) throws java.io.IOException

List the files in the given directory (path) as a JavaRDD<String>.

Parameters:
- sc - Spark context
- path - Path to list files in

Throws:
- java.io.IOException - If an error occurs getting the directory contents
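Example usage for listPaths (a minimal sketch, not from the original Javadoc; the `SparkUtils` import path and the directory URI are assumptions):

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class

import java.io.IOException;

public class ListPathsExample {
    // Print every file found under the given directory
    static void printPaths(JavaSparkContext sc, String dir) throws IOException {
        JavaRDD<String> paths = SparkUtils.listPaths(sc, dir);   // e.g., dir = "hdfs:///data/datasets/"
        for (String p : paths.collect()) {
            System.out.println(p);
        }
    }
}
```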
public static org.apache.spark.api.java.JavaRDD<org.nd4j.linalg.dataset.DataSet> shuffleExamples(org.apache.spark.api.java.JavaRDD<org.nd4j.linalg.dataset.DataSet> rdd, int newBatchSize, int numPartitions)

Randomly shuffle the examples in each DataSet object, and recombine them into new DataSet objects with the specified batch size.

Parameters:
- rdd - DataSets to shuffle/recombine
- newBatchSize - New batch size for the DataSet objects, after shuffling/recombining
- numPartitions - Number of partitions to use when splitting/recombining

Returns:
- JavaRDD, with the examples shuffled/combined in each DataSet
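Example usage for shuffleExamples (a minimal sketch, not from the original Javadoc; the `SparkUtils` import path and the chosen batch size/partition count are assumptions):

```java
import org.apache.spark.api.java.JavaRDD;
import org.deeplearning4j.spark.util.SparkUtils;   // assumed package for this class
import org.nd4j.linalg.dataset.DataSet;

public class ShuffleExamplesExample {
    // Break the existing DataSets apart, shuffle the individual examples, and
    // recombine them into new DataSets of 32 examples each, spread over 8 partitions
    static JavaRDD<DataSet> reshuffle(JavaRDD<DataSet> batched) {
        return SparkUtils.shuffleExamples(batched, 32, 8);
    }
}
```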