Returning to yesterday’s question, I compared the physical plans of the two versions.
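
For context, here is a minimal sketch of the setup these snippets assume. The case classes and the CSV-to-silver step are reconstructed from the plans below rather than copied from the original code, so treat the details as illustrative.

import org.apache.spark.sql.{functions => F, SparkSession}

// Reconstructed from the Project and SerializeFromObject nodes in the plans below
case class IOTDataTypes(id: String, room_id: String, noted_date: java.sql.Timestamp, temp: Int, out_in: String)
case class WarmIOTDataTypes(room_id: String, out_in: String, temp: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Raw CSV -> renamed and typed ("silver") columns
val silver_df = spark.read
  .option("header", "true")
  .csv("data/IOT-temp.csv")
  .select(
    F.col("id"),
    F.col("room_id/id").as("room_id"),
    F.to_timestamp(F.col("noted_date"), "dd-MM-yyyy HH:mm").as("noted_date"),
    F.col("temp").cast("int").as("temp"),
    F.col("out/in").as("out_in"))

val iot_ds = silver_df.as[IOTDataTypes]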

Dataset API

Filter before map

Here I use the Dataset API and filter the dataset before mapping the columns (rename, type cast):

val warm_ds = iot_ds
  .filter(v => {v.temp > 30 && v.out_in == "Out"})
  .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
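
The plans in this post can be printed with the Dataset’s explain method:

warm_ds.explain()     // prints the physical plan
warm_ds.explain(true) // also prints the parsed, analysed and optimised logical plans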

The resulting physical plan shows that the filter takes place before the mapping.

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).room_id, true, false, true) AS room_id#74, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).out_in, true, false, true) AS out_in#75, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).temp.intValue AS temp#76]
+- *(1) MapElements sparklearning.IOTData$$$Lambda$8661/0x00007f7a1da54400@13fc1c1e, obj#73: sparklearning.WarmIOTDataTypes
   +- *(1) Filter sparklearning.IOTData$$$Lambda$8660/0x00007f7a1da53bd0@53903fe6.apply
      +- *(1) DeserializeToObject newInstance(class sparklearning.IOTDataTypes), obj#72: sparklearning.IOTDataTypes
         +- *(1) Project [id#17, room_id/id#18 AS room_id#27, gettimestamp(noted_date#19, dd-MM-yyyy HH:mm, TimestampType, Some(Europe/Brussels), false) AS noted_date#46, cast(temp#20 as int) AS temp#40, out/in#21 AS out_in#34]
            +- FileScan csv [id#17,room_id/id#18,noted_date#19,temp#20,out/in#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,room_id/id:string,noted_date:string,temp:string,out/in:string>

Map before filter

Here I also use the Dataset API, but map the columns before filtering:

val warm_ds = iot_ds
  .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
  .filter(v => {v.temp > 30 && v.out_in == "Out"})

This order is reflected in the physical plan below: the filter takes place after mapping.

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).room_id, true, false, true) AS room_id#68, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).out_in, true, false, true) AS out_in#69, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).temp.intValue AS temp#70]
+- *(1) Filter sparklearning.IOTData$$$Lambda$8637/0x00007f792da4e000@43037ed4.apply
   +- *(1) MapElements sparklearning.IOTData$$$Lambda$8631/0x00007f792da55a50@73c0914, obj#67: sparklearning.WarmIOTDataTypes
      +- *(1) DeserializeToObject newInstance(class sparklearning.IOTDataTypes), obj#66: sparklearning.IOTDataTypes
         +- *(1) Project [id#17, room_id/id#18 AS room_id#27, gettimestamp(noted_date#19, dd-MM-yyyy HH:mm, TimestampType, Some(Europe/Brussels), false) AS noted_date#46, cast(temp#20 as int) AS temp#40, out/in#21 AS out_in#34]
            +- FileScan csv [id#17,room_id/id#18,noted_date#19,temp#20,out/in#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,room_id/id:string,noted_date:string,temp:string,out/in:string>
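
As an aside, the plans can also be inspected programmatically instead of eyeballing the explain output. The variable names here are made up, since both snippets above reuse warm_ds:

// queryExecution exposes the plans that explain() prints
println(warm_filter_first.queryExecution.executedPlan)
println(warm_map_first.queryExecution.executedPlan)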

Dataframe API

To check whether the result is different for the Dataframe API, I repeated the experiment with the filter and select statements in both orders.

Filter before map

Code:

val warm_ds = silver_df
  .filter(F.col("temp") > 30 && F.col("out_in") === "Out")
  .select("room_id", "out_in", "temp")

Plan:

The filter takes place before the map (select).

== Physical Plan ==
*(1) Project [room_id/id#18 AS room_id#27, out/in#21 AS out_in#34, cast(temp#20 as int) AS temp#40]
+- *(1) Filter (((isnotnull(temp#20) AND isnotnull(out/in#21)) AND (cast(temp#20 as int) > 30)) AND (out/in#21 = Out))
   +- FileScan csv [room_id/id#18,temp#20,out/in#21] Batched: false, DataFilters: [isnotnull(temp#20), isnotnull(out/in#21), (cast(temp#20 as int) > 30), (out/in#21 = Out)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [IsNotNull(temp), IsNotNull(out/in), EqualTo(out/in,Out)], ReadSchema: struct<room_id/id:string,temp:string,out/in:string>

Map before filter

Code:

val warm_ds = silver_df
  .select("room_id", "out_in", "temp")
  .filter(F.col("temp") > 30 && F.col("out_in") === "Out")

Plan:

The filter again takes place before the map (select), even though the code applies the select first.

== Physical Plan ==
*(1) Project [room_id/id#18 AS room_id#27, out/in#21 AS out_in#34, cast(temp#20 as int) AS temp#40]
+- *(1) Filter (((isnotnull(temp#20) AND isnotnull(out/in#21)) AND (cast(temp#20 as int) > 30)) AND (out/in#21 = Out))
   +- FileScan csv [room_id/id#18,temp#20,out/in#21] Batched: false, DataFilters: [isnotnull(temp#20), isnotnull(out/in#21), (cast(temp#20 as int) > 30), (out/in#21 = Out)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [IsNotNull(temp), IsNotNull(out/in), EqualTo(out/in,Out)], ReadSchema: struct<room_id/id:string,temp:string,out/in:string>

Conclusion

I see now that the Dataset API puts some limitations on the Spark Catalyst optimizer. Because Catalyst can’t see inside the lambda functions passed to filter and map in the Dataset API, it doesn’t dare to rearrange their order: as far as Catalyst is concerned they are black boxes, and swapping a black-box filter past a black-box map could change the result.

This means that a map-then-filter ordering in the Dataset API is unfortunately preserved as written (i.e. not optimized). It’s better to filter as early as possible, as close to the data source as you can.
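
If you want to keep the typed map but still hand Catalyst a predicate it can reason about, one option is the Column-based overload of filter, which the optimizer can see into. A sketch (I haven’t verified its plan here):

val warm_ds = iot_ds
  .filter(F.col("temp") > 30 && F.col("out_in") === "Out") // Catalyst-visible predicate
  .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp)) // the typed map stays opaque

This should keep the compile-time checks of the map while letting the filter participate in optimizations such as pushdown.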

Note that the Dataframe API does optimize this: regardless of the order of the filter and select statements, the filter comes first in the physical plan in both cases. This is because in the Dataframe API, Catalyst can clearly see what’s going on inside both filter and select, and is therefore able to optimize. In fact, it even pushes predicates down into the CSV scan itself (the PushedFilters list in the FileScan node), whereas in the Dataset plans above PushedFilters stays empty.

This shows that, although the Dataset API provides type safety, it can come at the cost of some optimization.