Ohm-Management - Projektarbeit B-ME
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

_stream_writable.js 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. // A bit simpler than readable streams.
  2. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  3. // the drain event emission and buffering.
  4. 'use strict';
  5. module.exports = Writable;
  6. /*<replacement>*/
  7. var processNextTick = require('process-nextick-args');
  8. /*</replacement>*/
  9. /*<replacement>*/
  10. var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
  11. /*</replacement>*/
  12. /*<replacement>*/
  13. var Duplex;
  14. /*</replacement>*/
  15. Writable.WritableState = WritableState;
  16. /*<replacement>*/
  17. var util = require('core-util-is');
  18. util.inherits = require('inherits');
  19. /*</replacement>*/
  20. /*<replacement>*/
  21. var internalUtil = {
  22. deprecate: require('util-deprecate')
  23. };
  24. /*</replacement>*/
  25. /*<replacement>*/
  26. var Stream;
  27. (function () {
  28. try {
  29. Stream = require('st' + 'ream');
  30. } catch (_) {} finally {
  31. if (!Stream) Stream = require('events').EventEmitter;
  32. }
  33. })();
  34. /*</replacement>*/
  35. var Buffer = require('buffer').Buffer;
  36. /*<replacement>*/
  37. var bufferShim = require('buffer-shims');
  38. /*</replacement>*/
  39. util.inherits(Writable, Stream);
  /**
   * Does nothing. Used as the write callback when the caller passes none.
   */
  function nop() {
    // intentionally empty
  }
  /**
   * A single queued write. Instances are chained through `next` to form the
   * list of buffered requests.
   *
   * @param {*} chunk - the data to write
   * @param {string} encoding - encoding recorded for the chunk
   * @param {Function} cb - stored as `callback`
   */
  function WriteReq(chunk, encoding, cb) {
    var request = this;
    request.chunk = chunk;
    request.encoding = encoding;
    request.callback = cb;
    // Filled in when another request is queued behind this one.
    request.next = null;
  }
  /**
   * Bookkeeping object attached to every writable stream.
   *
   * @param {Object} [options] - recognised keys: objectMode,
   *   writableObjectMode (Duplex only), highWaterMark, decodeStrings,
   *   defaultEncoding
   * @param {Object} stream - the stream that owns this state
   */
  function WritableState(options, stream) {
    // Loaded lazily; the Duplex module is only required on first use.
    Duplex = Duplex || require('./_stream_duplex');
    options = options || {};

    // Object mode: chunks are arbitrary values rather than strings/buffers.
    // A Duplex may enable it for its writable side only.
    var isObjectMode = !!options.objectMode;
    if (stream instanceof Duplex) {
      isObjectMode = isObjectMode || !!options.writableObjectMode;
    }
    this.objectMode = isObjectMode;

    // Threshold at which write() starts returning false. An explicit 0 is
    // honoured (write() then always returns false unless the buffer is
    // flushed immediately); any other falsy value selects the default.
    var hwm = options.highWaterMark;
    var defaultHwm = this.objectMode ? 16 : 16 * 1024;
    var chosenHwm = hwm || hwm === 0 ? hwm : defaultHwm;
    // Truncate to an integer.
    this.highWaterMark = ~~chosenHwm;

    // Set when a 'drain' event is owed to the consumer.
    this.needDrain = false;
    // Set as soon as end() starts.
    this.ending = false;
    // Set once end() has been called and returned.
    this.ended = false;
    // Set when 'finish' is emitted.
    this.finished = false;

    // Strings are turned into buffers before _write() unless the caller
    // passes decodeStrings: false explicitly.
    this.decodeStrings = options.decodeStrings !== false;

    // Encoding applied to string chunks written without one.
    this.defaultEncoding = options.defaultEncoding || 'utf8';

    // Amount of data accepted but not yet acknowledged by _write(); this is
    // a measurement, not an actual buffer.
    this.length = 0;

    // True while a _write()/_writev() call is outstanding.
    this.writing = false;

    // Cork depth; while non-zero every write is buffered.
    this.corked = 0;

    // Tells onwrite whether the _write callback fired in the same tick as
    // the call. Starts true because nothing that must wait until "later"
    // should run before the first write either.
    this.sync = true;

    // True while buffered requests are being replayed, so that a same-tick
    // _write callback does not start a second, overlapping replay.
    this.bufferProcessing = false;

    // Callback handed to _write()/_writev().
    this.onwrite = function (er) {
      onwrite(stream, er);
    };

    // User callback of the write currently in flight.
    this.writecb = null;

    // Size of the write currently in flight.
    this.writelen = 0;

    // Head and tail of the list of buffered write requests.
    this.bufferedRequest = null;
    this.lastBufferedRequest = null;

    // Outstanding user write callbacks; must reach 0 before 'finish'.
    this.pendingcb = 0;

    // Set once 'prefinish' has been emitted.
    this.prefinished = false;

    // Set once a write error has been emitted.
    this.errorEmitted = false;

    // Number of buffered requests.
    this.bufferedRequestCount = 0;

    // One CorkedRequest is always kept allocated and ready for use.
    this.corkedRequestsFree = new CorkedRequest(this);
  }
  /**
   * Collects the buffered write requests into an array, head first.
   *
   * @returns {Array} the requests reachable from `bufferedRequest` via `next`
   */
  WritableState.prototype.getBuffer = function getBuffer() {
    var requests = [];
    for (var node = this.bufferedRequest; node; node = node.next) {
      requests.push(node);
    }
    return requests;
  };
  // Expose the legacy `_writableState.buffer` accessor. The getter is wrapped
  // by internalUtil.deprecate and forwards to getBuffer(). Any failure while
  // installing it is deliberately ignored.
  (function () {
    try {
      var legacyGetter = internalUtil.deprecate(function () {
        return this.getBuffer();
      }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.');
      Object.defineProperty(WritableState.prototype, 'buffer', {
        get: legacyGetter
      });
    } catch (_) {}
  })();
  // `instanceof Writable` must also accept Duplex streams, whose prototype
  // chain only points to Readable. Where Symbol.hasInstance is supported the
  // check is customised to look for a WritableState in `_writableState`;
  // `realHasInstance` always keeps the plain, uncustomised check.
  var realHasInstance;
  if (!(typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function')) {
    // No Symbol.hasInstance support: fall back to ordinary instanceof.
    realHasInstance = function (object) {
      return object instanceof this;
    };
  } else {
    realHasInstance = Function.prototype[Symbol.hasInstance];
    Object.defineProperty(Writable, Symbol.hasInstance, {
      value: function (object) {
        if (realHasInstance.call(this, object)) return true;
        return object && object._writableState instanceof WritableState;
      }
    });
  }
  /**
   * Writable stream constructor. May be called without `new`.
   *
   * @param {Object} [options] - passed to WritableState; `write` and `writev`
   *   functions, when given, become this instance's `_write` / `_writev`
   */
  function Writable(options) {
    Duplex = Duplex || require('./_stream_duplex');

    // This constructor is also applied to Duplex instances. The plain
    // (uncustomised) instance check is used here: the custom `instanceof`
    // would return false because `_writableState` is not attached yet, and it
    // would break the Node.js LazyTransform implementation, whose non-trivial
    // `_writableState` getter would recurse infinitely.
    var isPlainInstance = realHasInstance.call(Writable, this);
    if (!isPlainInstance && !(this instanceof Duplex)) {
      return new Writable(options);
    }

    this._writableState = new WritableState(options, this);

    // Legacy flag.
    this.writable = true;

    if (options) {
      if (typeof options.write === 'function') {
        this._write = options.write;
      }
      if (typeof options.writev === 'function') {
        this._writev = options.writev;
      }
    }

    Stream.call(this);
  }
  // A writable stream has nothing to read from, so piping it is a mistake;
  // report that as an 'error' event.
  Writable.prototype.pipe = function () {
    var notReadable = new Error('Cannot pipe, not readable');
    this.emit('error', notReadable);
  };
  /**
   * Reports a write attempted on a stream that has already ended.
   * The 'error' event is emitted immediately; the callback is deferred.
   *
   * @param {Object} stream - stream to emit 'error' on
   * @param {Function} cb - write callback, invoked later with the error
   */
  function writeAfterEnd(stream, cb) {
    var failure = new Error('write after end');
    // TODO: defer error events consistently everywhere, not just the cb
    stream.emit('error', failure);
    processNextTick(cb, failure);
  }
  /**
   * Checks that a user-supplied, non-Buffer chunk is acceptable for the mode
   * the stream is in. `null` is never accepted; `undefined` and other
   * non-string values are accepted only in object mode.
   *
   * On rejection a TypeError is emitted as 'error' on the stream and passed
   * to `cb` on a later tick.
   *
   * @param {Object} stream - stream on which to emit 'error'
   * @param {Object} state - writable state; only `objectMode` is read
   * @param {*} chunk - candidate chunk (Buffers are not accepted here)
   * @param {Function} cb - write callback, invoked with the error on rejection
   * @returns {boolean} true when the chunk may be written
   */
  function validChunk(stream, state, chunk, cb) {
    var er = null;
    if (chunk === null) {
      er = new TypeError('May not write null values to stream');
    } else if (typeof chunk !== 'string' && !state.objectMode) {
      // `undefined` is rejected here too. Letting it through outside object
      // mode would only fail later, when the chunk's length is read.
      er = new TypeError('Invalid non-string/buffer chunk');
    }
    if (!er) return true;
    stream.emit('error', er);
    processNextTick(cb, er);
    return false;
  }
  /**
   * Queues or dispatches one chunk.
   *
   * @param {*} chunk - data to write
   * @param {string|Function} [encoding] - encoding for string chunks, or the
   *   callback when no encoding is given
   * @param {Function} [cb] - called once the chunk has been handled
   * @returns {boolean} false when the chunk was rejected or the buffered
   *   amount has reached the high-water mark, true otherwise
   */
  Writable.prototype.write = function (chunk, encoding, cb) {
    var state = this._writableState;
    var isBuf = Buffer.isBuffer(chunk);

    // write(chunk, cb) form.
    if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    if (isBuf) {
      encoding = 'buffer';
    } else if (!encoding) {
      encoding = state.defaultEncoding;
    }

    if (typeof cb !== 'function') {
      cb = nop;
    }

    if (state.ended) {
      writeAfterEnd(this, cb);
      return false;
    }

    // Buffers skip validation; anything else must pass validChunk().
    if (!isBuf && !validChunk(this, state, chunk, cb)) {
      return false;
    }

    state.pendingcb++;
    return writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
  };
  /**
   * Increases the cork depth by one; writes are buffered while it is non-zero.
   */
  Writable.prototype.cork = function () {
    this._writableState.corked++;
  };
  /**
   * Decreases the cork depth by one. When it reaches zero and the stream is
   * otherwise idle, the buffered requests are flushed.
   */
  Writable.prototype.uncork = function () {
    var state = this._writableState;
    if (!state.corked) return;

    state.corked--;

    var idle = !state.writing && !state.corked && !state.finished && !state.bufferProcessing;
    if (idle && state.bufferedRequest) {
      clearBuffer(this, state);
    }
  };
  /**
   * Sets the encoding applied to string chunks written without one.
   *
   * @param {string} encoding - encoding name; strings are lower-cased first
   * @returns {Writable} this stream
   * @throws {TypeError} when the encoding is not one of the known names
   */
  Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
    // node::ParseEncoding() requires lower case.
    if (typeof encoding === 'string') {
      encoding = encoding.toLowerCase();
    }

    var knownEncodings = ['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'];
    var candidate = (encoding + '').toLowerCase();
    if (knownEncodings.indexOf(candidate) === -1) {
      throw new TypeError('Unknown encoding: ' + encoding);
    }

    this._writableState.defaultEncoding = encoding;
    return this;
  };
  /**
   * Converts a string chunk to a buffer unless the stream is in object mode
   * or decodeStrings is explicitly false. Anything else is returned as-is.
   *
   * @param {Object} state - writable state (objectMode, decodeStrings)
   * @param {*} chunk - chunk being written
   * @param {string} encoding - encoding used for the conversion
   * @returns {*} the converted buffer, or the original chunk
   */
  function decodeChunk(state, chunk, encoding) {
    var shouldDecode = !state.objectMode && state.decodeStrings !== false && typeof chunk === 'string';
    return shouldDecode ? bufferShim.from(chunk, encoding) : chunk;
  }
  /**
   * Dispatches a chunk to _write() when the stream is idle, otherwise appends
   * it to the list of buffered requests.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @param {boolean} isBuf - whether `chunk` is already a Buffer
   * @param {*} chunk - data to write
   * @param {string} encoding - encoding of `chunk`
   * @param {Function} cb - write callback
   * @returns {boolean} false when the buffered amount has reached the
   *   high-water mark, meaning a 'drain' event is owed
   */
  function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
    if (!isBuf) {
      chunk = decodeChunk(state, chunk, encoding);
      if (Buffer.isBuffer(chunk)) {
        encoding = 'buffer';
      }
    }

    // In object mode every chunk counts as one unit.
    var len = state.objectMode ? 1 : chunk.length;
    state.length += len;

    var belowMark = state.length < state.highWaterMark;
    // Only ever raise the flag here, so an earlier pending drain is not lost.
    if (!belowMark) {
      state.needDrain = true;
    }

    if (!state.writing && !state.corked) {
      doWrite(stream, state, false, len, chunk, encoding, cb);
      return belowMark;
    }

    // A write is in flight or the stream is corked: queue at the tail.
    var request = new WriteReq(chunk, encoding, cb);
    var previousTail = state.lastBufferedRequest;
    state.lastBufferedRequest = request;
    if (previousTail) {
      previousTail.next = request;
    } else {
      state.bufferedRequest = request;
    }
    state.bufferedRequestCount += 1;

    return belowMark;
  }
  /**
   * Hands data to the stream's _write() or _writev() implementation and
   * records the in-flight write in `state`.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @param {boolean} writev - true to call _writev(), false to call _write()
   * @param {number} len - amount being written, stored as `writelen`
   * @param {*} chunk - a single chunk, or the batch passed to _writev()
   * @param {string} encoding - encoding passed to _write()
   * @param {Function} cb - user callback, stored as `writecb`
   */
  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
    state.writelen = len;
    state.writecb = cb;
    state.writing = true;

    // `sync` stays true for the duration of the call, so a completion
    // callback can tell whether it fired before the call returned.
    state.sync = true;
    if (writev) {
      stream._writev(chunk, state.onwrite);
    } else {
      stream._write(chunk, encoding, state.onwrite);
    }
    state.sync = false;
  }
  /**
   * Handles a failed write: settles the user callback with the error, then
   * marks the error as emitted and emits it on the stream.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @param {boolean} sync - whether the write completed synchronously; if so
   *   the callback is deferred instead of being called directly
   * @param {*} er - the error reported by the write
   * @param {Function} cb - user write callback
   */
  function onwriteError(stream, state, sync, er, cb) {
    --state.pendingcb;

    if (sync) {
      processNextTick(cb, er);
    } else {
      cb(er);
    }

    stream._writableState.errorEmitted = true;
    stream.emit('error', er);
  }
  /**
   * Clears the in-flight write from `state` and subtracts its size from the
   * buffered length.
   *
   * @param {Object} state - writable state to update
   */
  function onwriteStateUpdate(state) {
    // Account for the completed write before forgetting its size.
    state.length -= state.writelen;
    state.writelen = 0;

    state.writing = false;
    state.writecb = null;
  }
  /**
   * Completion handler for _write()/_writev(), reached through state.onwrite.
   *
   * @param {Object} stream - the writable stream
   * @param {*} [er] - error reported by the write, if any
   */
  function onwrite(stream, er) {
    var state = stream._writableState;
    // Capture these first; onwriteStateUpdate() clears writecb.
    var sync = state.sync;
    var cb = state.writecb;

    onwriteStateUpdate(state);

    if (er) {
      onwriteError(stream, state, sync, er, cb);
      return;
    }

    // Work out whether the stream is ready to finish, without emitting yet.
    var finished = needFinish(state);

    var canReplay = !finished && !state.corked && !state.bufferProcessing && state.bufferedRequest;
    if (canReplay) {
      clearBuffer(stream, state);
    }

    if (sync) {
      // Completed synchronously: defer the callback and events.
      /*<replacement>*/
      asyncWrite(afterWrite, stream, state, finished, cb);
      /*</replacement>*/
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
  /**
   * Runs after a successful write: possibly emits 'drain', settles the user
   * callback, then checks whether the stream can finish.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @param {boolean} finished - whether the stream was ready to finish when
   *   the write completed; 'drain' is skipped in that case
   * @param {Function} cb - user write callback, called with no arguments
   */
  function afterWrite(stream, state, finished, cb) {
    if (!finished) {
      onwriteDrain(stream, state);
    }
    state.pendingcb--;
    cb();
    finishMaybe(stream, state);
  }
  /**
   * Emits 'drain' when nothing is left buffered and a drain is owed.
   *
   * This must not run before the write() consumer has received its `false`
   * return value and had a chance to attach a 'drain' listener, which is why
   * the work following a synchronous write is deferred to a later tick.
   *
   * @param {Object} stream - stream to emit 'drain' on
   * @param {Object} state - its writable state (length, needDrain)
   */
  function onwriteDrain(stream, state) {
    if (state.length !== 0 || !state.needDrain) return;
    state.needDrain = false;
    stream.emit('drain');
  }
  /**
   * Replays buffered write requests.
   *
   * When the stream implements _writev() and more than one request is queued,
   * the whole queue is handed over in a single batch. Otherwise requests are
   * written one at a time until one of them is still in flight after the
   * call, at which point the remainder stays queued.
   *
   * `bufferedRequestCount` is kept in step with the queue: it is zeroed after
   * a batch and decremented once per request dispatched individually, so it
   * stays accurate when the one-by-one path stops early.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   */
  function clearBuffer(stream, state) {
    state.bufferProcessing = true;
    var entry = state.bufferedRequest;

    if (stream._writev && entry && entry.next) {
      // Fast case, write everything using _writev()
      var l = state.bufferedRequestCount;
      var buffer = new Array(l);
      var holder = state.corkedRequestsFree;
      // The holder keeps the head of the chain so its finish() callback can
      // walk every request once the batch completes.
      holder.entry = entry;

      var count = 0;
      while (entry) {
        buffer[count] = entry;
        entry = entry.next;
        count += 1;
      }

      doWrite(stream, state, true, state.length, buffer, '', holder.finish);

      // doWrite is almost always async, defer these to save a bit of time
      // as the hot path ends with doWrite
      state.pendingcb++;
      state.lastBufferedRequest = null;
      if (holder.next) {
        state.corkedRequestsFree = holder.next;
        holder.next = null;
      } else {
        state.corkedRequestsFree = new CorkedRequest(state);
      }
      // The whole queue went out in the batch.
      state.bufferedRequestCount = 0;
    } else {
      // Slow case, write chunks one-by-one
      while (entry) {
        var chunk = entry.chunk;
        var encoding = entry.encoding;
        var cb = entry.callback;
        var len = state.objectMode ? 1 : chunk.length;

        doWrite(stream, state, false, len, chunk, encoding, cb);
        entry = entry.next;
        // This request has left the queue, whether or not it completed.
        state.bufferedRequestCount--;

        // If the write is still in flight its completion handler has not run
        // yet; stop here and leave the rest queued.
        if (state.writing) {
          break;
        }
      }

      if (entry === null) state.lastBufferedRequest = null;
    }

    state.bufferedRequest = entry;
    state.bufferProcessing = false;
  }
  /**
   * Default _write(): always fails. A real implementation is expected to be
   * provided by a subclass or through the `write` constructor option.
   *
   * @param {*} chunk - ignored
   * @param {string} encoding - ignored
   * @param {Function} cb - called with the "not implemented" error
   */
  Writable.prototype._write = function (chunk, encoding, cb) {
    var notImplemented = new Error('_write() is not implemented');
    cb(notImplemented);
  };
  // No batched write by default.
  Writable.prototype._writev = null;
  /**
   * Ends the stream, optionally writing a final chunk first.
   * Accepts end(), end(cb), end(chunk), end(chunk, cb),
   * end(chunk, encoding) and end(chunk, encoding, cb).
   *
   * @param {*} [chunk] - final chunk; skipped when null or undefined
   * @param {string|Function} [encoding] - encoding of `chunk`, or the callback
   * @param {Function} [cb] - invoked once the stream has finished
   */
  Writable.prototype.end = function (chunk, encoding, cb) {
    var state = this._writableState;

    // Shift arguments for the shorter call forms.
    if (typeof chunk === 'function') {
      cb = chunk;
      chunk = null;
      encoding = null;
    } else if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    var hasChunk = chunk !== null && chunk !== undefined;
    if (hasChunk) {
      this.write(chunk, encoding);
    }

    // .end() fully uncorks: collapse the depth to one and release it.
    if (state.corked) {
      state.corked = 1;
      this.uncork();
    }

    // ignore unnecessary end() calls.
    var alreadyEnding = state.ending || state.finished;
    if (!alreadyEnding) {
      endWritable(this, state, cb);
    }
  };
  /**
   * Tells whether the stream is ready to finish: end() has started, nothing
   * is buffered or in flight, and 'finish' has not been emitted yet.
   *
   * @param {Object} state - writable state
   * @returns {boolean} truthy when the stream can finish
   */
  function needFinish(state) {
    if (!state.ending) return state.ending;
    return state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
  }
  /**
   * Emits 'prefinish' at most once per stream.
   *
   * @param {Object} stream - stream to emit on
   * @param {Object} state - its writable state; `prefinished` guards repeats
   */
  function prefinish(stream, state) {
    if (state.prefinished) return;
    state.prefinished = true;
    stream.emit('prefinish');
  }
  /**
   * Emits 'prefinish' and, when no write callbacks are outstanding, 'finish',
   * provided the stream is ready to finish.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @returns {boolean} the result of needFinish(state)
   */
  function finishMaybe(stream, state) {
    var need = needFinish(state);
    if (!need) return need;

    // Sample the counter before 'prefinish' listeners get a chance to run.
    var noPendingCallbacks = state.pendingcb === 0;
    prefinish(stream, state);
    if (noPendingCallbacks) {
      state.finished = true;
      stream.emit('finish');
    }
    return need;
  }
  /**
   * Marks the stream as ending, finishes it if possible and arranges for the
   * end() callback to run once it has finished.
   *
   * @param {Object} stream - the writable stream
   * @param {Object} state - its writable state
   * @param {Function} [cb] - deferred if the stream already finished,
   *   otherwise registered as a one-time 'finish' listener
   */
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);

    if (cb) {
      if (state.finished) {
        processNextTick(cb);
      } else {
        stream.once('finish', cb);
      }
    }

    state.ended = true;
    stream.writable = false;
  }
  /**
   * Holder for one batched (_writev) write. Its `next` field makes it look
   * like a linked list, but it is not one: at most two holders exist per
   * stream.
   *
   * `finish(err)` calls the callback of every request in the batch,
   * decrementing `state.pendingcb` for each, then makes this holder
   * available for reuse.
   *
   * @param {Object} state - writable state that owns this holder
   */
  function CorkedRequest(state) {
    var self = this;
    self.next = null;
    self.entry = null;
    self.finish = function (err) {
      var node = self.entry;
      self.entry = null;

      for (; node; node = node.next) {
        var done = node.callback;
        state.pendingcb--;
        done(err);
      }

      // Hang this holder behind the free one, or become the free one.
      var free = state.corkedRequestsFree;
      if (free) {
        free.next = self;
      } else {
        state.corkedRequestsFree = self;
      }
    };
  }