برنامه‌نویسی پیشرفته اندروید با زبان کاتلین – بخش سوم (RxJava)

نویسنده : سید ایوب کوکبی ۶ مهر ۱۳۹۷

برنامه‌نویسی پیشرفته اندروید با زبان کاتلین - بخش سوم (RxJava)

خب در ادامه‌ی بخش قبلی برنامه‌نویسی پیشرفته اندروید با زبان کاتلین به سراغ بخش سوم می‌رویم. در این بخش سعی می‌کنم درباره‌ی RxJava توضیحات لازم را بدهم. تا جای ممکن با مثال توضیح خواهم داد تا مفاهیم را بهتر درک کنید.

RxJava چیست؟

فقط بحث RxJava نیست. این مفهوم بسیار وسیع بوده و تنها مختص جاوا و برنامه‌نویسی اندروید نیست. RxJava تنها یک پیاده‌سازی جاوا از برنامه‌نویسی ناهمگام به کمک observable streams و reactivex است. در واقع RxJava ترکیبی از سه مفهومِ الگوی Observer، الگوی Iterator و برنامه‌نویسی فانکشنال (در فارسی: تابعی یا تابع‌گرا) است. برای سایر زبان‌ها نیز کتابخانه‌هایی مثل RxSwift (برای زبان سوئیفت)، RxNet (برای دات‌نت)، RxJs (برای جاوااسکریپت) و … وجود دارد.

راستش را بخواهید آموزش RxJava کمی سخت است چون بعضی وقت‌ها پیاده‌سازی نادرست یا نامناسبش باعث بروز مشکلات زیادی می‌شود. با این حال ارزش دارد زمانی را صرف یادگیری آن کنید. در ادامه سعی می‌کنم با مثال‌هایی ساده مفاهیم موجود در RxJava را قدم به قدم توضیح دهم.

برای شروع اجازه دهید به یک پرسش ساده که احتمالاً در شروع مطالعه‌ی این بخش از خودتان پرسیده‌اید پاسخ دهم.

آیا واقعاً نیازی به استفاده از RxJava هست؟

خیر. RxJava نیز همانند سایر کتابخانه‌های اندرویدی فقط یک کتابخانه است. به کاتلین هم نیازی نیست. کتابخانه‌ی Databinding هم نیازی نیست. می‌فهمید که چه می‌گویم. این کتابخانه فقط برای کمک به توسعه‌دهندگان ساخته شده است. همین و بس.

آیا قبل از یادگیری RxJava2 باید RxJava1 را بلد باشم؟

لزوماً نیازی نیست. می‌توانید سرراست با RxJava2 شروع کنید اما به عنوان یک توسعه‌دهنده‌ی حرفه‌ای اندروید بهتر است نسخه ۱ را هم یاد بگیرید. چرا؟ چون به راحتی می‌توانید از مثال‌هایی که با RxJava1 عرضه شده استفاده کنید یا در کدهای قدیمی هم مشارکت نمایید.

RxAndroid هم وجود دارد، از کدام یک استفاده کنم RxAndroid یا RxJava ؟

RxJava را می‌توانید در هر پروژه‌ای به زبان جاوا به کار برید. مثلاً RxJava را می‌توانید با فریم‌ورک Spring برای توسعه‌ی بکند استفاده کنید. RxAndroid کتابخانه‌ای است که حاوی بخش‌های لازم RxJava برای دنیای اندروید است. بنابراین وقتی بخواهید از RxJava در اندروید استفاده کنید باید کتابخانه‌ی اضافه‌ی دیگری تحت عنوان RxAndroid را هم اضافه کنید. بعداً توضیح می‌دهم این RxAndroid چه چیزهایی را به RxJava اضافه کرده است.

ما که از کاتلین استفاده می‌کنیم چرا سراغ RxKotlin نرویم؟

از آنجایی که کاتلین به صورت کامل با کدهای جاوا سازگار است دیگر نیازی به نوشتن یک کتابخانه‌ی مجزای Rx برای این زبان نیست.بله، کتابخانه‌ای تحت عنوان RxKotlin وجود دارد ولی در نظر داشته باشید که این کتابخانه بر فراز RxJava بنا شده است. تنها کاری هم که می‌کند یک‌سری ویژگی‌های جدید زبان کاتلین بوده که به RxJava اضافه کرده است. پس شما می‌توانید بدون استفاده از RxKotlin مستقیماً از RxJava در زبان کاتلین استفاده کنید. من برای حفظ سادگی توضیحات در این بخش از RxKotlin استفاده نمی‌کنم.

چگونه RxJava2 را به پروژه اضافه کنیم؟

