RxJava Quick Start

Basic Theory

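  • Flowable: a stream of 0..N items that supports Reactive Streams and backpressure
  • Observable: a stream of 0..N items, without backpressure
  • Single: a stream that emits exactly one item or an error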
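
The three base types above can be compared side by side. This is a minimal sketch that assumes RxJava 3 is on the classpath (package `io.reactivex.rxjava3.core`; RxJava 2 uses `io.reactivex` instead). The function name `baseTypes` is made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

// Collects what each base type emits so the results can be compared.
fun baseTypes(): Triple<List<Int>, List<Int>, Int> {
    // Observable: 0..N items, no backpressure.
    val fromObservable = Observable.just(1, 2, 3).toList().blockingGet()

    // Flowable: 0..N items, Reactive Streams compliant, honors downstream demand.
    val fromFlowable = Flowable.just(1, 2, 3).toList().blockingGet()

    // Single: exactly one item or an error.
    val fromSingle = Single.just(42).blockingGet()

    return Triple(fromObservable, fromFlowable, fromSingle)
}
```

Calling `println(baseTypes())` prints `([1, 2, 3], [1, 2, 3], 42)`.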

Testing Model

// Builds `size` sample students, each enrolled in one course.
class DataManager(size: Int = 16) {
    private val data = mutableListOf<Student>()

    init {
        for (index in 0 until size) {
            val student = Student("Jack-$index", mutableListOf(Course("English-$index")))
            data.add(student)
        }
    }

    // Returns the backing list itself, not a copy.
    fun queryOrigin(): MutableList<Student> {
        return this.data
    }
}

data class Student(
    var name: String,
    var courses: MutableList<Course> = mutableListOf()
)

data class Course(var name: String)

Transforming Observables

Map

Transform the items emitted by an Observable by applying a function to each item.

Example 1

// Convert each Int to a Double before it reaches the subscriber.
Observable.just(1, 2, 3, 4, 5)
    .map { it.toDouble() }
    .subscribe {
        println(it)
    }

flatMap vs concatMap

Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.

Example 1

// Use 3 students so the result matches the output shown below.
val dataManager = DataManager(size = 3)
val flatCourse = mutableListOf<Course>()
val queryOrigin = dataManager.queryOrigin()
// flatMap: merge the courses of every student into one stream.
Observable.fromIterable(queryOrigin)
    .flatMap { stu ->
        Observable.fromIterable(stu.courses)
    }.subscribeOn(Schedulers.newThread())
    .blockingSubscribe { course ->
        flatCourse.add(course)
    }

val concatCourse = mutableListOf<Course>()
// concatMap: same mapping, but the inner streams are consumed one after another.
Observable.fromIterable(queryOrigin)
    .concatMap { stu ->
        Observable.fromIterable(stu.courses)
    }.subscribeOn(Schedulers.newThread())
    .blockingSubscribe { course ->
        concatCourse.add(course)
    }

println("origin = ${dataManager.queryOrigin()}")
println("flatMap = $flatCourse")
println("concatMap = $concatCourse")
origin = [Student(name=Jack-0, courses=[Course(name=English-0)]), Student(name=Jack-1, courses=[Course(name=English-1)]), Student(name=Jack-2, courses=[Course(name=English-2)])]
flatMap = [Course(name=English-0), Course(name=English-1), Course(name=English-2)]
concatMap = [Course(name=English-0), Course(name=English-1), Course(name=English-2)]

flatMap transforms an upstream Observable that emits events into multiple Observables that emit events, then merges the events they emit into a single Observable. Note that flatMap does not guarantee the order of events: the order of the items after the transformation need not match the order of the original sequence.

The counterpart of flatMap is concatMap, which guarantees that the final output order matches the order in which the upstream emitted.

Both methods look much the same. The difference is the operator used to combine the inner results: flatMap merges them, while concatMap concatenates them.

The flatMap() method creates a new Observable by applying a function that you supply to each item emitted by the original Observable, where that function returns an Observable that emits items, and then merges the results of that function applied to every item emitted by the original Observable, emitting these merged results.

Note that flatMap() may interleave the items emitted by the Observables that result from transforming the items emitted by the source Observable.

If it is important that these items not be interleaved, you can instead use the similar concatMap() method.

In short, flatMap merges the items emitted by these Observables, so they may be interleaved and the output order may differ from the input order. To preserve the order, use concatMap.
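
In Example 1 both lists come out identical, most likely because the inner Observables built with fromIterable emit synchronously, so nothing has a chance to interleave. The difference becomes visible once the inner Observables emit at different times. The sketch below assumes RxJava 3 package names; the helper `delayed` and the two function names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

// Each inner Observable emits its number after (n * 100) ms, so 1 is ready first.
private fun delayed(n: Int): Observable<Int> =
    Observable.just(n).delay(n * 100L, TimeUnit.MILLISECONDS)

// flatMap subscribes to all inner Observables at once and merges what they emit.
fun viaFlatMap(): List<Int> =
    Observable.just(3, 2, 1).flatMap { delayed(it) }.toList().blockingGet()

// concatMap subscribes to the next inner Observable only after the previous one completes.
fun viaConcatMap(): List<Int> =
    Observable.just(3, 2, 1).concatMap { delayed(it) }.toList().blockingGet()
```

`viaConcatMap()` always returns the items in source order, `[3, 2, 1]`. `viaFlatMap()` returns them in arrival order, which with these delays is typically `[1, 2, 3]`, although that order depends on timing and is not guaranteed.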

flatMapIterable
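
The original leaves this section empty. As a hedged sketch: flatMapIterable maps each item to an Iterable and emits the elements of those Iterables in source order, which suits the student-to-courses case without wrapping each list in an Observable. The code assumes RxJava 3 package names; `Student` and `Course` here are simplified read-only copies of the Testing Model classes, and `allCourses` is a made-up name.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Simplified copies of the model classes, for illustration only.
data class Course(val name: String)
data class Student(val name: String, val courses: List<Course>)

// Flattens every student's course list into a single list of courses.
fun allCourses(students: List<Student>): List<Course> =
    Observable.fromIterable(students)
        .flatMapIterable { it.courses }
        .toList()
        .blockingGet()
```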

switchMap

Example 1

// Use 3 students so the result matches the output shown below.
val dataManager = DataManager(size = 3)

val switchMapCourse = mutableListOf<String>()
val queryOrigin = dataManager.queryOrigin()

// Each new student replaces the previous inner timer before it can fire,
// so only the last student's name is emitted.
Observable.fromIterable(queryOrigin)
    .switchMap { stu ->
        Observable.timer(1, TimeUnit.SECONDS)
            .map { stu.name }
    }.subscribeOn(Schedulers.newThread())
    .blockingSubscribe { course ->
        switchMapCourse.add(course)
    }
println("origin = ${dataManager.queryOrigin()}")
println("switchMap = $switchMapCourse")
origin = [Student(name=Jack-0, courses=[Course(name=English-0)]), Student(name=Jack-1, courses=[Course(name=English-1)]), Student(name=Jack-2, courses=[Course(name=English-2)])]
switchMap = [Jack-2]

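If switchMap() instead returns an Observable that emits immediately, such as Observable.just(), each inner Observable finishes before the next upstream item arrives, so nothing is dropped:

Example 2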

Observable.fromIterable(queryOrigin)
    .switchMap { stu ->
        Observable.just(stu.name)
    }.subscribeOn(Schedulers.newThread())
    .blockingSubscribe { course ->
        switchMapCourse.add(course)
    }
origin = [Student(name=Jack-0, courses=[Course(name=English-0)]), Student(name=Jack-1, courses=[Course(name=English-1)]), Student(name=Jack-2, courses=[Course(name=English-2)])]
switchMap = [Jack-0, Jack-1, Jack-2]

cast

Example 1

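// Cast every emitted Int to Number; an item of an incompatible type
// would be reported as an error.
Observable.just(1, 2, 3)
    .cast(Number::class.java)
    .subscribe { num -> println(num) }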

scan

Apply a function to each item emitted by an Observable, sequentially, and emit each successive value.

Example 1

// Use 3 students so the result matches the output shown below.
val dataManager = DataManager(size = 3)

val scanStudent = mutableListOf<Student>()
val queryOrigin = dataManager.queryOrigin()

// The first student is emitted as is; after that, each result is the
// previous result combined with the current student.
Observable.fromIterable(queryOrigin)
    .scan { lastStu: Student, student: Student ->
        println("---------------------")
        println("lastStu ${lastStu.name}")
        println("current Stu ${student.name}")
        println("next Stu ${lastStu.name} + ${student.name}")
        Student(name = "${lastStu.name} + ${student.name}")
    }.subscribe {
        scanStudent.add(it)
    }
println("====================================================================================")
println("origin = ${dataManager.queryOrigin()}")
println("====================================================================================")
for (student in scanStudent) {
    println("student ${student.name}")
}
---------------------
lastStu Jack-0
current Stu Jack-1
next Stu Jack-0 + Jack-1
---------------------
lastStu Jack-0 + Jack-1
current Stu Jack-2
next Stu Jack-0 + Jack-1 + Jack-2
====================================================================================
origin = [Student(name=Jack-0, courses=[Course(name=English-0)]), Student(name=Jack-1, courses=[Course(name=English-1)]), Student(name=Jack-2, courses=[Course(name=English-2)])]
====================================================================================
student Jack-0
student Jack-0 + Jack-1
student Jack-0 + Jack-1 + Jack-2

buffer

Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time.

Example 1

// Emit the items as lists of two.
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    .buffer(2)
    .subscribe {
        println(it)
    }
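
To make the bundling visible, the sketch below collects the buffers into a list so they can be inspected as a whole. It assumes RxJava 3 package names; `bufferedBy` is a hypothetical helper.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Splits the numbers 1..8 into consecutive bundles of `count` items.
// The last bundle may be shorter when the total is not a multiple of `count`.
fun bufferedBy(count: Int): List<List<Int>> =
    Observable.range(1, 8)
        .buffer(count)
        .toList()
        .blockingGet()
```

`bufferedBy(2)` returns `[[1, 2], [3, 4], [5, 6], [7, 8]]`, and `bufferedBy(3)` returns `[[1, 2, 3], [4, 5, 6], [7, 8]]`.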

window

Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

Window is similar to Buffer, but rather than emitting packets of items from the source Observable, it emits Observables, each one of which emits a subset of items from the source Observable and then terminates with an onCompleted notification.

Take the program below as an example. It first generates a sequence of 10 integers, then uses window to split them into groups of 3, each of which is returned as its own Observable.
We subscribe to each returned Observable and consume its items. With 10 numbers there are 4 groups, each backed by one Observable:

Example 1

Observable.range(1, 10).window(3)
    .subscribe { observable ->
        // The hash code identifies which window an item belongs to.
        observable.subscribe { integer ->
            println("${observable.hashCode()} : $integer")
        }
    }
901506536 : 1
901506536 : 2
901506536 : 3

747464370 : 4
747464370 : 5
747464370 : 6

1513712028 : 7
1513712028 : 8
1513712028 : 9

1018547642 : 10

groupBy

Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.

groupBy groups items: it splits them into several groups according to a given condition.

Example 1

// Group by the value itself, so equal numbers share one group.
Observable.concat(
    Observable.range(1, 4),
    Observable.range(1, 6)
).groupBy { integer ->
    integer
}.subscribe { groupedObservable: GroupedObservable<Int, Int> ->
    groupedObservable.subscribe { println("key ${groupedObservable.key} -> value $it") }
}
key 1 -> value 1
key 2 -> value 2
key 3 -> value 3
key 4 -> value 4
key 1 -> value 1
key 2 -> value 2
key 3 -> value 3
key 4 -> value 4
key 5 -> value 5
key 6 -> value 6
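
Grouping by the value itself puts each distinct number in its own group. A key that several items share may show the operator more clearly. The following sketch assumes RxJava 3 package names; `groupByParity` is a made-up name.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Groups the numbers 1..6 under the keys "odd" and "even".
fun groupByParity(): Map<String, List<Int>> {
    // A linked map keeps the keys in the order the groups first appear.
    val groups = linkedMapOf<String, MutableList<Int>>()
    Observable.range(1, 6)
        .groupBy { n -> if (n % 2 == 0) "even" else "odd" }
        .subscribe { group ->
            // The key is declared nullable in RxJava 3, hence toString().
            val key = group.key.toString()
            group.subscribe { n -> groups.getOrPut(key) { mutableListOf() }.add(n) }
        }
    return groups
}
```

`println(groupByParity())` prints `{odd=[1, 3, 5], even=[2, 4, 6]}`.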

Math

average

count

max

min

reduce

Apply a function to each item emitted by an Observable, sequentially, and emit the final value.
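
reduce is easiest to understand next to scan: both apply the same accumulator, but scan emits every intermediate result while reduce emits only the last one. The sketch below assumes RxJava 3 package names; `runningSums` and `total` are hypothetical names.

```kotlin
import io.reactivex.rxjava3.core.Observable

// scan emits every intermediate accumulation.
fun runningSums(): List<Int> =
    Observable.range(1, 5)
        .scan { acc: Int, n: Int -> acc + n }
        .toList()
        .blockingGet()

// reduce emits only the final accumulation. Without a seed it returns a Maybe,
// which is empty (null from blockingGet) when the source emits nothing.
fun total(): Int? =
    Observable.range(1, 5)
        .reduce { acc: Int, n: Int -> acc + n }
        .blockingGet()
```

`runningSums()` returns `[1, 3, 6, 10, 15]`, and `total()` returns `15`.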


Reference