ChatGPT解决这个技术问题 Extra ChatGPT

如何获取 RxJS Subject 或 Observable 的当前值?

我有一个 Angular 2 服务:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

一切都很好。但是我有另一个不需要订阅的组件,它只需要在某个时间点获取 isLoggedIn 的当前值。我怎样才能做到这一点?


G
Günter Zöchbauer

SubjectObservable 没有当前值。当一个值被发出时,它被传递给订阅者并且 Observable 完成了它。

如果您想获得当前值,请使用专为此目的而设计的 BehaviorSubjectBehaviorSubject 保留最后发出的值并立即将其发送给新订阅者。

它还有一个方法 getValue() 来获取当前值。


RxJS 5 的作者的重要说明:使用 getValue() 是一个巨大的危险信号,你做错了。它作为逃生舱口在那里。一般来说,你对 RxJS 所做的一切都应该是声明性的。 getValue() 是必须的。如果您使用 getValue(),则有 99.9% 的可能性是您做错了什么或奇怪的事情。
@BenLesh 如果我有一个 BehaviorSubject<boolean>(false) 并且想切换它怎么办?
@GünterZöchbauer 我的意思是:发出新的布尔值时,我需要知道当前值来切换它。在那种情况下,使用 getValue() 是否合理?
@BenLesh getValue() 对于执行即时操作非常有用,例如 onClick->dispatchToServer(x.getValue()) 但不要在可观察链中使用它。
@IngoBürk 我可以证明您的建议的唯一方法是,如果您不知道 foo$BehaviorSubject (即它被定义为 Observable 并且可能是来自延迟来源的热或冷)。但是,由于您随后继续在 $foo 本身上使用 .next,这意味着您的实现依赖于它作为一个 BehaviorSubject,因此没有理由不只使用 .value 来获取当前价值放在第一位。
f
forivall

你应该从 Observable/Subject 中获取值的唯一方法是订阅!

如果您使用 getValue(),那么您在声明性范式中做了一些必要的事情。它作为逃生舱口在那里,但 99.9% 的情况下您不应该使用 getValue()getValue() 会做一些有趣的事情:如果主题有已取消订阅,如果主题因错误等而死亡,它将阻止您获得价值。但是,在极少数情况下,它再次作为逃生舱口存在。

有几种方法可以通过“Rx-y”方式从 Subject 或 Observable 获取最新值:

使用 BehaviorSubject:但实际上订阅它。当您第一次订阅 BehaviorSubject 时,它将同步发送它接收或初始化的先前值。使用 ReplaySubject(N):这将缓存 N 个值并将它们重播给新订阅者。 A.withLatestFrom(B):当可观察 A 发出时,使用此运算符从可观察 B 获取最新值。将为您提供数组 [a, b] 中的两个值。 A.combineLatest(B):每次 A 或 B 发出时,使用此运算符从 A 和 B 获取最新值。会给你一个数组中的两个值。 shareReplay():通过 ReplaySubject 进行 Observable 多播,但允许您在出错时重试 observable。 (基本上,它为您提供了 promise-y 缓存行为)。 publishReplay()、publishBehavior(initialValue)、multicast(subject: BehaviorSubject | ReplaySubject) 等:利用 BehaviorSubject 和 ReplaySubject 的其他运算符。同一事物的不同风格,它们基本上通过将所有通知汇集到一个主题来多播可观察的源。您需要调用 connect() 来订阅带有主题的源。


你能详细说明为什么使用 getValue() 是一个危险信号吗?如果我只需要一次可观察的当前值,即用户单击按钮的那一刻怎么办?我应该订阅价值并立即取消订阅吗?
有时候没关系。但通常这表明代码的作者正在做一些他们不应该做的非常必要的事情(通常有副作用)。您也可以执行 click$.mergeMap(() => behaviorSubject.take(1)) 来解决您的问题。
很好的解释!实际上,如果您可以保留 Observable 并使用这些方法,这是一个更好的方法。在 typescript 的上下文中,随处使用 Observable 但只有少数定义为 BehaviourSubject,那么它的代码就会不太一致。你提出的方法让我可以在任何地方保留 Observable 类型。
如果您想简单地分派当前值(例如单击时将其发送到服务器),这很好,执行 getValue() 非常方便。但不要在链接可观察运算符时使用它。
我有一个 AuthenticationService,它使用 BehaviourSubject 来存储当前登录状态(boolean truefalse)。它为想要知道状态何时更改的订阅者公开了一个 isLoggedIn$ 可观察对象。它还公开了一个 get isLoggedIn() 属性,该属性通过在底层 BehaviourSubject 上调用 getValue() 来返回当前登录状态 - 我的身份验证保护使用它来检查当前状态。这对我来说似乎是对 getValue() 的合理使用......?
K
Kfir Erez

我有类似的情况,迟到的订阅者在主题值到达后订阅主题。

我发现与 BehaviorSubject 类似的 ReplaySubject 在这种情况下就像一个魅力。这里有一个更好解释的链接:http://reactivex.io/rxjs/manual/overview.html#replaysubject