برای اضافه کردن این کتابخانه باید دو خط زیر را به فایل build.gradle اضافه کنید و سپس پروژه را Sync کنید:

dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.1.8"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
...
}

 

RxJava شامل چه چیزهایی هست؟

من تمایل دارم RxJava را به ۳ بخش تقسیم کنم:

  1. کلاس‌هایی برای الگوی Observer و data flow: یعنی Observables و Observers
  2. Schedulers (برای همزمانی)
  3. عملگرهایی برای data flow یا جریان‌های داده‌

Observables و Observers

قبلا توضیح این الگو را داده‌ام. Observable را می‌توانید به عنوان منبعی برای انتشار داده‌ها و Observer را گیرنده‌ی داده در نظر بگیرید.

روش‌های مختلفی برای ساخت observables وجود دارد. ساده‌ترین راه استفاده از ()Observable.just بوده که کارش دریافت یک آیتم و ساخت Observableای برای انتشار آن آیتم است.

برویم سراغ GitRepoRemoteDataSource، متد getRepositoriesاش را طوری تغییر می‌دهیم که Observable را برگرداند:

class GitRepoRemoteDataSource {

fun getRepositories() : Observable<ArrayList<Repository>> {
var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First from remote", "Owner 1", 100, false))
arrayList.add(Repository("Second from remote", "Owner 2", 30, true))
arrayList.add(Repository("Third from remote", "Owner 3", 430, false))

return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)
}
}

 

بخوانید  چرا از جاوا به زبان کاتلین (Kotlin) سوئیچ کنیم؟

<<Observable<ArrayList<Repository به این معناست که Observable آرایه‌ای از اشیاء Repository را بر می‌گرداند. در صورتی که بخواهید <Observable<Repositoryرا طوری بسازید که اشیاء Repository برگرداند بایستی از (Observable.from(arrayList استفاده کنید.

(delay(2,TimeUnit.SECONDS. باعث می‌شود تا Observables به مدت دو ثانیه قبل از انتشار داده منتظر بماند.

اما صبر کنید، ما هنوز نگفتیم Observable چه زمانی شروع به انتشار داده می‌کند. Observables معمولاً بعد از عضویت یک Observer شروع می‌کنند به انتشار داده.

توجه کنید دیگر نیازی به اینترفیس پایین نداریم:

interface OnRepoRemoteReadyCallback {
fun onRemoteDataReady(data: ArrayList<Repository>)
}

 

اجازه دهید برای GitRepoLocalDataSource نیز همین کار را بکنیم:

class GitRepoLocalDataSource {

fun getRepositories() : Observable<ArrayList<Repository>> {
var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First From Local", "Owner 1", 100, false))
arrayList.add(Repository("Second From Local", "Owner 2", 30, true))
arrayList.add(Repository("Third From Local", "Owner 3", 430, false))

return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)
}

fun saveRepositories(arrayList: ArrayList<Repository>) {
//todo save repositories in DB
}
}

 

اینجا نیز به اینترفیس پایین نیازی نداریم:

interface OnRepoLocalReadyCallback {
fun onLocalDataReady(data: ArrayList<Repository>)
}

 

اکنون باید repository را به گونه‌ای تغییر دهیم که Observable را برگرداند:

class GitRepoRepository(private val netManager: NetManager) {

private val localDataSource = GitRepoLocalDataSource()
private val remoteDataSource = GitRepoRemoteDataSource()

fun getRepositories(): Observable<ArrayList<Repository>> {

netManager.isConnectedToInternet?.let {
if (it) {
//todo save those data to local data store
return remoteDataSource.getRepositories()
}
}

return localDataSource.getRepositories()
}
}

 

اگر اتصال اینترنت برقرار باشد، Observable را به صورت ریموت از داده‌های سرور برمی‌گردانیم در غیر این صورت از منبع داده‌ی محلی. و البته دیگر نیازی به اینترفیس OnRepositoryReadyCallback هم نداریم.

همانطور که احتمالاً حدس زده‌اید نیازمند تغییر داده‌ها در MainViewModel هستیم. اکنون ما باید Observable را از gitRepoRepository دریافت کرده و در آن عضو یا subscribe شویم. به محض عضویت یکObservable ،Observer شروع می‌کند به انتشار داده‌ها.

class MainViewModel(application: Application) : AndroidViewModel(application) {
...

fun loadRepositories() {
isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
override fun onSubscribe(d: Disposable) {
//todo
}

override fun onError(e: Throwable) {
//todo
}

override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}

override fun onComplete() {
isLoading.set(false)
}
})
}
}

 

