Android – Get JSON objects one by one from a JSON array using Retrofit and RxJava

android json retrofit2 rx-android rx-java

I am using Retrofit to call a network API that returns a JSON array. This is the code I use:

    Observable<MyJson[]> response = (Observable<MyJson[]>) mNetworkService.getReadyObserverable(
            mNetworkService.getNetworkServiceApi().getMyDataJsons(), MyJson.class, true, useCache);
    mAirlineSubscription = response.subscribe(new Observer<MyJson[]>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "getData completed..");
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onNext(MyJson[] myJsonData) {
            // I want the JSON objects here one by one
        }
    });

My problem is that the whole JSON array is downloaded first, and only then is onNext called. I want onNext to be called as soon as Retrofit has downloaded the first JSON object of the array, and then again for each following object until the whole array has been downloaded. That would make my UI feel more responsive.

Can anyone help to fix this?

Best Answer

Do you want to receive elements one by one, as soon as they arrive, without waiting for the whole response to be downloaded?

The solution: annotate the method in the Retrofit interface with @Streaming and make it return Observable<ResponseBody>. Without @Streaming, Retrofit buffers the entire body in memory before handing it over. Then use flatMap() to convert the ResponseBody into a series of POJOs (Observable<TYPE>).

Example:

  1. Declare the Retrofit interface:

    public interface HugeJsonApi {

        String SERVICE_ENDPOINT = "https://raw.githubusercontent.com";

        @Streaming
        @GET("/zemirco/sf-city-lots-json/master/citylots.json")
        Observable<ResponseBody> get();
    }

  2. Use it like this:

    public void playHugeJsonSample() {
    
        HugeJsonApi hugeJsonApi = RestUtils.createService(HugeJsonApi.class, HugeJsonApi.SERVICE_ENDPOINT);
    
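        // gson and counter are used below but were not declared in the snippet;
        // these local declarations are an assumption.
        Gson gson = new Gson();
        final int[] counter = {0}; // one-element array so the anonymous Subscriber can mutate it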
        hugeJsonApi.get()
                .flatMap(responseBody -> convertObjectsStream(responseBody, gson, Feature.class))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Feature>() {
    
                    @Override
                    public void onStart() {
                        super.onStart();
                        request(1);
                    }
    
                    @Override
                    public void onNext(Feature feature) {
                        Log.i(TAG, gson.toJson(feature));
                        counter[0]++;
                        request(1);
                    }
    
                    @Override
                    public void onCompleted() {
                        Log.i(TAG, "onCompleted() called. Fetched elements:" + counter[0]);
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "something went wrong", e);
                    }
                });
    }
    
    @NonNull
    private static <TYPE> Observable<TYPE> convertObjectsStream(ResponseBody responseBody, Gson gson, Class<TYPE> clazz) {
        Type type = TypeToken.get(clazz).getType();
        return Observable.create(SyncOnSubscribe.<JsonReader, TYPE>createStateful(
                // initialize the reader
                () -> {
                    try {
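                        JsonReader reader = gson.newJsonReader(responseBody.charStream());
                        // Consume the opening token of the document. If the top level of the
                        // response is a JSON array (as in the question), call reader.beginArray() instead.
                        reader.beginObject();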
                        return reader;
                    } catch (IOException e) {
                        e.printStackTrace();
                        RxJavaHooks.onError(e);
                    }
                    return null;
                },
                // read elements one by one
                (reader, observer) -> {
    
                    if (reader == null) {
                        observer.onCompleted();
                        return null;
                    }
    
                    try {
                        if (reader.hasNext()) {
                            TYPE t = gson.fromJson(reader, type);
                            observer.onNext(t);
                        }
                        else {
                            observer.onCompleted();
                        }
    
                    } catch (IOException e) {
                        e.printStackTrace();
                        observer.onError(e);
                    }
    
                    return reader;
                },
                // close the reader
                reader -> {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                            RxJavaHooks.onError(e);
                        }
                    }
                }
    
        ));
    }
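The request(1) calls in onStart() and onNext() above are what make the consumer pull one element at a time. As a rough illustration of that pattern only, here is a minimal sketch that swaps RxJava for the JDK's java.util.concurrent.Flow interfaces (Java 9+), so it runs without any library. OneByOneDemo, fromIterator and run are hypothetical names introduced for this sketch; they are not part of RxJava or Retrofit, and the publisher is synchronous and single-threaded by assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo class; not part of RxJava or Retrofit. */
final class OneByOneDemo {

    /** Synchronous publisher that pulls from the iterator only while there is outstanding demand. */
    static <T> Flow.Publisher<T> fromIterator(Iterator<T> source) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long pending;     // items requested but not yet delivered
            private boolean emitting; // guards against re-entrant request() calls made from onNext
            private boolean done;

            @Override
            public void request(long n) {
                pending += n;
                if (emitting) {
                    return; // the loop below is already running further up the call stack
                }
                emitting = true;
                while (pending > 0 && !done) {
                    if (source.hasNext()) {
                        pending--;
                        subscriber.onNext(source.next());
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with the request(1) pattern and returns a log of the callbacks received. */
    static List<String> run(List<String> items) {
        List<String> log = new ArrayList<>();
        fromIterator(items.iterator()).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first element only
            }

            @Override
            public void onNext(String item) {
                log.add("next:" + item);
                subscription.request(1); // ask for one more after handling this one
            }

            @Override
            public void onError(Throwable t) {
                log.add("error:" + t);
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }
}
```

The source is only advanced when the subscriber has asked for another element, which is the same contract SyncOnSubscribe.createStateful relies on in the answer's code.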

Here is a working example:

https://github.com/allco/RetrofitAndRxJava

It takes a JSON file of more than 180 MB and parses it as a real stream.
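To make "a real stream" concrete without pulling in Gson, the following is a minimal stdlib-only sketch of the underlying idea: read characters from a Reader and hand over each top-level array element the moment its last character has been read, instead of buffering the whole body. JsonArraySplitter and forEachElement are hypothetical names, not Gson or Retrofit API. The sketch assumes the input is a well-formed JSON array, does no validation, and emits raw element text rather than parsed objects; in real code Gson's JsonReader does this job.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical helper: splits a top-level JSON array into the raw text of its elements. */
final class JsonArraySplitter {

    /** Calls onElement for each element as soon as it is complete; assumes well-formed input. */
    static void forEachElement(Reader in, Consumer<String> onElement) {
        StringBuilder current = new StringBuilder();
        int depth = 0;            // 1 means "directly inside the top-level array"
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string
        try {
            int c;
            while ((c = in.read()) != -1) {
                char ch = (char) c;
                if (inString) {
                    // Brackets and commas inside strings must not affect the structure.
                    current.append(ch);
                    if (escaped) {
                        escaped = false;
                    } else if (ch == '\\') {
                        escaped = true;
                    } else if (ch == '"') {
                        inString = false;
                    }
                    continue;
                }
                switch (ch) {
                    case '"':
                        inString = true;
                        current.append(ch);
                        break;
                    case '[':
                    case '{':
                        if (depth > 0) {
                            current.append(ch); // skip only the outermost '['
                        }
                        depth++;
                        break;
                    case ']':
                    case '}':
                        depth--;
                        if (depth > 0) {
                            current.append(ch);
                        } else {
                            emit(current, onElement); // closing ']' of the top-level array
                            return;
                        }
                        break;
                    case ',':
                        if (depth == 1) {
                            emit(current, onElement); // separator between top-level elements
                        } else {
                            current.append(ch);
                        }
                        break;
                    default:
                        // Whitespace outside strings is insignificant in JSON, so drop it.
                        if (depth > 0 && !Character.isWhitespace(ch)) {
                            current.append(ch);
                        }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void emit(StringBuilder current, Consumer<String> onElement) {
        if (current.length() > 0) {
            onElement.accept(current.toString());
            current.setLength(0);
        }
    }
}
```

Because the callback fires inside the read loop, the first element can reach the consumer while the rest of the response is still in transit, provided the Reader is backed by the network stream rather than an in-memory copy.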