Returning to yesterday’s question, I compared the physical plans of both versions.
Dataset API
Filter before map
Here I use the Dataset API and filter the dataset before mapping (renaming and type-casting) the columns:
val warm_ds = iot_ds
.filter(v => v.temp > 30 && v.out_in == "Out")
.map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
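These snippets rely on definitions that aren’t shown in this post. Here is a minimal sketch of what they presumably look like, inferred from the plans below; the case class names appear in the plan output, but the field types are my guess (the .intValue calls in the serializer suggest that temp is a boxed Integer):

import org.apache.spark.sql.Dataset

// Hypothetical definitions, inferred from the physical plans; not the original source.
case class IOTDataTypes(
  id: String,
  room_id: String,
  noted_date: java.sql.Timestamp,
  temp: Integer, // boxed, judging by the .intValue call in the serializer
  out_in: String
)
case class WarmIOTDataTypes(room_id: String, out_in: String, temp: Integer)

import spark.implicits._ // spark: the active SparkSession

// silver_df is the cleaned DataFrame visible as the Project node in the plans;
// a reconstruction is sketched in the Dataframe API section below.
val iot_ds: Dataset[IOTDataTypes] = silver_df.as[IOTDataTypes]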
The resulting physical plan shows that the filter takes place before the mapping.
== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).room_id, true, false, true) AS room_id#74, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).out_in, true, false, true) AS out_in#75, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).temp.intValue AS temp#76]
+- *(1) MapElements sparklearning.IOTData$$$Lambda$8661/0x00007f7a1da54400@13fc1c1e, obj#73: sparklearning.WarmIOTDataTypes
+- *(1) Filter sparklearning.IOTData$$$Lambda$8660/0x00007f7a1da53bd0@53903fe6.apply
+- *(1) DeserializeToObject newInstance(class sparklearning.IOTDataTypes), obj#72: sparklearning.IOTDataTypes
+- *(1) Project [id#17, room_id/id#18 AS room_id#27, gettimestamp(noted_date#19, dd-MM-yyyy HH:mm, TimestampType, Some(Europe/Brussels), false) AS noted_date#46, cast(temp#20 as int) AS temp#40, out/in#21 AS out_in#34]
+- FileScan csv [id#17,room_id/id#18,noted_date#19,temp#20,out/in#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,room_id/id:string,noted_date:string,temp:string,out/in:string>
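As an aside, these plans come from Spark’s standard explain output; assuming nothing beyond the built-in API, the plan above can be reproduced with:

warm_ds.explain() // prints the physical plan; explain(true) also shows the logical plans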
Map before filter
Here I also use the Dataset API, but map the columns before filtering:
val warm_ds = iot_ds
.map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
.filter(v => v.temp > 30 && v.out_in == "Out")
This order is reflected in the physical plan below: the filter takes place after mapping.
== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).room_id, true, false, true) AS room_id#68, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).out_in, true, false, true) AS out_in#69, knownnotnull(assertnotnull(input[0, sparklearning.WarmIOTDataTypes, true])).temp.intValue AS temp#70]
+- *(1) Filter sparklearning.IOTData$$$Lambda$8637/0x00007f792da4e000@43037ed4.apply
+- *(1) MapElements sparklearning.IOTData$$$Lambda$8631/0x00007f792da55a50@73c0914, obj#67: sparklearning.WarmIOTDataTypes
+- *(1) DeserializeToObject newInstance(class sparklearning.IOTDataTypes), obj#66: sparklearning.IOTDataTypes
+- *(1) Project [id#17, room_id/id#18 AS room_id#27, gettimestamp(noted_date#19, dd-MM-yyyy HH:mm, TimestampType, Some(Europe/Brussels), false) AS noted_date#46, cast(temp#20 as int) AS temp#40, out/in#21 AS out_in#34]
+- FileScan csv [id#17,room_id/id#18,noted_date#19,temp#20,out/in#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,room_id/id:string,noted_date:string,temp:string,out/in:string>
Dataframe API
To check whether the result is different for the Dataframe API, I repeated both experiments using Column expressions instead of lambdas.
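The silver_df these snippets start from isn’t shown in this post either; judging by the Project node in the plans above (renames, timestamp parsing, int cast), it looks roughly like this hypothetical reconstruction (the file path and reader options are assumptions):

import org.apache.spark.sql.{functions => F}

// Hypothetical reconstruction of silver_df, inferred from the Project node
// in the plans above; not the original code.
val silver_df = spark.read
  .option("header", "true")
  .csv("data/IOT-temp.csv")
  .select(
    F.col("id"),
    F.col("room_id/id").as("room_id"),
    F.to_timestamp(F.col("noted_date"), "dd-MM-yyyy HH:mm").as("noted_date"),
    F.col("temp").cast("int").as("temp"),
    F.col("out/in").as("out_in")
  )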
Filter before map
Code:
val warm_ds = silver_df
.filter(F.col("temp") > 30 && F.col("out_in") === "Out")
.select("room_id", "out_in", "temp")
Plan:
The filter takes place before the map (the select). Catalyst even goes further than reordering: the isnotnull checks and the out/in = Out predicate are pushed all the way into the CSV scan (see PushedFilters), while the temp > 30 comparison stays in the Filter node, presumably because it involves a cast from string to int.
== Physical Plan ==
*(1) Project [room_id/id#18 AS room_id#27, out/in#21 AS out_in#34, cast(temp#20 as int) AS temp#40]
+- *(1) Filter (((isnotnull(temp#20) AND isnotnull(out/in#21)) AND (cast(temp#20 as int) > 30)) AND (out/in#21 = Out))
+- FileScan csv [room_id/id#18,temp#20,out/in#21] Batched: false, DataFilters: [isnotnull(temp#20), isnotnull(out/in#21), (cast(temp#20 as int) > 30), (out/in#21 = Out)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [IsNotNull(temp), IsNotNull(out/in), EqualTo(out/in,Out)], ReadSchema: struct<room_id/id:string,temp:string,out/in:string>
Map before filter
Code:
val warm_ds = silver_df
.select("room_id", "out_in", "temp")
.filter(F.col("temp") > 30 && F.col("out_in") === "Out")
Plan:
Even though the select now comes first in the code, the filter again takes place before the map (select); in fact, the physical plan is identical to the previous one.
== Physical Plan ==
*(1) Project [room_id/id#18 AS room_id#27, out/in#21 AS out_in#34, cast(temp#20 as int) AS temp#40]
+- *(1) Filter (((isnotnull(temp#20) AND isnotnull(out/in#21)) AND (cast(temp#20 as int) > 30)) AND (out/in#21 = Out))
+- FileScan csv [room_id/id#18,temp#20,out/in#21] Batched: false, DataFilters: [isnotnull(temp#20), isnotnull(out/in#21), (cast(temp#20 as int) > 30), (out/in#21 = Out)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/denis/code/spark-learning/data/IOT-temp.csv], PartitionFilters: [], PushedFilters: [IsNotNull(temp), IsNotNull(out/in), EqualTo(out/in,Out)], ReadSchema: struct<room_id/id:string,temp:string,out/in:string>
Conclusion
I see now that the Dataset API puts some limitations on the Spark Catalyst optimizer. Because Catalyst can’t see inside the lambda functions passed to filter and map in the Dataset API, it doesn’t dare to rearrange their order.
This means that the ordering of map first and then filter in the Dataset API will unfortunately be preserved (i.e. not optimized). It’s better to filter as soon as possible, as close to the data as you can.
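One possible workaround (my sketch, not part of the original experiment): express the filter as Column expressions, which Catalyst can inspect and push down, and keep the typed lambda only for the mapping step. This assumes the F alias and the case classes sketched earlier:

val warm_ds = iot_ds
  .filter(F.col("temp") > 30 && F.col("out_in") === "Out") // transparent to Catalyst
  .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp)) // opaque lambda, applied afterwards

The trade-off is that the filter’s column names are no longer checked at compile time.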
Note that the Dataframe API optimizes this: regardless of the ordering of the filter and select statements, the filter takes precedence in the physical plan in both cases. This is because in the Dataframe API, Catalyst can clearly see what’s going on inside both filter and select, and is therefore able to optimize.
This shows that, although the Dataset API provides type safety, it can sacrifice some optimization in return.