به محض عضویت یک Observer در Observable متد onSubscribe آن فراخونی می‌شود. توجه داشته باشید ما یک Disposable هم به عنوان پارامتر متد onSubscribe داریم. در موردش بعداً صحبت می‌کنم.

هر وقت Observable داده‌ای را منتشر می‌کند متد ()onNext فراخوانی می‌شود. بعد از پایان انتشار داده‌ها متد ()onComplete فرخوانی می‌شود. بعد از آن Observable پایان می‌یابد (terminate می‌شود).

در صورت بروز خطا، متد ()onError فرخوانده می‌شود و پس از آن Observable ترمینیت می‌شود. این یعنی با بروز خطا دیگر Observable قادر به انتشار داده‌ها نیست بنابراین در چنین شرایطی نه متد ()OnNext و نه ()onComplete فرخوانی نمی‌شود.

همچنین به این موضوع هم توجه کنید که در صورت عضویت در Observableای که terminate شده با استثنای IllegalStateException مواجه خواهید شد.

بنابراین RxJava چه کمی به ما می‌کند؟

  • اولاً از شر اینترفیس‌ها راحتمان می‌کند. در واقع نوعی boilerplate است که اینترفیسی را برای هر یک از repository ها یا دیتا‌سورس‌ها تولید می‌کند؛
  • اگر از اینترفیس‌ها استفاده کنیم و در لایه‌ی دیتا با خطایی مواجه شویم، برنامه اصطلاحاً کرش می‌کند. ولی هنگام استفاده از RxJava خطا در قالب یک متد ()onError و با یک پیغام مناسب به کاربر نمایش داده می‌شود؛
  • استفاده از RxJava باعث تمییزتر شدن کدها می‌شود؛
  • این را قبلاً نگفتم، استفاده از رویکرد قبلی ممکن است باعث نشت حافظه یا memory leak شود.

با استفاده از RxJava2 و ViewModel چگونه مشکل memory leak را حل کنیم؟

بیایید چرخه‌ی حیات ViewModel را یکبار دیگر بررسی کنیم:

با پایان یافتن یا destroyed شدن اکتیویتی، متد ()onCleared صدا زده می‌شود. در متد ()onCleared بایستی همه‌ی Disposable ها را dispose کنیم. این کار را انجام می‌دهیم:

class MainViewModel(application: Application) : AndroidViewModel(application) {
...

lateinit var disposable: Disposable

fun loadRepositories() {
isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
override fun onSubscribe(d: Disposable) {
disposable = d
}

override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}

override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}

override fun onComplete() {
isLoading.set(false)
}
})
}

override fun onCleared() {
super.onCleared()
if(!disposable.isDisposed){
disposable.dispose()
}
}
}

 

بخوانید  آموزش زبان کاتلین – درس 19 (Getter و Setter)

این کد را کمی بیشتر می‌توانیم بهبود دهیم:

ابتدا به جای Observer می‌توانیم از DisposableObserver که Disposable را پیاده‌سازی کرده و حاوی متد ()dispose هست استفاده کنیم. بنابراین نیازی به متد ()onSubScribe نداریم چرا که می‌توانیم مستقیماً متد ()dispose را از نمونه‌ی DisposableObserver فرخوانی کنیم.

دوماً، به جای متد ()subscribe. که void را بر می‌گرداند می‌توانیم از متد ()subscribeWith. استفاده کنیم که obsever مشخصی را برمی‌گرداند.

class MainViewModel(application: Application) : AndroidViewModel(application) {
...

lateinit var disposable: Disposable

fun loadRepositories() {
isLoading.set(true)
disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {

override fun onError(e: Throwable) {
// todo
}

override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}

override fun onComplete() {
isLoading.set(false)
}
})
}

override fun onCleared() {
super.onCleared()
if(!disposable.isDisposed){
disposable.dispose()
}
}
}

 

اما هنوز هم جای بهبود این کد وجود دارد:

ما نمونه‌ی Disposable را ذخیره کرده‌ایم تا به محض فراخونی متد ()onCleared بتوانیم Disposeاش کنیم. اما صبر کنید، آیا برای هر فرخوانی روی لایه‌ی دیتا بایستی این کار را انجام دهیم؟ چه می‌شود اگر ۱۰ فرخوانی روی لایه‌ی داده داشته باشیم؟ آیا مهم است که بین disposableها تفاوت قائل شویم؟

خیر. برای این کار راهکار هوشمندانه‌تری وجود دارد. می‌توانیم همه‌ی آنها را در یک بسته قرار دهیم و به محض فراخوانی ()onCleared نابود کنیم. ما می‌توانیم از CompositeDisposable استفاده کنیم.

