Dataframe – How does Spark parallelize the processing of a 1TB file

apache-spark, apache-spark-sql, dataframe, parallel-processing

Imaginary problem

  • A gigantic CSV log file, say 1 TB in size, located on a USB drive
  • The log contains activity logs of users around the world; let's assume each line has 50 columns, one of which is Country.
  • We want a line count per country, in descending order.
  • Let's assume the Spark cluster has enough nodes with RAM to process the entire 1 TB in memory (20 nodes, 4 CPU cores each, 64 GB RAM per node)

My Poorman's conceptual solution
Using SparkSQL & Databricks spark-csv

$ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
val dfBigLog = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/media/username/myUSBdrive/bogusBigLog1TB.log")

dfBigLog.select("Country")
  .groupBy("Country")
  .agg(count($"Country") as "CountryCount")
  .orderBy($"CountryCount".desc).show

Question 1: How does Spark parallelize the processing?

I suppose the majority of the execution time (99%?) of the above solution is spent reading the 1 TB file from the USB drive into the Spark cluster. Reading the file from the USB drive is not parallelizable. But after reading the entire file, what does Spark do under the hood to parallelize the processing?

  • How many nodes are used for creating the DataFrame? (maybe only one?)

  • How many nodes are used for the groupBy & count? Let's assume there are 100+ countries (but Spark doesn't know that yet). How would Spark partition the 100+ country values to distribute them across the 20 nodes?

Question 2: How to make the Spark application the fastest possible?
I suppose the main area of improvement would be to parallelize the reading of the 1 TB file.

  • Convert the CSV file to the Parquet file format, using Snappy compression. Let's assume this can be done in advance.

  • Copy the Parquet file onto HDFS. Let's assume the Spark cluster is within the same Hadoop cluster and the datanodes are independent of the 20-node Spark cluster.

  • Change the Spark application to read from HDFS. I suppose Spark would then use several nodes to read the file, since Parquet is splittable.

  • Let's assume the Snappy-compressed Parquet file is 10x smaller: size = 100 GB, HDFS block size = 128 MB. That's a total of 782 HDFS blocks.
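The block count in the last bullet can be sanity-checked with a quick ceiling division. Note this assumes decimal units (1 GB = 10^9 bytes, 1 MB = 10^6 bytes); with binary 128 MiB blocks, as HDFS actually uses, the count would come out around 746 instead:

```python
# Sketch: number of HDFS blocks for a 100 GB file with a 128 MB block size.
# Assumes decimal units; these figures come from the question's own estimates.
file_size = 100 * 10**9    # 100 GB
block_size = 128 * 10**6   # 128 MB

num_blocks = -(-file_size // block_size)  # ceiling division
print(num_blocks)  # 782
```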

But then how does Spark manage to use all 20 nodes, both for creating the DataFrame and for the processing (groupBy and count)? Does Spark use all the nodes each time?

Best Answer

Question 1: How does Spark parallelize the processing (of reading a file from a USB drive)?

This scenario is not possible.

Spark relies on a Hadoop-compliant filesystem to read a file. When you mount the USB drive, it is only accessible from the local host. Attempting to execute

.load("/media/username/myUSBdrive/bogusBigLog1TB.log")

will fail in a cluster configuration, since the executors in the cluster will not have access to that local path.

It would be possible to read the file with Spark in local mode (master=local[*]), but then you would only have one host, so the rest of the questions would not apply.

Question 2: How to make the Spark application the fastest possible?

Divide and conquer.
The strategy outlined in the question is good. Using Parquet allows Spark to do a projection on the data and read only the Country column, further reducing the amount of data to be ingested and hence speeding things up.

The cornerstone of parallelism in Spark is partitions. Again, as we are reading from a file, Spark relies on the Hadoop filesystem. When reading from HDFS, the partitioning is dictated by the splits of the file on HDFS. Those splits are evenly distributed among the executors. That's how Spark initially distributes the work across all executors available to the job.
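As a rough illustration (not Spark's actual scheduler, which also weighs data locality), the initial assignment of splits to executors can be pictured as a round-robin distribution. The numbers below, 782 splits and 20 executors, come from the question's own estimates:

```python
# Sketch: round-robin distribution of 782 input splits across 20 executors.
# Spark's real scheduler also considers data locality; this only shows
# why the task counts end up nearly even.
num_splits = 782
num_executors = 20

tasks_per_executor = [0] * num_executors
for split in range(num_splits):
    tasks_per_executor[split % num_executors] += 1

# Every executor gets 39 or 40 tasks, so no node sits idle.
print(min(tasks_per_executor), max(tasks_per_executor))  # 39 40
```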

I'm not deeply familiar with the Catalyst optimizations, but I would assume that .groupBy("Country").agg(count($"Country")) becomes something similar to: rdd.map(country => (country, 1)).reduceByKey(_ + _). The map operation does not affect partitioning, so it can be applied in place on each partition. The reduceByKey is applied first locally within each partition; the partial results are then shuffled by key and combined across executors, and only the small, final per-country result is collected on the driver for the show. So most of the counting happens distributed in the cluster, and only adding up the last few numbers is centralized.
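The map-side combine described above can be sketched in plain Python with a handful of toy records (these country values are made up for illustration). Each "partition" first counts its own keys locally, then the per-partition partial counts are merged by key, which in Spark happens during the shuffle:

```python
from collections import Counter

# Toy partitions of Country values, standing in for RDD partitions.
partitions = [
    ["US", "FR", "US", "DE"],
    ["FR", "FR", "US"],
]

# Map-side combine: each partition produces partial (country, count) pairs,
# like reduceByKey's local aggregation step.
partials = [Counter(p) for p in partitions]

# Shuffle + reduce: merge the partial counts by key (reduceByKey(_ + _)).
total = Counter()
for partial in partials:
    total.update(partial)

print(dict(total))  # {'US': 3, 'FR': 3, 'DE': 1}
```

Only these merged per-country totals, a few dozen pairs at most, ever need to leave the executors; the 1 TB of raw lines never moves after the initial read.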
