Spark 1.6.2's RDD caching seems to do weird things with filters in some cases.
I have this RDD:

avroRecord: org.apache.spark.rdd.RDD[com.rr.eventdata.ViewRecord] = MapPartitionsRDD[75]
I filter the RDD for a single matching value:

val siteFiltered = avroRecord.filter(_.getSiteId == 1200)
Then I count how many distinct siteId values remain. Given the filter, the answer should be "1". Here are two ways of doing it, without cache and with cache:

val basic = siteFiltered.map(_.getSiteId).distinct.count
val cached = siteFiltered.cache.map(_.getSiteId).distinct.count
The result indicates that the cached version isn't filtered at all:

basic: Long = 1
cached: Long = 93
"93" isn't expected value if filter ignored (that answer "522"). isn't problem "distinct" values real ones. seems cached rdd has odd partial version of filter.
Does anyone know what's going on here?
I suppose the problem is that you have to cache the result of the RDD before doing any action on it.
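To make the pattern concrete, here is a minimal sketch (assuming a running SparkContext named sc; the data is made up for illustration): finish all the transformations, call cache once on the resulting RDD, and then run every action against that same cached reference.

// Sketch only: `sc` and the numbers are assumptions, not from the question.
val transformed = sc.parallelize(1 to 100)
  .filter(_ % 2 == 0)
  .map(_ * 10)
  .cache                                   // mark the fully transformed RDD for storage

val total = transformed.count              // first action materializes and caches it
val unique = transformed.distinct.count    // reuses the cached partitions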
Spark builds a DAG that represents the execution of your program. Each node is a transformation or an action on an RDD. Without caching the RDD, each action forces Spark to execute the whole DAG from the beginning (or from the last cache invocation).
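You can actually watch that recomputation happen with an accumulator. This is a rough sketch against the Spark 1.6 accumulator API; `sc` and the sample data are assumptions (and note that accumulator updates inside transformations can be double-counted on task retries, so this is for local illustration only):

val runs = sc.accumulator(0)                      // counts how often the map body executes
val uncached = sc.parallelize(1 to 10).map { x => runs += 1; x }

uncached.count   // whole lineage runs: runs.value is 10
uncached.count   // whole lineage runs again: runs.value is 20

uncached.cache   // mark for storage; still lazy
uncached.count   // computed once more, partitions are now stored
uncached.count   // served from the cache: runs.value stops growing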
So, your code should work if you make the following changes:
val siteFiltered = avroRecord.filter(_.getSiteId == 1200)
  .map(_.getSiteId).cache
val basic = siteFiltered.distinct.count
// yes, I know, in this way the second count makes no sense at all
val cached = siteFiltered.distinct.count
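Note that cache is applied after the map here, so both counts read the same cached, already-filtered data; nothing downstream of the cache point can see unfiltered records. As a side benefit, caching after the projection stores only the site IDs rather than the full Avro records, which keeps the cached footprint small.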