CompositeDisposable یک کانتینر disposable است که می‌تواند چندین disposable دیگر را نگه‌داری کند.

بنابراین، هر وقت Disposable را ایجاد میکنیم باید داخل یک CompositeDisposable قرار دهیم:

class MainViewModel(application: Application) : AndroidViewModel(application) {
...

private val compositeDisposable = CompositeDisposable()

fun loadRepositories() {
isLoading.set(true)
compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {

override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}

override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}

override fun onComplete() {
isLoading.set(false)
}
}))
}

override fun onCleared() {
super.onCleared()
if(!compositeDisposable.isDisposed){
compositeDisposable.dispose()
}
}
}

 

به لطف اکستنشن‌های کاتلین این کد را می‌توانیم باز هم بهبود دهیم:

کاتلین همانند سی‌شارپ و Gosu، توانایی اکستند کردن قابلیت‌های یک کلاس را به شما می‌دهد بدون اینکه نیازی به ارث‌بری از آن کلاس باشد.

اجازه دهید پکیج جدیدی تحت عنوان extensions بسازیم و فایل RxExtensions.kt را به آن اضافه کینم:

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
add(disposable)
}

 

اکنون می‌توانیم شی Disposable را به نمونه‌ی ایجاد شده از CompositeDisposable با استفاده از علامت =+ اضافه کنیم:

class MainViewModel(application: Application) : AndroidViewModel(application) {
...

private val compositeDisposable = CompositeDisposable()

fun loadRepositories() {
isLoading.set(true)
compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {

override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}

override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}

override fun onComplete() {
isLoading.set(false)
}
})
}

override fun onCleared() {
super.onCleared()
if (!compositeDisposable.isDisposed) {
compositeDisposable.dispose()
}
}
}

 

خب بیایید برنامه را اجرا کنیم. وقتی دکمه Load Data را می‌زنید، برنامه به مدت ۲ ثانیه کرش می‌کند. وقتی به لاگ مراجعه می‌کنید سرمنشاء خطا را در متد ()onNext می‌یابید که پیغام استثنای ایجاد شده از این قرار است:

java.lang.IllegalStateException: امکان invoke کردن setValue روی ترد زمینه (background) وجود ندارد.

اما دلیل بروز این خطا چیست؟

Schedulers (همزمانی)

RxJava همراه شده است با Schedulers که توانایی انتخاب بین تردهای اجرا کننده‌ی کد را به ما می‌دهد. یا اگر بخواهم دقیق‌تر بگویم، می‌توانیم انتخاب کنیم observable با استفاده از متد ()subscribeOn روی کدام ترد اجرا شود یا برای observer توسط ()obsdrverOn تردی که روی آن اجرا می‌شود مشخص خواهد شد. معمولاً همه‌ی داده‌های دریافتی از لایه‌ی دیتا در ترد زمینه یا background اجرا می‌شود. به عنوان مثال وقتی از ()Schedulers.newThread استفاده کنیم، هر زمان Scheduler را صدا بزنیم ترد جدیدی می‌سازد. Schedulers متدهای دیگری نیز دارد که به منظور حفظ سادگی بحث از توضیح آن‌ها اجتناب می‌کنم.

احتمالاً این را می‌دانید که همه‌ی کدهای UI (واسط گرافیکی برنامه) در ترد اصلی اندروید اجرا می‌شود. RxJava کتابخانه‌ای است که اطلاعی از ترد اصلی اندروید ندارد. دقیقاً به همین دلیل از RxAndroid استفاده می‌کنیم. RxAndroid این توانایی که از ترد اصلی برای اجرا کدهایمان استفاده کنیم را به ما می‌دهد. واضح است که Observer ما باید روی ترد اصلی اجرا شود. انجامش دهیم:

...
fun loadRepositories() {
isLoading.set(true)
compositeDisposable += gitRepoRepository
.getRepositories()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
...
})
}
...

 

بخوانید  آموزش زبان کاتلین – درس 6 (کامنت‌ها)

برنامه را دوباره اجرا می‌کنیم. عالی است. همه چیز درست کار می‌کند.

انواع دیگر observables

انواع دیگری از observable هم داریم که به قرار زیر هستند:

<Single<T، نوعی از observable است که تنها یک آیتم یا خطا منتشر می‌کند.

<Maybe<T، این یکی یا فقط یک آیتم/خطا منتشر می‌کند یا هیچ آیتمی منتشر نمی‌کند.

Completable رویداد onSuccess یا خطا را بر می‌گرداند.

