Context
Let's say you work with time series data. Your desired outcome relies on multiple window functions with distinct window specifications, and the final result can be expressed as a single Spark column expression, for example an identifier for intervals.
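To make the context concrete, here is a minimal sketch of what such an interval identifier could look like. The column names (device, ts, state) and the change-point logic are illustrative assumptions, not my actual use case:

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# hypothetical sensor data: a state per device over time
sdf = spark.createDataFrame(
    pd.DataFrame({
        "device": np.random.randint(0, 3, size=100),
        "ts": np.arange(100),
        "state": np.random.randint(0, 2, size=100),
    })
)

w = Window.partitionBy("device").orderBy("ts")

# flag rows where the state differs from the previous row ...
change = (F.col("state") != F.lag("state").over(w)).cast("int")
# ... and derive an interval identifier as the running sum of those change points
interval_id = F.sum(F.coalesce(change, F.lit(0))).over(w)

sdf.withColumn("interval_id", interval_id).show(5)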
Status Quo
Usually, I don't store intermediate results with df.withColumn but rather chain/stack column expressions and trust Spark to find the most effective DAG (when dealing with DataFrames).
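For illustration, here is a minimal sketch of what I mean by chaining versus storing (assuming some DataFrame df with a numeric column col1; the expressions are made up):

from pyspark.sql import functions as F

# chaining/stacking: build nested column expressions and apply them in one go
expr_a = F.col("col1") + 1
expr_b = expr_a * 2
df_chained = df.withColumn("result", expr_b)

# storing: materialize the intermediate as its own column first
df_stored = (
    df.withColumn("a", expr_a)
    .withColumn("result", F.col("a") * 2)
    .drop("a")
)

For trivial arithmetic like this, Catalyst usually collapses both variants into the same plan; the reproducible example below involves window functions, where that is apparently not the case.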
Reproducible example
However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with df.withColumn reduces the DAG complexity. Consider the following test setup:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
    }
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| 2| 4| 1|
| 0| 2| 3| 0|
| 2| 0| 1| 0|
| 4| 1| 1| 2|
| 1| 3| 0| 4|
+----+----+----+----+
only showing top 5 rows
The computation is arbitrary. Basically, we have 2 window specs and 3 computational steps. The 3 computational steps depend on each other and use alternating window specs:
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")
# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)
# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)
# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))
df_result = df.withColumn("result", step3)
Inspecting the physical plan via df_result.explain() reveals 4 exchanges and sorts. However, only 3 should be necessary here, because we change the window spec only twice.
df_result.explain()
== Physical Plan ==
*(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
+- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
+- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#2L, 200)
+- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
+- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
+- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#0L, 200)
+- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
+- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
+- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#2L, 200)
+- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
+- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#0L, 200)
+- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
Improvement
To get a better DAG, we slightly modify the code: we store the column expression of step2 with withColumn and then just pass a reference to this column. The new physical plan indeed requires only 3 shuffles!
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")
# first step, arbitrary window func
step1 = F.lag("col3").over(w1)
# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)
# save temporary
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")
# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))
df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()
== Physical Plan ==
*(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
+- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
+- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#0L, 200)
+- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
+- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
+- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col3#2L, 200)
+- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
+- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#0L, 200)
+- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
Relevance
My original example was even more complex and resulted in an even greater difference in the DAG (on real-world data it was up to 10 times slower).
Question
Does anyone have an explanation for this odd behavior? I thought that stacking/chaining column expressions is best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).
You could try adding a hint to tell Spark how to move data between partitions, e.g. by calling .hint("broadcast") on the DataFrame after withColumn. This asks Spark to broadcast the (small) intermediate result to all executors instead of shuffling it. However, broadcast hints primarily influence join strategies, so this may not always be possible or effective here, depending on the size of the data and the shape of the query.
You can also try to explicitly specify the window frame for your window functions, using rowsBetween. This may affect the number of exchanges and sorts that are needed. For example:
import sys

w1 = Window.partitionBy("col1").orderBy("col2").rowsBetween(-sys.maxsize, sys.maxsize)
w2 = Window.partitionBy("col3").orderBy("col4").rowsBetween(-sys.maxsize, sys.maxsize)
This specifies that all rows of each partition are included in the window frame of both window functions, which may or may not reduce the number of exchanges and sorts. You can also try a smaller range for rowsBetween to see if it has any effect on the plan. Note that lag requires its own single-row frame, so in this example an explicit frame really only applies to the max aggregation.
It's also worth noting that the physical plan is not always a reliable indicator of the actual execution of the query. The actual execution may be more efficient than the plan suggests. You may want to try running the query and measuring the performance to see if it meets your needs.
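As a rough way to do that, here is a minimal sketch (assuming the spark session and the df from the question) that forces both variants to execute and reports the wall-clock time; the timed helper and the count-based action are just illustrative choices:

import time
from pyspark.sql import Window
from pyspark.sql import functions as F

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

def timed(frame, label):
    # force evaluation of the "result" column with an action and report wall-clock time
    start = time.time()
    frame.agg(F.count("result")).collect()
    print("{}: {:.2f}s".format(label, time.time() - start))

step1 = F.lag("col3").over(w1)
step2 = F.lag(step1).over(w2)

# variant 1: fully chained column expressions
chained = df.withColumn("result", F.when(step2 > 1, F.max(step2).over(w1)))

# variant 2: intermediate result stored in a temporary column
tmp = F.col("tmp_variable")
stored = (
    df.withColumn("tmp_variable", step2)
    .withColumn("result", F.when(tmp > 1, F.max(tmp).over(w1)))
    .drop("tmp_variable")
)

timed(chained, "chained")
timed(stored, "stored")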