Ohm-Management - Projektarbeit B-ME
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ConnectableObservable.js 4.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import { SubjectSubscriber } from '../Subject';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { refCount as higherOrderRefCount } from '../operators/refCount';
  /**
   * Observable whose subscribers are attached to a subject obtained from
   * `subjectFactory`, while `source` itself is only subscribed once
   * `connect()` is called.
   */
  export class ConnectableObservable extends Observable {
    /**
     * @param source Observable subscribed to by `connect()`.
     * @param subjectFactory Zero-argument function returning the subject that
     *   subscribers are attached to.
     */
    constructor(source, subjectFactory) {
      super();
      this.source = source;
      this.subjectFactory = subjectFactory;
      this._refCount = 0;
      this._isComplete = false;
    }

    /**
     * Attaches `subscriber` to the current subject rather than to `source`.
     */
    _subscribe(subscriber) {
      const subject = this.getSubject();
      return subject.subscribe(subscriber);
    }

    /**
     * Returns the cached subject, asking `subjectFactory` for a new one when
     * there is none yet or the cached one reports `isStopped`.
     */
    getSubject() {
      const cached = this._subject;
      const reusable = cached && !cached.isStopped;
      if (!reusable) {
        this._subject = this.subjectFactory();
      }
      return this._subject;
    }

    /**
     * Subscribes `source` to the current subject unless a connection is
     * already stored, and returns the connection subscription.
     *
     * @returns The stored connection, or `Subscription.EMPTY` when the new
     *   connection was already closed by the time the source subscription
     *   returned.
     */
    connect() {
      const existing = this._connection;
      if (existing) {
        return existing;
      }
      this._isComplete = false;
      const fresh = new Subscription();
      // Stored before subscribing so code running during the source
      // subscription can already see (and clear) the connection.
      this._connection = fresh;
      const relay = new ConnectableSubscriber(this.getSubject(), this);
      fresh.add(this.source.subscribe(relay));
      if (fresh.closed) {
        this._connection = null;
        return Subscription.EMPTY;
      }
      this._connection = fresh;
      return fresh;
    }

    /**
     * Applies the imported `refCount` operator to this observable and returns
     * the result.
     */
    refCount() {
      const applyRefCount = higherOrderRefCount();
      return applyRefCount(this);
    }
  }
  const connectableProto = ConnectableObservable.prototype;

  /**
   * Map of property descriptors mirroring the state and methods of
   * `ConnectableObservable`. Each entry has the `{ value, writable }` shape
   * accepted by `Object.create` / `Object.defineProperties`.
   * NOTE(review): presumably used to graft connectable behaviour onto an
   * existing observable — confirm at the call site.
   */
  export const connectableObservableDescriptor = {
    operator: { value: null },
    _refCount: { value: 0, writable: true },
    _subject: { value: null, writable: true },
    _connection: { value: null, writable: true },
    _subscribe: { value: connectableProto._subscribe },
    // `_isComplete` is only ever assigned on instances, so reading it from the
    // prototype always produced `undefined`. Use the same initial value as
    // the constructor instead.
    _isComplete: { value: false, writable: true },
    getSubject: { value: connectableProto.getSubject },
    connect: { value: connectableProto.connect },
    refCount: { value: connectableProto.refCount }
  };
  /**
   * Subscriber placed between the source and the connectable's subject. It
   * resets the owning connectable's shared state when the source errors,
   * completes, or is unsubscribed.
   */
  class ConnectableSubscriber extends SubjectSubscriber {
    /**
     * @param destination Passed through to the `SubjectSubscriber` constructor.
     * @param connectable The connectable whose state this subscriber resets.
     */
    constructor(destination, connectable) {
      super(destination);
      this.connectable = connectable;
    }

    /** Resets the connectable before delegating the error to the parent class. */
    _error(err) {
      this._unsubscribe();
      super._error(err);
    }

    /**
     * Flags the connectable as complete, resets it, then delegates completion
     * to the parent class.
     */
    _complete() {
      this.connectable._isComplete = true;
      this._unsubscribe();
      super._complete();
    }

    /**
     * Detaches from the connectable (once only), clears its ref count, subject
     * and connection, and unsubscribes the connection that was stored on it.
     */
    _unsubscribe() {
      const { connectable: owner } = this;
      if (!owner) {
        return;
      }
      // Dropping the back-reference first makes repeated calls a no-op.
      this.connectable = null;
      const activeConnection = owner._connection;
      owner._refCount = 0;
      owner._subject = null;
      owner._connection = null;
      if (activeConnection) {
        activeConnection.unsubscribe();
      }
    }
  }
  /**
   * Operator object that counts a subscription against a connectable and
   * connects it once the subscriber has been attached.
   */
  class RefCountOperator {
    /**
     * @param connectable The connectable whose `_refCount` is tracked.
     */
    constructor(connectable) {
      this.connectable = connectable;
    }

    /**
     * Increments the connectable's ref count, subscribes `source` through a
     * `RefCountSubscriber`, and calls `connect()` on the connectable unless
     * that subscriber is already closed.
     *
     * @param subscriber Downstream subscriber wrapped by the ref counter.
     * @param source Observable to subscribe to.
     * @returns The subscription returned by `source.subscribe`.
     */
    call(subscriber, source) {
      const shared = this.connectable;
      shared._refCount++;
      const tracker = new RefCountSubscriber(subscriber, shared);
      const subscription = source.subscribe(tracker);
      if (tracker.closed) {
        return subscription;
      }
      tracker.connection = shared.connect();
      return subscription;
    }
  }
  /**
   * Subscriber that decrements the connectable's ref count when it is
   * unsubscribed, and tears down the shared connection when the count it
   * observed was the last one.
   */
  class RefCountSubscriber extends Subscriber {
    /**
     * @param destination Passed through to the `Subscriber` constructor.
     * @param connectable The connectable whose `_refCount` is decremented.
     */
    constructor(destination, connectable) {
      super(destination);
      this.connectable = connectable;
    }

    /**
     * Releases this subscriber's reference. `this.connection` is cleared on
     * every path; the shared connection is unsubscribed only when the count
     * read was not above 1 and the shared connection is the one this
     * subscriber holds (or this subscriber holds none).
     */
    _unsubscribe() {
      const target = this.connectable;
      if (!target) {
        this.connection = null;
        return;
      }
      this.connectable = null;

      const countBefore = target._refCount;
      const alreadyReleased = countBefore <= 0;
      if (!alreadyReleased) {
        target._refCount = countBefore - 1;
      }
      // Nothing to tear down if the count was already zero or others remain.
      if (alreadyReleased || countBefore > 1) {
        this.connection = null;
        return;
      }

      const ownConnection = this.connection;
      const liveConnection = target._connection;
      this.connection = null;
      const matchesOwn = !ownConnection || liveConnection === ownConnection;
      if (liveConnection && matchesOwn) {
        liveConnection.unsubscribe();
      }
    }
  }
  130. //# sourceMappingURL=ConnectableObservable.js.map