python - Kafka consumer isn't receiving any messages from its producer
The following is my Python code for a Kafka producer. I'm not sure whether the messages are actually being published to the Kafka broker, because the consumer side isn't receiving any messages. My Python consumer program works fine when I test it with the console producer command.
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from kafka import KafkaClient, SimpleProducer

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage:spark-submit producer1.py <input file>", file=sys.stderr)
            exit(-1)

        sc = SparkContext(appName="PythonRegression")

        def sendkafka(messages):
            ## Set broker port
            kafka = KafkaClient("localhost:9092")
            producer = SimpleProducer(kafka, async=True, batch_send_every_n=5, batch_send_every_t=10)
            send_counts = 0
            for message in messages:
                try:
                    print(message)
                    ## Set topic name and push messages to the Kafka broker
                    yield producer.send_messages('test', message.encode('utf-8'))
                except Exception as e:
                    print("Error: %s" % str(e))
                else:
                    send_counts += 1
            print("The count of prediction results sent in this partition is %d.\n" % send_counts)

        ## Connect and read the file.
        rawData = sc.textFile(sys.argv[1])
        ## Find and skip the first row
        dataHeader = rawData.first()
        data = rawData.filter(lambda x: x != dataHeader)
        ## Collect the RDDs.
        sentRDD = data.mapPartitions(sendkafka)
        sentRDD.collect()
        ## Stop file connection
        sc.stop()
This is my "consumer" Python code:
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if len(sys.argv) < 3:
        print("Program that pulls messages from Kafka brokers.")
        print("Usage: consume.py <zk> <topic>", file=sys.stderr)
    else:
        ## Flow
        ## Loads settings from system properties, for launching with spark-submit.
        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        ## Create a StreamingContext using the existing SparkContext.
        ssc = StreamingContext(sc, 10)
        ## Everything after the Python script name
        zkQuorum, topic = sys.argv[1:]
        ## Create an input stream that pulls messages from the Kafka brokers.
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
        ##
        lines = kvs.map(lambda x: x[1])
        ## Print the messages pulled from the Kafka brokers
        lines.pprint()
        ## Save the pulled messages as a file
        ## lines.saveAsTextFiles("OutputA")
        ## Start receiving data and processing it
        ssc.start()
        ## Allows the current process to wait for the termination of the context
        ssc.awaitTermination()
I debug such issues by using kafka-console-consumer (part of Apache Kafka) to consume from the topic you tried producing to. If the console consumer gets the messages, you know they arrived in Kafka.
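As a rough sketch of that check, the snippet below only builds the console-consumer command line so you can inspect it or pass it to `subprocess.run`. The helper name `console_consumer_command` is made up for illustration. The script name and the `--bootstrap-server` flag assume a reasonably recent Kafka distribution (older releases took `--zookeeper` instead), and the topic `test` and broker `localhost:9092` are taken from the question.

```python
import shlex


def console_consumer_command(topic, bootstrap_server="localhost:9092",
                             from_beginning=True,
                             script="kafka-console-consumer.sh"):
    """Build (but do not run) a kafka-console-consumer command line.

    Hypothetical helper: the script name and --bootstrap-server flag
    assume a recent Kafka distribution available on PATH.
    """
    cmd = [script, "--bootstrap-server", bootstrap_server, "--topic", topic]
    if from_beginning:
        # Read the topic from its first retained message, so records
        # produced before this consumer started are shown as well.
        cmd.append("--from-beginning")
    return cmd


if __name__ == "__main__":
    # Print a copy-pasteable command for the topic used in the question.
    print(" ".join(shlex.quote(part) for part in console_consumer_command("test")))
```

If this shows the rows from your input file, the producer side is fine and the problem is in how the consumer starts.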
If you first run the producer, let it finish, and only then start the consumer, the issue may be that the consumer is starting from the end of the log and waiting for additional messages. Either make sure you start the consumer first, or configure it to automatically start from the beginning (sorry, I'm not sure how to do that with your Python client).
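To make the "end of the log" point concrete, here is a toy in-memory model. It is not the Kafka or Spark API: the log is a plain Python list and `starting_offset` is an invented helper. It only illustrates why a consumer that starts at the latest offset sees nothing that was produced before it started. In real clients this is usually governed by an offset-reset setting such as `auto.offset.reset`; the accepted values depend on the client version, so check the documentation for yours.

```python
def starting_offset(log, reset):
    """Return where a brand-new consumer starts reading in this toy model."""
    if reset == "earliest":
        return 0          # replay everything still in the log
    if reset == "latest":
        return len(log)   # skip everything already written
    raise ValueError("reset must be 'earliest' or 'latest'")


# The producer runs to completion before any consumer starts.
log = ["row-1", "row-2", "row-3"]

# A consumer that starts at the end of the log has nothing to read yet.
late_offset = starting_offset(log, "latest")
seen_by_late_consumer = log[late_offset:]

# A consumer that starts at the beginning sees the earlier messages.
seen_by_replaying_consumer = log[starting_offset(log, "earliest"):]

# Only messages produced after the late consumer started reach it.
log.append("row-4")
seen_after_new_message = log[late_offset:]

print(seen_by_late_consumer)
print(seen_by_replaying_consumer)
print(seen_after_new_message)
```

This matches the symptom in the question: the streaming job runs without errors but prints empty batches until something new is produced.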