Azure – Spark MergeSchema on parquet columns

apache-spark azure databricks scala

Spark can use mergeSchema for schema evolution with Parquet files. I have a few questions about it:

Does this work only for the Parquet file format, or also for other file formats such as CSV or txt files?

If new columns are added in the middle of the schema, I understand that mergeSchema will move them to the end.

And if the column order changes, will mergeSchema restore the columns to the order they had when the table was created, or do we need to do this manually by selecting all the columns?

Update from comment:

For example, suppose I create a table as below: spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") with the schema and data empid,empname,salary ====> 001,ABC,10000, and the next day I receive the format empid,empage,empdept,empname,salary ====> 001,30,XYZ,ABC,10000.

Will the new columns, empage and empdept, be added after the empid, empname and salary columns?

Best Answer

Q: 1. Does this support only the Parquet file format, or other file formats such as CSV or txt files? 2. If the column order changes, will mergeSchema restore the columns to the order they had when the table was created, or do we need to do this manually by selecting all the columns?


AFAIK, schema merging is supported only for Parquet, not for other formats such as CSV or txt.

mergeSchema (spark.sql.parquet.mergeSchema) matches columns by name, so it lines the columns up correctly even if their order differs between files.
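Schema merging can be switched on either for a single read or for the whole session. The sketch below shows both, assuming a local SparkSession; the object name, the helper name and the temporary directory are hypothetical and only there to keep the snippet self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object MergeSchemaToggle {
  // Writes two partitions with different columns, then reads them back both ways.
  def readBothWays(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    // Hypothetical scratch location, used only for this illustration
    val base = Files.createTempDirectory("merge_schema_demo").toString

    Seq((1, 1)).toDF("value", "square").write.parquet(s"$base/key=1")
    Seq((2, 8)).toDF("value", "cube").write.parquet(s"$base/key=2")

    // Option 1: enable merging for this read only
    val perRead = spark.read.option("mergeSchema", "true").parquet(base)

    // Option 2: enable merging for every later Parquet read in this session
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")
    val sessionWide = spark.read.parquet(base)

    (perRead, sessionWide)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MergeSchemaToggle")
      .master("local[*]")
      .getOrCreate()
    val (perRead, sessionWide) = readBothWays(spark)
    perRead.printSchema()
    sessionWide.printSchema()
    spark.stop()
  }
}
```

The per-read option is usually the safer choice, because merging has to inspect the schema of every file and the session-wide setting applies that cost to all Parquet reads.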

Example from the Spark documentation on Parquet schema merging:

import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
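To see why the merged schema comes out as value, square, cube, it can help to model the merge as "keep the fields already seen, then append fields with new names". The following is a toy model in plain Scala, not Spark's actual implementation: it ignores type conflicts, nested fields and the partition column, and the names SchemaMergeModel, Field and merge are made up for illustration.

```scala
object SchemaMergeModel {
  // In this toy model a field is just a (name, type) pair
  type Field = (String, String)

  // Keep every field of `left` in place, then append the fields of `right`
  // whose names do not already appear in `left`.
  def merge(left: Seq[Field], right: Seq[Field]): Seq[Field] = {
    val known = left.map(_._1).toSet
    left ++ right.filterNot { case (name, _) => known(name) }
  }

  def main(args: Array[String]): Unit = {
    val squares = Seq("value" -> "int", "square" -> "int")
    val cubes   = Seq("value" -> "int", "cube" -> "int")
    // prints: value, square, cube
    println(merge(squares, cubes).map(_._1).mkString(", "))
  }
}
```

Because fields are matched by name, a column that sits in a different position in the second file still maps to the existing column rather than being duplicated.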

UPDATE: using the real example you gave in the comments...


Q: Will the new columns, empage and empdept, be added after the empid, empname and salary columns?


Answer: Yes, empage and empdept were added after empid, empname and salary, followed by your day column.

See the full example:

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SaveMode


object CSVDataSourceParquetSchemaMerge extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
    .master("local")
    .getOrCreate()


  import spark.implicits._

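  // linesIterator is used instead of lines: on JDK 11+ `lines` can resolve to
  // java.lang.String.lines, which returns a Java Stream rather than a Scala iterator
  val csvDataday1 = spark.sparkContext.parallelize(
    """
      |empid,empname,salary
      |001,ABC,10000
    """.stripMargin.linesIterator.toList).toDS()
  val csvDataday2 = spark.sparkContext.parallelize(
    """
      |empid,empage,empdept,empname,salary
      |001,30,XYZ,ABC,10000
    """.stripMargin.linesIterator.toList).toDS()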

  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)

  println("first day data ")
  frame.show
  frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
  frame.printSchema

  val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
  frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
  println("Second day data ")

  frame1.show(false)
  frame1.printSchema

  // Read the partitioned table
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  println("Merged Schema")
  mergedDF.printSchema
  println("Merged DataFrame where EMPAGE,EMPDEPT WERE ADDED AFTER EMPID,EMPNAME,SALARY followed by your day column")
  mergedDF.show(false)


}


Result :

first day data 
+-----+-------+------+
|empid|empname|salary|
+-----+-------+------+
|    1|    ABC| 10000|
+-----+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Second day data 
+-----+------+-------+-------+------+
|empid|empage|empdept|empname|salary|
+-----+------+-------+-------+------+
|1    |30    |XYZ    |ABC    |10000 |
+-----+------+-------+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Merged Schema
root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- day: integer (nullable = true)

Merged DataFrame where EMPAGE,EMPDEPT WERE ADDED AFTER EMPID,EMPNAME,SALARY followed by your day column
+-----+-------+------+------+-------+---+
|empid|empname|salary|empage|empdept|day|
+-----+-------+------+------+-------+---+
|1    |ABC    |10000 |30    |XYZ    |2  |
|1    |ABC    |10000 |null  |null   |1  |
+-----+-------+------+------+-------+---+
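If you need a specific column order rather than the merged one (for example the order of the day-2 file), you still have to select the columns explicitly. A minimal sketch, assuming a DataFrame like the merged one above; ColumnOrder and withOrder are hypothetical helper names, not part of Spark.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object ColumnOrder {
  // Returns `df` with its columns in exactly the given order.
  // Spark raises an AnalysisException if a requested name does not exist.
  def withOrder(df: DataFrame, order: Seq[String]): DataFrame =
    df.select(order.map(name => col(name)): _*)
}
```

For the example data this could be called as ColumnOrder.withOrder(mergedDF, Seq("empid", "empage", "empdept", "empname", "salary", "day")). Columns left out of the list are dropped, so list every column you want to keep.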

Directory tree : data/test_table contains the two partition directories written above, day=1 and day=2.