Spark 1.6.2's RDD caching seems do to weird things with filters in some cases -


i have rdd:

avrorecord: org.apache.spark.rdd.rdd[com.rr.eventdata.viewrecord] = mappartitionsrdd[75] 

i filter rdd single matching value:

val sitefiltered = avrorecord.filter(_.getsiteid == 1200) 

i count how many distinct values siteid. given filter should "1". here's 2 ways without cache , cache:

val basic = sitefiltered.map(_.getsiteid).distinct.count val cached = sitefiltered.cache.map(_.getsiteid).distinct.count 

the result indicates cached version isn't filtered @ all:

basic: long = 1 cached: long = 93 

"93" isn't expected value if filter ignored (that answer "522"). isn't problem "distinct" values real ones. seems cached rdd has odd partial version of filter.

anyone know what's going on here?

i supposed problem have cache result of rdd before doing action on it.

spark build dag represents execution of program. each node transformation or action on rdd. without cacheing rdd, each action forces spark execute whole dag begining (or last cache invocation).

so, code should work if following changes:

val sitefiltered =    avrorecord.filter(_.getsiteid == 1200)             .map(_.getsiteid).cache val basic = sitefiltered.distinct.count // yes, know, in way second count has no sense @ val cached = sitefiltered.distinct.count 

Comments

Popular posts from this blog

javascript - Thinglink image not visible until browser resize -

firebird - Error "invalid transaction handle (expecting explicit transaction start)" executing script from Delphi -

Sound is not coming out while implementing Text-to-speech in Android activity -