apache spark - Transforming PySpark RDD with Scala
tl;dr - I have a DStream of strings in a PySpark application. I want to send this DStream[String] to a Scala library. The strings are not converted by Py4J, though.
I'm working on a PySpark application that pulls data from Kafka using Spark Streaming. My messages are strings and I would like to call a method in Scala code, passing it a DStream[String] instance. However, I'm unable to receive proper JVM strings in the Scala code. It looks to me like the Python strings are not converted into Java strings but, instead, are serialized.
My question would be: how do I get Java strings out of the DStream object?
Here is the simplest Python code I came up with:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["in"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()
I'm running this code in PySpark, passing it the path to my JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
On the Scala side, I have:
package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}
Now, let's send some data to Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic in
The println statement in the Scala code prints something that looks like:

[B@758aa4d9

I expected foo bar instead.
Now, if I replace the simple println statement in the Scala code with the following:

rdd.foreach(v => println(v.getClass.getCanonicalName))
I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String
This suggests that the strings are actually passed as arrays of bytes.

If I simply try to convert this array of bytes into a string (I know I'm not even specifying the encoding):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
  val dstream = jdstream.dstream
  dstream.foreachRDD(rdd => {
    rdd.foreach(bytes => println(new String(bytes)))
  })
}
I get something that looks like this (special characters might have been stripped off):

�]qXfoo barqa.

This suggests the Python string was serialized (pickled?). How could I retrieve a proper Java string instead?
Long story short, there is no supported way to do something like this. Don't try this in production. You've been warned.
In general, Spark doesn't use Py4J for anything other than basic RPC calls on the driver, and it doesn't start a Py4J gateway on any other machine. When it is required (mostly for MLlib and some parts of SQL), Spark uses Pyrolite to serialize objects passed between the JVM and Python.
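For experimentation only, you can lean on the same Pyrolite classes Spark already ships with and unpickle those bytes by hand on the Scala side. This is a minimal, unsupported sketch, assuming each Array[Byte] your helper receives is a pickled batch (a Python list of records), which is what the ]q...X... output above suggests; the UnpickleHelper name is made up for this example:

import net.razorvine.pickle.Unpickler
import org.apache.spark.streaming.api.java.JavaDStream
import scala.collection.JavaConverters._

object UnpickleHelper {
  def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
    jdstream.dstream.foreachRDD { rdd =>
      rdd.foreach { bytes =>
        // Unpickler is not thread-safe, so build one per record.
        val unpickled = new Unpickler().loads(bytes)
        unpickled match {
          // Pyrolite maps a pickled Python list to a java.util.List
          // and a unicode string to java.lang.String.
          case batch: java.util.List[_] => batch.asScala.foreach(println)
          case single                   => println(single)
        }
      }
    }
  }
}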
That serialization layer is part of an API that is either private (Scala) or internal (Python), and as such is not intended for general usage. While you can theoretically access it anyway, either per batch:
package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}
or for the complete stream:
object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}
or by exposing individual batches as DataFrames (probably the least evil option):
object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}
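Since the DataFrame handed over this way is a regular JVM object, such a helper is not limited to show; it can run ordinary Spark SQL on it. A hedged variant for illustration only (the object name is made up, and the _1 column name is an assumption based on the single-field tuples created on the Python side below):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{length, upper}

object PythonDataFrameTransformHelper {
  def go(df: DataFrame) = {
    // df is a plain JVM DataFrame at this point, so any Spark SQL
    // transformation is available, not just df.show.
    df.select(upper(df("_1")).alias("value"), length(df("_1")).alias("len"))
      .show()
  }
}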
and use these wrappers as follows:
from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)])

# Reserialize RDD as Java RDD<Object> and pass
# to the Scala sink (output only)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>.
# This option allows further transformations
# on the DStream.
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # reserialize but keep as a Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to a DataFrame and pass to the Scala sink.
# Arguably there are relatively few moving parts here.
q.foreachRDD(lambda rdd:
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()
This is not supported, untested, and as such rather useless for anything other than experiments with the Spark API.