A Retrofit 2 call adapter for RxJava 1.x reactive types
npx @tessl/cli install tessl/maven-com-squareup-retrofit2--adapter-rxjava@3.0.0A call adapter factory for Retrofit 2 that enables integration with RxJava 1.x reactive types (Observable, Single, Completable). This adapter allows service interface methods to return RxJava reactive types instead of Retrofit's default Call interface, supporting both synchronous and asynchronous execution with configurable threading.
implementation 'com.squareup.retrofit2:adapter-rxjava:3.0.0'import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;
import retrofit2.adapter.rxjava.Result;For creating Retrofit instances:
import retrofit2.Retrofit;For service method return types:
import rx.Observable;
import rx.Single;
import rx.Completable;import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;
import rx.Observable;
import rx.Single;
import rx.Completable;
// Add RxJava adapter to Retrofit
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.example.com/")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
// Service interface with RxJava return types
interface ApiService {
@GET("users/{id}")
Observable<User> getUser(@Path("id") String userId);
@POST("users")
Single<User> createUser(@Body User user);
@DELETE("users/{id}")
Completable deleteUser(@Path("id") String userId);
}
// Create service and make requests
ApiService service = retrofit.create(ApiService.class);
// Observable request
service.getUser("123")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
user -> System.out.println("User: " + user.getName()),
error -> System.err.println("Error: " + error.getMessage())
);
// Single request
service.createUser(newUser)
.subscribe(
createdUser -> System.out.println("Created: " + createdUser.getId()),
error -> System.err.println("Creation failed: " + error.getMessage())
);
// Completable request (no response body)
service.deleteUser("123")
.subscribe(
() -> System.out.println("User deleted successfully"),
error -> System.err.println("Deletion failed: " + error.getMessage())
);The RxJava adapter provides three main components:
Factory methods for creating RxJavaCallAdapterFactory instances with different threading configurations.
/**
* Returns an instance which creates synchronous observables that do not operate on any scheduler by default.
*/
public static RxJavaCallAdapterFactory create();
/**
* Returns an instance which creates asynchronous observables using OkHttp's internal thread pool.
*/
public static RxJavaCallAdapterFactory createAsync();
/**
* Returns an instance which creates synchronous observables that subscribe on the specified scheduler by default.
* @param scheduler The scheduler to subscribe on by default
* @throws NullPointerException if scheduler is null
*/
public static RxJavaCallAdapterFactory createWithScheduler(Scheduler scheduler);The adapter supports multiple RxJava return types with different response wrapping modes.
// Direct body types - calls onNext with deserialized body for 2XX responses,
// calls onError with HttpException for non-2XX responses and IOException for network errors
Observable<T> getResource();
Single<T> getResource();
// Response wrapped types - calls onNext with Response object for all HTTP responses,
// calls onError only with IOException for network errors
Observable<Response<T>> getResourceWithResponse();
Single<Response<T>> getResourceWithResponse();
// Result wrapped types - calls onNext with Result object for all HTTP responses and errors,
// never calls onError
Observable<Result<T>> getResourceWithResult();
Single<Result<T>> getResourceWithResult();
// Completable type - discards response bodies, used for operations that only need success/failure
Completable performAction();A wrapper class for handling HTTP responses that captures both successful responses and errors.
/**
* The result of executing an HTTP request containing either a response or error.
*/
public final class Result<T> {
/**
* Creates an error result wrapping the given throwable.
* @param error The error that occurred
* @throws NullPointerException if error is null
*/
public static <T> Result<T> error(Throwable error);
/**
* Creates a successful result wrapping the given response.
* @param response The HTTP response
* @throws NullPointerException if response is null
*/
public static <T> Result<T> response(Response<T> response);
/**
* Returns the HTTP response if successful, null if error occurred.
*/
public Response<T> response();
/**
* Returns the error if one occurred, null if successful.
* IOException indicates transport problems, other exceptions indicate unexpected failures.
*/
public Throwable error();
/**
* Returns true if the request resulted in an error.
*/
public boolean isError();
}Legacy exception class for HTTP errors, deprecated in favor of retrofit2.HttpException.
/**
* @deprecated Use retrofit2.HttpException instead
*/
@Deprecated
public final class HttpException extends retrofit2.HttpException {
public HttpException(Response<?> response);
}The adapter provides three different error handling patterns:
Direct Body Types (Observable<T>, Single<T>):
onNext() called with deserialized bodyonError() called with HttpExceptiononError() called with IOExceptionResponse Wrapped Types (Observable<Response<T>>, Single<Response<T>>):
onNext() called with Response<T> objectonError() called with IOExceptionResult Wrapped Types (Observable<Result<T>>, Single<Result<T>>):
onNext() called with Result<T> objectonError() calls - use Result.isError() and Result.error() to check for failuresCompletable Type:
onCompleted() called, response body discardedonError() called with appropriate exceptionDefault Behavior: Requests execute synchronously on the calling thread.
Asynchronous Execution:
// Using createAsync() - requests execute on OkHttp's thread pool
RxJavaCallAdapterFactory.createAsync()Custom Scheduler:
// Using createWithScheduler() - requests subscribe on specified scheduler
RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())Per-Request Threading:
// Override default threading per request
service.getUser("123")
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* ... */);service.getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
users -> {
// Handle successful response
displayUsers(users);
},
error -> {
if (error instanceof HttpException) {
HttpException httpError = (HttpException) error;
int code = httpError.code();
handleHttpError(code, httpError.message());
} else if (error instanceof IOException) {
handleNetworkError(error);
} else {
handleUnexpectedError(error);
}
}
);service.getUserWithResponse("123")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
response -> {
if (response.isSuccessful()) {
User user = response.body();
displayUser(user);
} else {
handleHttpError(response.code(), response.message());
}
},
error -> {
// Only IOException for network errors
handleNetworkError(error);
}
);service.getUserWithResult("123")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> {
if (result.isError()) {
Throwable error = result.error();
if (error instanceof IOException) {
handleNetworkError(error);
} else {
handleUnexpectedError(error);
}
} else {
Response<User> response = result.response();
if (response.isSuccessful()) {
displayUser(response.body());
} else {
handleHttpError(response.code(), response.message());
}
}
});service.deleteUser("123")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
() -> {
// Success - user deleted
showSuccess("User deleted successfully");
},
error -> {
// Handle deletion error
if (error instanceof HttpException) {
HttpException httpError = (HttpException) error;
showError("Deletion failed: " + httpError.message());
} else {
showError("Network error during deletion");
}
}
);// Response wrapper from Retrofit core
public final class Response<T> {
public boolean isSuccessful();
public T body();
public int code();
public String message();
public Headers headers();
public ResponseBody errorBody();
}
// Scheduler from RxJava
public abstract class Scheduler {
// Used for threading configuration
}