Let's say you work with time series data, and your desired outcome depends on multiple window functions with distinct window specifications. The end result may be a single Spark column expression, for example an identifier for intervals.
Usually, I don't store intermediate results with df.withColumn but rather chain/stack column expressions and trust Spark to find the most effective DAG (when working with DataFrames).
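For illustration, here is a toy sketch of what I mean by chaining (df and its name column are hypothetical):

```
from pyspark.sql import functions as F

# chained style: one nested expression, no materialized intermediates
clean = F.upper(F.trim(F.col("name")))
flagged = F.when(F.length(clean) > 3, clean)
df_chained = df.withColumn("result", flagged)

# stepwise style: each intermediate stored as its own column
df_stepwise = (
    df.withColumn("clean", F.upper(F.trim(F.col("name"))))
      .withColumn("result", F.when(F.length(F.col("clean")) > 3, F.col("clean")))
      .drop("clean")
)
```

For plain projections like this, Catalyst collapses the adjacent steps anyway (the CollapseProject rule), so both variants usually compile to the same plan. That is why I trust chaining in the first place.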
However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with df.withColumn reduces the DAG complexity. Let's consider the following test setup:
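A minimal sketch of the setup (the concrete data is irrelevant; four small random integer columns are enough), together with the fully chained variant:

```
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# small random DataFrame; the concrete values do not matter for the plan
dfp = pd.DataFrame({c: np.random.randint(0, 5, size=100)
                    for c in ["col1", "col2", "col3", "col4"]})
df = spark.createDataFrame(dfp)

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)
# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)
# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)
df_result.explain()
```

The explain() output of this chained version shows the more complex plan, with one shuffle more than necessary.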
To get a better DAG, we modify the code slightly: we store the column expression of step2 with withColumn and afterwards pass only a reference to that column. The new logical plan indeed requires only 3 shuffles!
```
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)
# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)
# save temporarily
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")
# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()
```
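To compare the variants without eyeballing the full explain() output, one can count the Exchange (shuffle) operators in the physical plan. A small sketch that peeks at PySpark internals via _jdf (a debugging trick, not a stable API):

```
# rough shuffle count: number of Exchange operators in the physical plan;
# _jdf is an internal JVM handle, so this may differ across Spark versions
physical = df_result._jdf.queryExecution().executedPlan().toString()
print(physical.count("Exchange"))
```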
My original example was even more complex and showed an even greater difference in the DAG (on real-world data, up to 10 times slower).
Does anyone have an explanation for this odd behavior? I had thought that stacking/chaining column expressions was best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).