Home Manual Reference Source Test Repository
dark theme | light theme

es6/Subscriber.js

  1. import { isFunction } from './util/isFunction';
  2. import { Subscription } from './Subscription';
  3. import { empty as emptyObserver } from './Observer';
  4. import { $$rxSubscriber } from './symbol/rxSubscriber';
  5. /**
  6. * Implements the {@link Observer} interface and extends the
  7. * {@link Subscription} class. While the {@link Observer} is the public API for
  8. * consuming the values of an {@link Observable}, all Observers get converted to
  9. * a Subscriber, in order to provide Subscription-like capabilities such as
  10. * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for
  11. * implementing operators, but it is rarely used as a public API.
  12. *
  13. * @class Subscriber<T>
  14. */
  export var Subscriber = (function (_super) {
      __extends(Subscriber, _super);
      /**
       * Creates a Subscriber and selects its `destination` from the arguments
       * actually passed (the decision is based on `arguments.length`, not on
       * which parameters are undefined).
       *
       * - No argument, or a single falsy argument: the shared empty observer.
       * - A single Subscriber instance: used directly as the destination, and
       *   this subscriber is registered with it through `add`.
       * - A single other object: wrapped in a SafeSubscriber.
       * - Anything else (a lone non-object value, or two or more arguments):
       *   all three arguments are handed to a SafeSubscriber.
       *
       * @param {Observer|function(value: T): void} [destinationOrNext] A partially
       * defined Observer or a `next` callback function.
       * @param {function(e: ?any): void} [error] The `error` callback.
       * @param {function(): void} [complete] The `complete` callback.
       */
      function Subscriber(destinationOrNext, error, complete) {
          _super.call(this);
          this.syncErrorValue = null;
          this.syncErrorThrown = false;
          this.syncErrorThrowable = false;
          this.isStopped = false;
          var argCount = arguments.length;
          var hasSingleArg = argCount === 1;
          if (argCount === 0 || (hasSingleArg && !destinationOrNext)) {
              this.destination = emptyObserver;
          }
          else if (hasSingleArg && typeof destinationOrNext === 'object') {
              if (destinationOrNext instanceof Subscriber) {
                  // Chain directly onto an existing Subscriber.
                  this.destination = destinationOrNext;
                  this.destination.add(this);
              }
              else {
                  this.syncErrorThrowable = true;
                  this.destination = new SafeSubscriber(this, destinationOrNext);
              }
          }
          else {
              this.syncErrorThrowable = true;
              this.destination = new SafeSubscriber(this, destinationOrNext, error, complete);
          }
      }
      // Accessor stored under the rxSubscriber symbol key; returns the instance itself.
      Subscriber.prototype[$$rxSubscriber] = function () { return this; };
      /**
       * Static factory: constructs a Subscriber from the given callbacks and then
       * resets its `syncErrorThrowable` flag to `false`.
       * @param {function(x: ?T): void} [next] The `next` callback.
       * @param {function(e: ?any): void} [error] The `error` callback.
       * @param {function(): void} [complete] The `complete` callback.
       * @return {Subscriber<T>} The newly constructed Subscriber.
       */
      Subscriber.create = function (next, error, complete) {
          var created = new Subscriber(next, error, complete);
          created.syncErrorThrowable = false;
          return created;
      };
      /**
       * Delivers a value to `_next` unless this subscriber has been stopped.
       * @param {T} [value] The value to deliver.
       * @return {void}
       */
      Subscriber.prototype.next = function (value) {
          if (this.isStopped) {
              return;
          }
          this._next(value);
      };
      /**
       * Marks this subscriber as stopped and delivers the error to `_error`.
       * Does nothing when the subscriber is already stopped.
       * @param {any} [err] The error to deliver.
       * @return {void}
       */
      Subscriber.prototype.error = function (err) {
          if (this.isStopped) {
              return;
          }
          this.isStopped = true;
          this._error(err);
      };
      /**
       * Marks this subscriber as stopped and signals completion through
       * `_complete`. Does nothing when the subscriber is already stopped.
       * @return {void}
       */
      Subscriber.prototype.complete = function () {
          if (this.isStopped) {
              return;
          }
          this.isStopped = true;
          this._complete();
      };
      /**
       * Stops this subscriber and delegates to the parent class's `unsubscribe`,
       * unless `closed` is already truthy.
       * @return {void}
       */
      Subscriber.prototype.unsubscribe = function () {
          if (!this.closed) {
              this.isStopped = true;
              _super.prototype.unsubscribe.call(this);
          }
      };
      // Default handlers: forward to the destination; the terminal ones
      // (error / complete) also unsubscribe afterwards.
      Subscriber.prototype._next = function (value) {
          this.destination.next(value);
      };
      Subscriber.prototype._error = function (err) {
          this.destination.error(err);
          this.unsubscribe();
      };
      Subscriber.prototype._complete = function () {
          this.destination.complete();
          this.unsubscribe();
      };
      return Subscriber;
  }(Subscription));
  131. /**
  132. * We need this JSDoc comment for affecting ESDoc.
  133. * @ignore
  134. * @extends {Ignored}
  135. */
  var SafeSubscriber = (function (_super) {
      __extends(SafeSubscriber, _super);
      /**
       * Wraps user-supplied callbacks, or a partial observer object, so that
       * every invocation goes through a try/catch. Depending on the parent's
       * `syncErrorThrowable` flag, a caught error is either rethrown after
       * unsubscribing or recorded on the parent (`syncErrorValue` /
       * `syncErrorThrown`).
       *
       * @param {Subscriber} _parent The subscriber that owns this wrapper; it is
       * unsubscribed when this wrapper is unsubscribed.
       * @param {Object|function(value: any): void} [observerOrNext] Either the
       * `next` callback or an object carrying `next` / `error` / `complete`.
       * @param {function(e: ?any): void} [error] The `error` callback; ignored
       * when `observerOrNext` is an object.
       * @param {function(): void} [complete] The `complete` callback; ignored
       * when `observerOrNext` is an object.
       */
      function SafeSubscriber(_parent, observerOrNext, error, complete) {
          _super.call(this);
          this._parent = _parent;
          var next;
          var context = this;
          if (isFunction(observerOrNext)) {
              next = observerOrNext;
          }
          else if (observerOrNext) {
              next = observerOrNext.next;
              error = observerOrNext.error;
              complete = observerOrNext.complete;
              // Use an object that inherits from the observer as the callback
              // context instead of the observer itself. The observer's own
              // properties stay reachable through the prototype chain, but the
              // `unsubscribe` attached below lands on this proxy, so the
              // caller's object is never mutated. Mutating it would make a
              // second subscription with the same observer pick up the first
              // subscription's `unsubscribe` as a teardown.
              context = Object.create(observerOrNext);
              if (isFunction(context.unsubscribe)) {
                  // Run the observer's own `unsubscribe` as a teardown.
                  this.add(context.unsubscribe.bind(context));
              }
              context.unsubscribe = this.unsubscribe.bind(this);
          }
          this._context = context;
          // Own properties holding the raw callbacks (any of them may be undefined).
          this._next = next;
          this._error = error;
          this._complete = complete;
      }
      /**
       * Invokes the `next` callback, if one exists and this subscriber is not
       * stopped. When the callback throws, the error is either rethrown
       * (parent not `syncErrorThrowable`) or stored on the parent; in both
       * cases this subscriber ends up unsubscribed.
       * @param {any} [value] The value to deliver.
       * @return {void}
       */
      SafeSubscriber.prototype.next = function (value) {
          if (!this.isStopped && this._next) {
              var _parent = this._parent;
              if (!_parent.syncErrorThrowable) {
                  this.__tryOrUnsub(this._next, value);
              }
              else if (this.__tryOrSetError(_parent, this._next, value)) {
                  this.unsubscribe();
              }
          }
      };
      /**
       * Delivers an error and then unsubscribes. Without an `error` callback
       * the error is rethrown when the parent is not `syncErrorThrowable`;
       * otherwise it is recorded on the parent.
       * @param {any} [err] The error to deliver.
       * @return {void}
       */
      SafeSubscriber.prototype.error = function (err) {
          if (!this.isStopped) {
              var _parent = this._parent;
              if (this._error) {
                  if (!_parent.syncErrorThrowable) {
                      this.__tryOrUnsub(this._error, err);
                      this.unsubscribe();
                  }
                  else {
                      this.__tryOrSetError(_parent, this._error, err);
                      this.unsubscribe();
                  }
              }
              else if (!_parent.syncErrorThrowable) {
                  // Unsubscribe first: the throw below leaves this function.
                  this.unsubscribe();
                  throw err;
              }
              else {
                  _parent.syncErrorValue = err;
                  _parent.syncErrorThrown = true;
                  this.unsubscribe();
              }
          }
      };
      /**
       * Invokes the `complete` callback, if any, and then unsubscribes.
       * @return {void}
       */
      SafeSubscriber.prototype.complete = function () {
          if (!this.isStopped) {
              var _parent = this._parent;
              if (this._complete) {
                  if (!_parent.syncErrorThrowable) {
                      this.__tryOrUnsub(this._complete);
                      this.unsubscribe();
                  }
                  else {
                      this.__tryOrSetError(_parent, this._complete);
                      this.unsubscribe();
                  }
              }
              else {
                  this.unsubscribe();
              }
          }
      };
      /**
       * Calls `fn` with the stored context; if it throws, unsubscribes and
       * rethrows the same error.
       * @param {function} fn The callback to invoke.
       * @param {any} [value] The single argument passed to `fn`.
       * @return {void}
       */
      SafeSubscriber.prototype.__tryOrUnsub = function (fn, value) {
          try {
              fn.call(this._context, value);
          }
          catch (err) {
              this.unsubscribe();
              throw err;
          }
      };
      /**
       * Calls `fn` with the stored context; if it throws, records the error on
       * `parent` instead of rethrowing.
       * @param {Subscriber} parent Receives `syncErrorValue` / `syncErrorThrown`.
       * @param {function} fn The callback to invoke.
       * @param {any} [value] The single argument passed to `fn`.
       * @return {boolean} `true` when `fn` threw, `false` otherwise.
       */
      SafeSubscriber.prototype.__tryOrSetError = function (parent, fn, value) {
          try {
              fn.call(this._context, value);
          }
          catch (err) {
              parent.syncErrorValue = err;
              parent.syncErrorThrown = true;
              return true;
          }
          return false;
      };
      /**
       * Teardown hook: drops the references to the context and the parent,
       * then unsubscribes the parent.
       * @return {void}
       */
      SafeSubscriber.prototype._unsubscribe = function () {
          var _parent = this._parent;
          this._context = null;
          this._parent = null;
          _parent.unsubscribe();
      };
      return SafeSubscriber;
  }(Subscriber));
  242. //# sourceMappingURL=Subscriber.js.map