This morning I set aside some time to answer a question that came up while I was working through a Scala Dataset manipulation example: does one version of the code below run faster than the other?

Version one: filter before map

  val warm_ds = iot_ds
    .filter(v => {v.temp > 30 && v.out_in == "Out"})
    .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))

Version two: map before filter

  val warm_ds = iot_ds
    .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
    .filter(v => {v.temp > 30 && v.out_in == "Out"})
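
For anyone trying to run these, here’s a minimal sketch of the setup the two snippets assume, written as if in spark-shell. Only the field names come from the code above; the case class definitions, sample rows, and SparkSession config are guesses on my part.

  // Minimal assumed setup for the snippets above (spark-shell style).
  // Only the field names (room_id, out_in, temp) appear in the original code;
  // the types, sample rows, and session config are guesses.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("filter-vs-map-order")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._

  case class IOTData(room_id: String, out_in: String, temp: Double)
  case class WarmIOTDataTypes(room_id: String, out_in: String, temp: Double)

  // Tiny in-memory stand-in for the real IoT readings
  val iot_ds = Seq(
    IOTData("kitchen", "In", 22.5),
    IOTData("garden", "Out", 31.2),
    IOTData("garage", "Out", 28.9)
  ).toDS()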

My Confusion:

In the example above, I’m using the Dataset API (for Scala) instead of DataFrames. However, both are Structured APIs that take advantage of Spark’s Catalyst optimizer. This leads me to think that Spark reorders the operations optimally in both cases, so they should perform the same in the end.

However however…

The Catalyst optimizer can’t introspect my lambda function v => {v.temp > 30 && v.out_in == "Out"}. This means Spark can’t push down this predicate1.
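
For contrast, if the same condition were written as Column expressions instead of a Scala function, Catalyst could analyze it. A rough sketch (reusing the assumed iot_ds from earlier) might look like this:

  import org.apache.spark.sql.functions.col

  // Same condition written as Column expressions rather than a Scala lambda;
  // Catalyst can analyze these and, where the source supports it, push them down.
  val warm_df = iot_ds
    .filter(col("temp") > 30 && col("out_in") === "Out")
    .select(col("room_id"), col("out_in"), col("temp"))

The trade-off is that select here returns an untyped DataFrame, giving up some of the compile-time safety that makes the Dataset API attractive in the first place.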

So which one is faster? I don’t know yet! Next time, I’ll examine the Execution Plan to hopefully find the answer.
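
When I do, something like the sketch below (again assuming the iot_ds setup from earlier) should print the plans for both versions; explain(true) shows the analyzed and optimized logical plans as well as the physical plan.

  // Print the parsed, analyzed, optimized, and physical plans for each version,
  // so the two can be compared side by side.
  val filterFirst = iot_ds
    .filter(v => v.temp > 30 && v.out_in == "Out")
    .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))

  val mapFirst = iot_ds
    .map(v => WarmIOTDataTypes(v.room_id, v.out_in, v.temp))
    .filter(v => v.temp > 30 && v.out_in == "Out")

  filterFirst.explain(true)
  mapFirst.explain(true)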

Resources

Some interesting resources I plan to use:


  1. Predicate pushdown in this case means “pushing down” the filter to the file level, so Spark avoids reading all of the data from disk into memory and eliminates unnecessary I/O.