实现简单的订阅发布

设计模式之订阅发布模式,手动实现简单的定义发布模式,当我们订阅时,当我们订阅后,当我们发布消息时,所有订阅者都会收到订阅消息。

创建订阅发布的对象

 创建一个订阅发布对象,当我们订阅时,调用 subscribe 方法进行订阅,对象实例会把订阅者传进来的函数缓存起来;我们通过调用 next 方法进行发布,当我们通过 next 方法发布消息时,所有的订阅者都会收到消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Subject {
constructor() {
this.callbackArr = []; // 缓存订阅的回调函数
}

// 订阅
subscribe(callback) {
this.callbackArr.push(callback);
return this;
}

// 发布
next(vals) {
this.callbackArr.forEach((callback) => {
callback.call(this, vals);
});
return this;
}

// 取消订阅
unsubscribe() {
this.callbackArr = [];
}
}

// 创建订阅发布实例
const sub = new Subject();

// 每隔1s发布一次消息
let count = 0;
setInterval(() => {
sub.next(count);
count++;
}, 1000);

// 订阅者1
sub.subscribe((item) => {
console.log(item); // 每隔1s依次输出 0 1 2 3 ...
});

// 订阅者2
sub.subscribe((item) => {
console.log(item); // 每隔1s依次输出 0 1 2 3 ...
});

/*
输出:
0
0
1
1
2
2
.
.
*/

我们模拟 rxjsfilter 操作符

rxjsfilter 操作符有点类似于 Array 的 filter 方法,返回的是通过 filter 筛选过后的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Subject {
constructor() {
this.callbackArr = []; // 缓存订阅的回调函数
this.subArr = []; // 缓存筛选后的 Subject 实例
}

// 订阅
subscribe(callback) {
this.callbackArr.push(callback);
return this;
}

// 发布
next(vals) {
this.callbackArr.forEach((callback) => {
callback.call(this, vals);
});
return this;
}

// 过滤,过滤后我们会新建一个新的 Subject 对象来推送过滤后的结果。
filter(callback) {
const newSub = new Subject();
this.subArr.push(newSub);
const newCallback = (value) => {
callback(value) && newSub.next(value);
};
this.callbackArr.push(newCallback);
return newSub;
}

unsubscribe() {
this.callbackArr = [];
this.subArr.forEach((sub) => sub.unsubscribe());
}
}

// 每隔1s发布一次消息
let count = 0;
setInterval(() => {
sub.next(count);
count++;
}, 1000);

// 订阅者1
sub
.filter((item) => item % 2 === 0)
.subscribe((item) => {
console.log(item, "双"); // 每隔2s依次输出 0 2 4 ...
});

// 订阅者2
sub
.filter((item) => item % 2 === 1)
.subscribe((item) => {
console.log(item, "单"); // 每隔2s依次输出 1 3 5 ...
});

/*
输出:
0 双
1 单
2 双
3 单
4 双
.
.
*/

 当发布者隔段时间发布一个消息时,filter 函数会把发布者发布的消息先过滤一遍再推送给订阅者,把 filter 函数返回 false 的消息过滤掉,推送返回 true 的消息。

我们模拟 rxjsmap 操作符

rxjsmap 操作符有点类似于 Array 的 map 方法,返回的是通过 map 方法处理过后的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class Subject {
constructor() {
this.callbackArr = []; // 缓存订阅的回调函数
this.subArr = []; // 缓存筛选后的 Subject 实例
}

// 订阅
subscribe(callback) {
this.callbackArr.push(callback);
return this;
}

// 发布
next(vals) {
this.callbackArr.forEach((callback) => {
callback.call(this, vals);
});
return this;
}

// 处理发布者发布过来的值
map(callback) {
const newSub = new Subject();
this.subArr.push(newSub);
const newCallback = (value) => {
newSub.next(callback(value));
};
this.callbackArr.push(newCallback);
return newSub;
}

unsubscribe() {
this.callbackArr = [];
this.subArr.forEach((sub) => sub.unsubscribe());
}
}

// 每隔1s发布一次消息
let count = 0;
setInterval(() => {
sub.next(count);
count++;
}, 1000);

// 订阅者1
sub.subscribe((item) => {
console.log(item); // 每隔1s依次输出 1 2 3 ...
});

// 订阅者2
sub
.map((item) => item * 2)
.subscribe((item) => {
console.log(item, "map"); // 每隔1s依次输出 2 4 6 ...
});

/*
输出:
0
0map
1
2map
2
4map
3
6map
.
.
.
*/

 当发布者隔段时间发布一个消息时,map 函数会把发布者发布的消息先处理一遍再推送给订阅者。