<>Flowable همچون <Observable<T تعداد n آیتم منتشر کرده یا هیچ آیتم و خطایی منتشر نمی‌کند. Observable از backpressure پشتیبانی نمی‌کند ولی Flowable پشتیبانی می‌کند.

backpressure چیست؟

برای درک درست این مفهوم بهتر است مثالی از دنیای واقعی بزنم:

مفهوم backpressure در RxJava

من همچون یک قیف به آن نگاه می‌کنم. وقتی قیف را بیشتر از ظرفیت آن پر کنید سرریز شده و افتضاح به بار می‌آید. گاهی اوقات observer شما توان پردازش تعداد زیادی رخداد دریافتی را ندارد به همین خاطر نیاز است که سرعت پایین بیاید.

برای کسب اطلاعات بیشتر در این زمینه می‌توانید به مستندات RxJava مراجعه کنید.

عملگرها

نکته‌ی جالبی که درباره‌ی RxJava وجود دارد عملگرهای آن است. برخی از مشکلات معمول که در حالت عادی با ۱۰ خط یا بیشتر حل می‌شوند به کمک عملگرها تنها با یک خط کد حل می‌شود. در واقع عملگرهایی وجود دارد که به ما کمک می‌کنند:

  • observableها را ترکیب کنیم؛
  • فیلترشان کنیم؛
  • شرایطی را تعریف کنیم؛
  • observable را به انواع دیگرش تبدیل کنیم.

مثالی بزنم، عمل ذخیره‌سازی داده در GitRepoLocalDataSource را به پایان برسانیم. از آنجایی که می‌خواهیم داده‌ها را ذخیره کنیم به Completable برای شبیه‌سازی‌اش نیاز داریم. اجازه دهید تاخیر ۱ ثانیه‌ای در ذخیره‌سازی را هم شبیه‌سازی کنیم. ساده‌ترین راهش این است:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
return Completable.complete().delay(1,TimeUnit.SECONDS)
}

چرا ساده‌ترین راه؟

()Completable.coomplete یک نمونه از Complateble که به محض عضویت در آن پایان می‌یابد را برمی‌گرداند.

به محض کامپلت شدن Complatable این شی terminate می‌شود. بنابراین هیچ عملگری (از جمله delay) بعد از آن اجرا نمی‌شود. در این حالت، Complatable ما هیچ تاخیری ندارد. اجازه دهید راهی برای انجام آن بیابیم:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()
}

چرا این روش؟

()Single.just یک Single ایجاد می‌کند که فقط عدد ۱ را منتشر می‌کند. از آنجایی که ما از (delay(1,TimeUnit.SECONDS استفاده می‌کنیم، انتشار هر یک ثانیه اتفاق می‌افتد.

بنابراین این کد یک Complatable را برمی‌گرداند که بعد از یک ثانیه onComplete را فرا می‌خواند.

حالا باید GitRepoRepository را تغییر دهیم. ابتدا منطقش را بار دیگر مرور کنیم. اتصال اینترنت را بررسی می‌کنیم. اگر برقرار بود داده را از  سرور دریافت و در دیتابیس محلی ذخیره می‌کنیم، در غیر اینصورت داده‌ها را از دیتابیس محلی دریافت می‌کنیم. نگاه کنید:

fun getRepositories(): Observable<ArrayList<Repository>> {

netManager.isConnectedToInternet?.let {
if (it) {
return remoteDataSource.getRepositories().flatMap {
return@flatMap localDataSource.saveRepositories(it)
.toSingleDefault(it)
.toObservable()
}
}
}

return localDataSource.getRepositories()
}

 

به محض اینکه ()remoteDataSource.getRepositories آیتمی را منتشر می‌کند با استفاده از flatMap. آن آیتم به Observalbe جدیدی که همان آیتم را منتشر می‌کند مپ می‌کنیم. Observable جدید را از Complateable ساخته‌ایم که آیتم یکسانی را در دیتاسورس محلی ذخیره کرده و آن را به یک Single که همان آیتم را منتشر می‌کند تبدیل می‌کند. از آنجایی که ما نیاز داریم Observable را برگردانیم، بایستی Single را به یک Observable تبدیل کنیم.

اصلاً فهمیدید چه شد؟ تصور کنید چه کارهایی که نمی‌شود با RxJava انجام داد. واقعاً ابزار قدرتمندی است. با آن بازی کنید، کدها و مثال‌هایش را بررسی کنید. مطمئنم کمی که بیشتر با RxJava آشنا شوید عاشقش می‌شوید.

سید ایوب کوکبی

نویسنده و مترجم...

0 دیدگاه

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *