apache spark - Transforming PySpark RDD with Scala


tl;dr: I have a DStream of Strings in a PySpark application. I want to send it as a DStream[String] to a Scala library. The strings are not converted by Py4j, though.

I'm working on a PySpark application that pulls data from Kafka using Spark Streaming. My messages are strings and I would like to call a method in some Scala code, passing it a DStream[String] instance. However, I'm unable to receive proper JVM strings in the Scala code. It looks to me like the Python strings are not converted into Java strings but, instead, are serialized.

My question would be: how can I get Java strings out of the DStream object?


Here is the simplest Python code I came up with:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["in"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

I'm running this code in pyspark, passing it the path to my JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

On the Scala side, I have:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

Now, let's send some data into Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic in

The println statement in the Scala code prints something that looks like:

[B@758aa4d9

I expected to see foo bar instead.

Now, if I replace the simple println statement in the Scala code with the following:

rdd.foreach(v => println(v.getClass.getCanonicalName))

I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

This suggests the strings are actually passed as arrays of bytes.

If I try to convert the array of bytes into a string (I know I'm not even specifying the encoding):

def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
  val dstream = jdstream.dstream
  dstream.foreachRDD(rdd => {
    rdd.foreach(bytes => println(new String(bytes)))
  })
}

I get something that looks like this (special characters might be stripped off):

�]qXfoo barqa.

This suggests the Python strings are serialized (pickled?). How could I retrieve a proper Java string instead?
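A quick way to check that hypothesis, without adding any dependencies, is to hex-dump the payload instead of decoding it as text. The following is only a minimal sketch based on the byte-array version above; if the values really are pickled with pickle protocol 2, the dump should start with 80 02 and end with 2e (the pickle STOP opcode):

def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
  jdstream.dstream.foreachRDD(rdd => {
    // print each record as hex digits rather than decoding it as text
    rdd.foreach(bytes => println(bytes.map(b => f"${b & 0xff}%02x").mkString(" ")))
  })
}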

Long story short: there is no supported way to do something like this. Don't try it in production. You've been warned.

In general, Spark doesn't use Py4j for anything other than some basic RPC calls on the driver, and it doesn't start a Py4j gateway on any other machine. When it is required (mostly MLlib and some parts of SQL), Spark uses Pyrolite to serialize objects passed between the JVM and Python.
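To make that concrete: the byte arrays arriving in doSomething above are pickled payloads, and the Pyrolite artifact Spark depends on (net.razorvine.pickle) can decode them back into Java objects. The following is only a rough, unsupported sketch — the helper name is made up, and it assumes the stream carries batched pickle data, so each record typically unpickles to a java.util.List of strings:

package dummy

import net.razorvine.pickle.Unpickler
import org.apache.spark.streaming.api.java.JavaDStream

object PickledStreamHelper {
  def go(jdstream: JavaDStream[Array[Byte]]) = {
    jdstream.dstream.foreachRDD(rdd => {
      rdd.foreach { bytes =>
        // create the Unpickler inside the task so the closure stays serializable
        val unpickled = new Unpickler().loads(bytes)
        // with the batched serializer this is usually a java.util.List of Strings
        println(unpickled)
      }
    })
  }
}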

This part of the API is either private (Scala) or internal (Python) and, as such, is not intended for general usage. While you can theoretically access it anyway, either per batch:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

or for the complete stream:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

or by exposing individual batches as DataFrames (probably the least evil option):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

and use these wrappers as follows:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)])

# Reserialize the RDD as a Java RDD<Object> and pass it
# to the Scala sink (output only)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This option allows further transformations
# on the DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep it as a Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to a DataFrame and pass it to the Scala sink.
# Arguably there are relatively few moving parts here.
q.foreachRDD(lambda rdd:
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

This is not supported and untested, and as such it is rather useless for anything other than experiments with the Spark API.

