android - Using RxJava to chain requests on a single thread
I'm saving the user's location in the app's local database and then sending it to the server. Once the server returns success, I delete the locations that were sent.
Each time a point has been saved in the database, I call this method:
public void sendpoint() {
    amazonretrofit.postamazonpoints(databasehelper.getpoints())
        .map(listidssent -> deletedatabasepoints(listidssent))
        .doOnCompleted(() -> emitstorechange(finalevent))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.from(backgroundlooper))
        .subscribe();
}
- I query the database for the points to send to the server
- I receive from the server the list of points that were sent
- Using .map(), I gather the points that were sent and delete them from the local database
Sometimes I call this method repeatedly without waiting for the previous request to complete and delete the points it sent. So when I call the method again, it posts the same points as the previous request, because that request hasn't completed yet and so hasn't deleted its points using .map(). This causes the server to receive duplicates.
Timeline:
- 1st call to postpoint()
- retrieve points a,b,c from the database
- post points a,b,c to the server
- 2nd call to postpoint()
- retrieve points a,b,c,d from the database
- post points a,b,c,d to the server
- receive success for the 1st request
- delete a,b,c from the local database
- receive success for the 2nd request
- delete a,b,c,d from the local database
Result:
The server database has received: a,b,c,a,b,c,d
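To make the timeline above concrete, here is a minimal plain-Java sketch (no RxJava or Android involved) that replays the same interleaving. The names localDb, server, post and onSuccess are hypothetical stand-ins, not part of the code in the question. The sketch only assumes that a send snapshots the database first and deletes the sent points when the success response arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DuplicateSendDemo {
    // Hypothetical stand-ins for the local database and the server.
    static final List<String> localDb = new ArrayList<>();
    static final List<String> server = new ArrayList<>();

    // First half of a send: snapshot the stored points and post them.
    static List<String> post() {
        List<String> snapshot = new ArrayList<>(localDb);
        server.addAll(snapshot);
        return snapshot; // assumed: the server echoes back what it received
    }

    // Second half of a send: runs only once the success response arrives.
    static void onSuccess(List<String> sent) {
        localDb.removeAll(sent);
    }

    // Replays the interleaving from the timeline and returns what the server got.
    static List<String> run() {
        localDb.clear();
        server.clear();
        localDb.addAll(Arrays.asList("a", "b", "c"));

        List<String> first = post();   // 1st call posts a,b,c
        localDb.add("d");              // a new point is saved meanwhile
        List<String> second = post();  // 2nd call posts a,b,c,d (a,b,c not deleted yet)

        onSuccess(first);              // success for the 1st request
        onSuccess(second);             // success for the 2nd request
        return new ArrayList<>(server);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [a, b, c, a, b, c, d]: the second snapshot is taken before the first delete, so a,b,c are posted twice.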
Each request occurs sequentially, but somehow the same location points are sent to the server when I call sendpoint() in quick succession. How can I fix this?
First, you are not using the observeOn operator properly. observeOn applies only to the steps of the pipeline that come after the point where it is defined. If you define it at the end of the pipeline, just before subscribeOn, none of the previous steps are executed on that thread.
Also, since you need to wait for the response of the server call, you can use the callback handlers that the subscriber provides (onNext(), onCompleted()).
public void sendpoint() {
    Observable.from(databasehelper.getpoints())
        .observeOn(AndroidSchedulers.mainThread())
        .flatMap(points -> amazonretrofit.postamazonpoints(points))
        .subscribeOn(AndroidSchedulers.from(backgroundlooper))
        .subscribe(
            listidssent -> deletedatabasepoints(listidssent), // onNext: delete the points the server confirmed
            error -> { /* onError: handle a failed request here */ },
            () -> emitstorechange(finalevent));               // onCompleted
}
If you want to see more examples of observeOn and subscribeOn, you can take a look here: https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/observableasynchronous.java
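One way to avoid overlapping sends, sketched here in plain Java rather than RxJava, is to run the whole read, post, delete sequence as one unit on a single worker thread. A second send then cannot read the database until the first has deleted its points. This is an illustration under assumptions, not the code from the answer: the class, its fields and the blocking "post" are hypothetical, and a real network call would have to be made synchronously inside the task for the same guarantee to hold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedSendDemo {
    // Hypothetical stand-ins for the local database and the server.
    // Both lists are touched only from the worker thread.
    private final List<String> localDb = new ArrayList<>();
    private final List<String> server = new ArrayList<>();
    // A single worker thread runs queued tasks strictly one after another.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void savePoint(String point) {
        worker.execute(() -> localDb.add(point));
    }

    // Read, post and delete form one task, so two sends can never interleave.
    void sendPoint() {
        worker.execute(() -> {
            List<String> snapshot = new ArrayList<>(localDb);
            if (snapshot.isEmpty()) {
                return; // nothing to send
            }
            server.addAll(snapshot);     // stands in for a blocking post
            localDb.removeAll(snapshot); // delete before the next task starts
        });
    }

    // Waits for all queued work, then returns a copy of what the server received.
    List<String> finish() {
        Callable<List<String>> copy = () -> new ArrayList<>(server);
        try {
            return worker.submit(copy).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
        }
    }

    static List<String> run() {
        SerializedSendDemo demo = new SerializedSendDemo();
        for (String p : Arrays.asList("a", "b", "c")) {
            demo.savePoint(p);
        }
        demo.sendPoint();    // sends a,b,c
        demo.savePoint("d");
        demo.sendPoint();    // sends only d, since a,b,c are already deleted
        return demo.finish();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the same call pattern as in the question (two sends in quick succession), main prints [a, b, c, d], with no duplicates.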