python - Kafka Consumer isn't receiving any messages from its Producer -


The following Python code is a Kafka producer. I'm not sure whether the messages are actually being published to the Kafka broker or not, because the consumer side isn't receiving any messages. The consumer Python program works fine when I test it using the producer console command.

from __future__ import print_function
import sys
from pyspark import SparkContext
from kafka import KafkaClient, SimpleProducer

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: spark-submit producer1.py <input file>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonRegression")

    def sendkafka(messages):
        ## Set the broker port
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka, async=True, batch_send_every_n=5,
                                  batch_send_every_t=10)
        send_counts = 0
        for message in messages:
            try:
                print(message)
                ## Set the topic name and push the message to the Kafka broker
                yield producer.send_messages('test', message.encode('utf-8'))
            except Exception as e:
                print("Error: %s" % str(e))
            else:
                send_counts += 1
        print("The count of prediction results sent in this partition is %d.\n" % send_counts)

    ## Connect and read the file
    rawData = sc.textFile(sys.argv[1])

    ## Find and skip the first row (the data header)
    dataHeader = rawData.first()
    data = rawData.filter(lambda x: x != dataHeader)

    ## Run sendkafka on each partition, then collect the RDD
    sentRDD = data.mapPartitions(sendkafka)
    sentRDD.collect()

    ## Stop the Spark context
    sc.stop()
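One detail worth noting in the producer: sendkafka is a generator, so nothing inside it runs until something iterates it (here, collect() on the mapPartitions result does). Below is a minimal broker-free sketch of that behaviour, with the Kafka producer replaced by an in-memory list (my stand-in for testing, not part of the real code):

```python
sent = []  # stands in for the Kafka broker

def sendkafka(messages):
    ## Same shape as the generator above, but "sending" appends to a list
    send_counts = 0
    for message in messages:
        sent.append(message.encode('utf-8'))  # stand-in for producer.send_messages
        send_counts += 1
        yield send_counts

gen = sendkafka(["a", "b", "c"])
assert sent == []                    # nothing sent yet: the generator is lazy

results = list(gen)                  # consuming it (like collect()) triggers the sends
assert sent == [b"a", b"b", b"c"]
assert results == [1, 2, 3]
```

If the generator were never fully consumed, no messages would be pushed at all, which is one easy-to-miss failure mode with mapPartitions.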

This is the "consumer" Python code:

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if len(sys.argv) < 3:
    print("This program pulls messages from Kafka brokers.")
    print("Usage: consume.py <zk> <topic>", file=sys.stderr)
else:
    ## Flow:
    ## Load settings from system properties at launch of spark-submit
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")

    ## Create a StreamingContext using the existing SparkContext
    ssc = StreamingContext(sc, 10)

    ## Arguments after the Python script name: ZooKeeper quorum and topic
    zkQuorum, topic = sys.argv[1:]

    ## Create an input stream that pulls messages from the Kafka brokers
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

    ## Keep only the message values
    lines = kvs.map(lambda x: x[1])

    ## Print the messages pulled from the Kafka brokers
    lines.pprint()

    ## Save the pulled messages to files
    ## lines.saveAsTextFiles("outputA")

    ## Start receiving and processing data
    ssc.start()

    ## Allow the current process to wait for termination of the context
    ssc.awaitTermination()
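For reference, each record delivered by the Kafka input stream is a (key, value) pair, which is why the consumer maps each record x to x[1]. A broker-free sketch with hypothetical sample pairs:

```python
## Records arrive as (key, value) pairs; the map keeps only the value,
## like kvs.map(lambda x: x[1]) above.
records = [(None, "line one"), (None, "line two")]  # hypothetical sample data
lines = list(map(lambda x: x[1], records))
assert lines == ["line one", "line two"]
```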

I would debug such issues using kafka-console-consumer (part of Apache Kafka) to consume from the topic you tried producing to. If the console consumer gets the messages, you know they arrived in Kafka.

If you first run the producer, let it finish, and then start the consumer, the issue may be that the consumer is starting at the end of the log and waiting for additional messages. Either make sure you start the consumer first, or configure it to automatically start at the beginning (sorry, I'm not sure how to do that with the Python client).
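To make that offset behaviour concrete, here is a broker-free sketch in which a plain Python list stands in for a topic partition (my simplification, not a Kafka API):

```python
## The "log" is just a list standing in for a Kafka topic partition.
log = ["m1", "m2", "m3"]          # messages the producer already published

## A consumer attached at the *end* of the log (the behaviour described
## above): it remembers the current position and only sees later messages.
start_at_end = len(log)
log.append("m4")                  # a message produced after the consumer joined
late_view = log[start_at_end:]
assert late_view == ["m4"]        # the earlier three messages are never seen

## A consumer configured to start at the *beginning* (an auto-offset-reset
## style setting; the exact name varies by client): it replays everything.
early_view = log[0:]
assert early_view == ["m1", "m2", "m3", "m4"]
```

This is why running the producer to completion first and only then starting the consumer can look like "no messages arrived" even though they are sitting in the topic.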

