Kafka & Flink duplicate messages on restart
First of all, this is very similar to "Kafka consuming the latest message again when I rerun the Flink consumer", but it's not the same. The answer to that question does NOT appear to solve my problem. If I missed something in that answer, please rephrase it, as I clearly missed something.
The problem is exactly the same, though: Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it was shut down.
My versions
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
My code
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 500 ms, storing checkpoints on the local filesystem
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Read from testing-in and write every record unchanged to testing-out
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
My sbt dependencies
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.1.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
  "org.apache.flink" %% "flink-clients" % "1.1.2",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
  "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
My process
(3 terminals)
1. Term-1: start sbt, run the program
2. Term-2: create the Kafka topics testing-in and testing-out
3. Term-2: run kafka-console-producer on the testing-in topic
4. Term-3: run kafka-console-consumer on the testing-out topic
5. Term-2: send data to the Kafka producer
6. Wait a couple of seconds (buffers need to flush)
7. Term-3: watch the data appear in the testing-out topic
8. Wait at least 500 milliseconds for checkpointing to happen
9. Term-1: stop sbt
10. Term-1: run sbt
11. Term-3: watch the last few lines of data appear in the testing-out topic again
My expectations
When there are no errors in the system, I expect to be able to turn Flink on and off without reprocessing messages that successfully completed the stream in a prior run.
My attempts to fix
I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.
I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs. ZooKeeper. That didn't seem to help.
I've used different sinks, RollingFileSink and print(), hoping that maybe the bug was in Kafka. That didn't seem to help.
I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.
I've added zookeeper.connect to the config properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.
I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea, drfloob). That didn't seem to help.
My plea
Help!
(I've posted the same reply in the JIRA, cross-posting it here.)
From your description, I'm assuming you're manually shutting down the job and then resubmitting it, correct?
Flink does not retain exactly-once across manual job restarts unless you use savepoints (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). The exactly-once guarantee applies when the job fails and then automatically restores itself from previous checkpoints (when checkpointing is enabled, as you did with env.enableCheckpointing(500)).
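To make the automatic-recovery case concrete, here is a toy model in plain Scala with no Flink dependency. `CheckpointRestoreSketch`, `Snapshot` and `runWithFailure` are hypothetical names, not Flink API. The model assumes that a completed checkpoint stores the source's next offset together with the operator state (here a running sum). It shows that after a crash, the records following the last checkpoint are read again, yet each record still counts exactly once in the restored state.

```scala
// Toy model (not Flink's API): source position and operator state are
// snapshotted together, so restoring both after a failure means every
// record affects the state exactly once, even though some are re-read.
object CheckpointRestoreSketch {
  // What a completed checkpoint holds in this model.
  final case class Snapshot(nextOffset: Int, runningSum: Long)

  /** Processes `log`, taking a checkpoint once `checkpointAfter` records are
    * done, simulates a crash once `crashAfter` records are done, then recovers
    * from the last checkpoint and runs to the end. Returns the final sum.
    */
  def runWithFailure(log: Vector[Int], checkpointAfter: Int, crashAfter: Int): Long = {
    require(0 <= checkpointAfter && checkpointAfter <= crashAfter && crashAfter <= log.length)
    var offset = 0
    var sum = 0L
    var lastCheckpoint = Snapshot(0, 0L)

    // First attempt: process records until the simulated crash.
    while (offset < crashAfter) {
      sum += log(offset)
      offset += 1
      if (offset == checkpointAfter) lastCheckpoint = Snapshot(offset, sum)
    }

    // Automatic recovery: offset and state are restored together from the
    // last completed checkpoint; records after it are processed again.
    offset = lastCheckpoint.nextOffset
    sum = lastCheckpoint.runningSum
    while (offset < log.length) {
      sum += log(offset)
      offset += 1
    }
    sum
  }
}
```

For example, `runWithFailure((1 to 10).toVector, 4, 7)` re-reads records 5 to 7 after the crash but still returns 55, the same as a failure-free run.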
What is actually happening is that the Kafka consumer simply starts reading from the existing offsets committed in ZK / Kafka when you manually resubmit the job. These offsets were committed to ZK / Kafka the first time you executed the job. They are not, however, used for Flink's exactly-once semantics; Flink uses internally checkpointed Kafka offsets for that. The Kafka consumer commits those offsets back to ZK only to expose a measure of the job's consumption progress to the outside world (w.r.t. Flink).
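A similarly hypothetical sketch of the manual-restart case, again in plain Scala. `ManualResubmitSketch`, `outputAcrossRestart` and `duplicates` are illustrative names, not part of the Kafka connector. It assumes two things: the offset committed to ZK / Kafka is the next offset to read, and that committed offset trails what the first run had already emitted. The freshly submitted job has no checkpoint to restore, so it starts from the committed offset, and every record in between reaches the output topic twice.

```scala
// Toy model (not the Flink Kafka connector's API) of a manual resubmission:
// the new job restores no checkpoint and starts at the committed group offset.
object ManualResubmitSketch {
  /** Returns everything written to the output topic across both runs.
    *
    * @param input       records in the input topic, in offset order
    * @param stopAfter   number of records the first run emitted before it was stopped
    * @param committedAt next offset to read, as last committed to ZK / Kafka
    *                    (assumed to trail the emitted records)
    */
  def outputAcrossRestart(input: Vector[String], stopAfter: Int, committedAt: Int): Vector[String] = {
    require(0 <= committedAt && committedAt <= stopAfter && stopAfter <= input.length)
    val firstRun  = input.take(stopAfter)   // emitted before the manual shutdown
    val secondRun = input.drop(committedAt) // fresh job: starts at the committed offset
    firstRun ++ secondRun
  }

  /** Records that appear more than once in the output, sorted. */
  def duplicates(output: Vector[String]): Vector[String] =
    output.groupBy(identity).collect { case (r, occ) if occ.size > 1 => r }.toVector.sorted
}
```

For example, with input `a` to `e`, a first run that emitted all five records and a committed offset of 2, the output topic ends up containing `c`, `d` and `e` twice. If the committed offset had caught up with the emitted records, the restart would produce no duplicates.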