这在我的 Angular4 应用程序中帮助了我 - 我不得不将订阅从组件构造函数移动到 ngOnInit() (这样的组件在路由之间共享),只是离开这里以防有人遇到类似问题
我在使用 Angular 5 应用程序时遇到问题,我使用服务从 api 获取值并在不同组件中设置变量。我正在使用主题/可观察对象,但在路线更改后它不会推送值。 ReplaySubject 是 Subject 的替代品,解决了所有问题。
确保您使用的是 ReplaySubject(1) 否则新订阅者将按顺序获得每个先前发出的值 - 这在运行时并不总是很明显
@Drenai 据我了解 ReplaySubject(1) 的行为与 BehaviorSubject() 相同
不太一样,最大的区别是 ReplaySubject 在订阅时不会立即发出默认值,如果它的 next() 函数尚未被调用,而 BehaviourSubject 会。例如,当 BehaviourSubject 用作服务中可观察的数据源时,立即发出对于将默认值应用到视图非常方便
s
sean
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

您可以从此处查看有关如何实现它的完整文章。 https://www.imkrish.com/blog/development/simple-way-get-value-from-observable


正是我想要的。谢谢!
不幸的是链接已过期。我被使用 getValue(),这个解决方案解决了我的问题。谢谢!
M
Molp Burnbright

我在子组件中遇到了同样的问题,最初它必须具有 Subject 的当前值,然后订阅 Subject 以监听更改。我只是维护服务中的当前值,以便组件可以访问它,例如:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

需要当前值的组件可以从服务中访问它,即:

sessionStorage.isLoggedIn

不确定这是否是正确的做法:)


如果您需要组件视图中可观察对象的值,则可以使用 async 管道。
S
Simon_Weaver

一个类似的答案被否决了。但我认为我可以证明我在这里针对有限情况提出的建议是合理的。

虽然 observable 确实没有当前值,但它通常会有一个立即可用的值。例如,对于 redux/flux/akita 存储,您可以根据一些可观察的数据从中央存储请求数据,并且该值通常会立即可用。

如果是这种情况,那么当您 subscribe 时,该值将立即返回。

因此,假设您调用了一项服务,并且在完成后您想从您的商店中获取可能不会发出的最新值:

您可以尝试这样做(并且您应该尽可能将东西保留在“管道内”):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

这样做的问题是它会阻塞,直到辅助 observable 发出一个值,这可能永远不会。

我最近发现自己只有在值立即可用时才需要评估 observable,更重要的是我需要能够检测它是否不可用。我最终这样做了:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

请注意,对于上述所有内容,我使用 subscribe 来获取值(正如@Ben 所讨论的那样)。不使用 .value 属性,即使我有 BehaviorSubject


顺便说一句,这是因为默认情况下“计划”使用当前线程。
j
j3ff

虽然听起来可能有点矫枉过正,但这只是保持 Observable 类型并减少样板文件的另一种“可能”解决方案......

你总是可以创建一个扩展 getter 来获取 Observable 的当前值。

为此,您需要在 global.d.ts 类型声明文件中扩展 Observable<T> 接口。然后在 observable.extension.ts 文件中实现 extension getter,最后将类型和扩展文件都包含到您的应用程序中。

您可以参考此 StackOverflow Answer 以了解如何将扩展包含到您的 Angular 应用程序中。

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });

B
Brittank88

有两种方法可以实现这一目标。

BehaviorSubject 有一个方法 getValue() ,您可以在特定时间点获取值。您可以直接使用 BehaviorSubject 订阅,并且可以将订阅的值传递给类成员、字段或属性。

我不会推荐这两种方法。

在第一种方法中,这是一种方便的方法,您可以随时获取值,您可以将其称为该时间点的当前快照。问题是你可以在你的代码中引入竞争条件,你可以在许多不同的地方和不同的时间调用这个方法,这很难调试。

第二种方法是大多数开发人员在订阅时想要原始值时采用的方法,您可以跟踪订阅以及何时准确取消订阅以避免进一步的内存泄漏,如果您真的不顾一切将其绑定到变量,则可以使用此方法并且没有其他方法可以连接它。

我会建议,再次查看您的用例,您在哪里使用它?例如,您想在调用任何 API 时确定用户是否登录,您可以将其与其他 observable 组合:

const data$ = apiRequestCall$().pipe(
 // Latest snapshot from BehaviorSubject.
 withLatestFrom(isLoggedIn),
 // Allow call only if logged in.
 filter(([request, loggedIn]) => loggedIn)
 // Do something else..
);

有了这个,您可以通过管道 data$ | async 将它直接用于 UI,以防有角度。


S
SushiGuy

可以创建订阅,然后在获取第一个发出的项目后,将其销毁。在下面的示例中, pipe() 是一个使用 Observable 作为其输入并返回另一个 Observable 作为其输出的函数,同时不修改第一个 Observable。

使用 Angular 8.1.0"rxjs": "6.5.3""rxjs-observable": "0.0.7" 创建的示例

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }

S
Slawa

您可以将最后发出的值与 Observable 分开存储。然后在需要时阅读。

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);

在可观察对象之外存储一些东西并不是一种被动的方法。相反,您应该在可观察流中流动尽可能多的数据。
y
yaya

最好的方法是使用 Behaviur Subject,下面是一个示例:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5