This morning I set aside some time to answer a question that came up while I was working through a Scala Dataset manipulation example: does one version of the code below run faster than the other?
Version one: filter before map
val warm_ds = iot_ds
.filter(v => {v.temp > 30 && v.out_in == "Out"})
.map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
Version two: map before filter
val warm_ds = iot_ds
.map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
.filter(v => {v.temp > 30 && v.out_in == "Out"})
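For context, this is roughly the setup I'm assuming behind those two snippets; the post doesn't show the input case class or how iot_ds is built, so the IOTDataTypes name, the field types, and the Parquet path below are all placeholders:

import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed shape of the raw IoT records; only room_id, out_in and temp
// are used above, so everything here is a guess.
case class IOTDataTypes(room_id: String, out_in: String, temp: Double)
case class WarmIOTDataTypes(room_id: String, out_in: String, temp: Double)

val spark = SparkSession.builder().appName("warm-iot").getOrCreate()
import spark.implicits._

// Placeholder source: any format works, Parquet is just an example.
val iot_ds: Dataset[IOTDataTypes] =
  spark.read.parquet("/path/to/iot_data").as[IOTDataTypes]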
My Confusion:
In the example above, I’m using the Dataset API (for Scala) instead of DataFrames. However, both are Structured APIs that take advantage of Spark’s Catalyst optimizer. This leads me to think that Spark manages to reorder the operations optimally in both cases, so they should run equally fast in the end.
However however…
The Catalyst optimizer can’t introspect inside my lambda function v => {v.temp > 30 && v.out_in == "Out"}. This means Spark can’t push down this predicate¹.
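For contrast, here’s a rough sketch (not from the original example) of the same logic written with Column expressions instead of a Scala lambda; a predicate in this form is something Catalyst can actually inspect and reorder:

import org.apache.spark.sql.functions.col

// The predicate is now a Column expression that Catalyst can analyze,
// reorder and push toward the data source.
val warm_df = iot_ds
  .filter(col("temp") > 30 && col("out_in") === "Out")
  .select(col("room_id"), col("out_in"), col("temp"))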
So which one is faster? I don’t know yet! Next time, I’ll examine the Execution Plan to hopefully find the answer.
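In the meantime, the plans themselves are easy to get at: calling explain on either version prints them. A minimal sketch:

// extended = true prints the parsed, analyzed and optimized logical plans
// in addition to the physical plan, which should make it visible whether
// Catalyst reorders the lambda-based filter and map.
warm_ds.explain(true)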
Resources
Some interesting resources I plan to use:
- Spark SQL: Relational Data Processing in Spark
- Jacek Laskowski’s “The Internals of Spark SQL”
- Deep Dive into Spark SQL’s Catalyst Optimizer
¹ Predicate pushdown in this case means “pushing down” the filter to the file level to avoid reading all of the data from disk into memory, eliminating unnecessary I/O.
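To see that footnote in action, one rough check (assuming a Parquet source at a placeholder path) is to filter with a Column expression and look at the scan node of the physical plan:

import org.apache.spark.sql.functions.col

// With a Column-based predicate on a Parquet source, the scan node in the
// physical plan typically lists the condition under PushedFilters, i.e. it
// is applied while reading the files rather than after loading everything.
spark.read
  .parquet("/path/to/iot_data")
  .filter(col("temp") > 30 && col("out_in") === "Out")
  .explain(true)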