You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

index.js 7.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. var fs = require('fs');
  2. var util = require('util');
  3. var stream = require('stream');
  4. var Readable = stream.Readable;
  5. var Writable = stream.Writable;
  6. var PassThrough = stream.PassThrough;
  7. var Pend = require('pend');
  8. var EventEmitter = require('events').EventEmitter;
  9. exports.createFromBuffer = createFromBuffer;
  10. exports.createFromFd = createFromFd;
  11. exports.BufferSlicer = BufferSlicer;
  12. exports.FdSlicer = FdSlicer;
  13. util.inherits(FdSlicer, EventEmitter);
  14. function FdSlicer(fd, options) {
  15. options = options || {};
  16. EventEmitter.call(this);
  17. this.fd = fd;
  18. this.pend = new Pend();
  19. this.pend.max = 1;
  20. this.refCount = 0;
  21. this.autoClose = !!options.autoClose;
  22. }
  23. FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  24. var self = this;
  25. self.pend.go(function(cb) {
  26. fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) {
  27. cb();
  28. callback(err, bytesRead, buffer);
  29. });
  30. });
  31. };
  32. FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  33. var self = this;
  34. self.pend.go(function(cb) {
  35. fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) {
  36. cb();
  37. callback(err, written, buffer);
  38. });
  39. });
  40. };
  41. FdSlicer.prototype.createReadStream = function(options) {
  42. return new ReadStream(this, options);
  43. };
  44. FdSlicer.prototype.createWriteStream = function(options) {
  45. return new WriteStream(this, options);
  46. };
  47. FdSlicer.prototype.ref = function() {
  48. this.refCount += 1;
  49. };
  50. FdSlicer.prototype.unref = function() {
  51. var self = this;
  52. self.refCount -= 1;
  53. if (self.refCount > 0) return;
  54. if (self.refCount < 0) throw new Error("invalid unref");
  55. if (self.autoClose) {
  56. fs.close(self.fd, onCloseDone);
  57. }
  58. function onCloseDone(err) {
  59. if (err) {
  60. self.emit('error', err);
  61. } else {
  62. self.emit('close');
  63. }
  64. }
  65. };
  66. util.inherits(ReadStream, Readable);
  67. function ReadStream(context, options) {
  68. options = options || {};
  69. Readable.call(this, options);
  70. this.context = context;
  71. this.context.ref();
  72. this.start = options.start || 0;
  73. this.endOffset = options.end;
  74. this.pos = this.start;
  75. this.destroyed = false;
  76. }
  77. ReadStream.prototype._read = function(n) {
  78. var self = this;
  79. if (self.destroyed) return;
  80. var toRead = Math.min(self._readableState.highWaterMark, n);
  81. if (self.endOffset != null) {
  82. toRead = Math.min(toRead, self.endOffset - self.pos);
  83. }
  84. if (toRead <= 0) {
  85. self.destroyed = true;
  86. self.push(null);
  87. self.context.unref();
  88. return;
  89. }
  90. self.context.pend.go(function(cb) {
  91. if (self.destroyed) return cb();
  92. var buffer = new Buffer(toRead);
  93. fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
  94. if (err) {
  95. self.destroy(err);
  96. } else if (bytesRead === 0) {
  97. self.destroyed = true;
  98. self.push(null);
  99. self.context.unref();
  100. } else {
  101. self.pos += bytesRead;
  102. self.push(buffer.slice(0, bytesRead));
  103. }
  104. cb();
  105. });
  106. });
  107. };
  108. ReadStream.prototype.destroy = function(err) {
  109. if (this.destroyed) return;
  110. err = err || new Error("stream destroyed");
  111. this.destroyed = true;
  112. this.emit('error', err);
  113. this.context.unref();
  114. };
  115. util.inherits(WriteStream, Writable);
  116. function WriteStream(context, options) {
  117. options = options || {};
  118. Writable.call(this, options);
  119. this.context = context;
  120. this.context.ref();
  121. this.start = options.start || 0;
  122. this.endOffset = (options.end == null) ? Infinity : +options.end;
  123. this.bytesWritten = 0;
  124. this.pos = this.start;
  125. this.destroyed = false;
  126. this.on('finish', this.destroy.bind(this));
  127. }
  128. WriteStream.prototype._write = function(buffer, encoding, callback) {
  129. var self = this;
  130. if (self.destroyed) return;
  131. if (self.pos + buffer.length > self.endOffset) {
  132. var err = new Error("maximum file length exceeded");
  133. err.code = 'ETOOBIG';
  134. self.destroy();
  135. callback(err);
  136. return;
  137. }
  138. self.context.pend.go(function(cb) {
  139. if (self.destroyed) return cb();
  140. fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) {
  141. if (err) {
  142. self.destroy();
  143. cb();
  144. callback(err);
  145. } else {
  146. self.bytesWritten += bytes;
  147. self.pos += bytes;
  148. self.emit('progress');
  149. cb();
  150. callback();
  151. }
  152. });
  153. });
  154. };
  155. WriteStream.prototype.destroy = function() {
  156. if (this.destroyed) return;
  157. this.destroyed = true;
  158. this.context.unref();
  159. };
  160. util.inherits(BufferSlicer, EventEmitter);
  161. function BufferSlicer(buffer, options) {
  162. EventEmitter.call(this);
  163. options = options || {};
  164. this.refCount = 0;
  165. this.buffer = buffer;
  166. this.maxChunkSize = options.maxChunkSize || Number.MAX_SAFE_INTEGER;
  167. }
  168. BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  169. var end = position + length;
  170. var delta = end - this.buffer.length;
  171. var written = (delta > 0) ? delta : length;
  172. this.buffer.copy(buffer, offset, position, end);
  173. setImmediate(function() {
  174. callback(null, written);
  175. });
  176. };
  177. BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  178. buffer.copy(this.buffer, position, offset, offset + length);
  179. setImmediate(function() {
  180. callback(null, length, buffer);
  181. });
  182. };
  183. BufferSlicer.prototype.createReadStream = function(options) {
  184. options = options || {};
  185. var readStream = new PassThrough(options);
  186. readStream.destroyed = false;
  187. readStream.start = options.start || 0;
  188. readStream.endOffset = options.end;
  189. // by the time this function returns, we'll be done.
  190. readStream.pos = readStream.endOffset || this.buffer.length;
  191. // respect the maxChunkSize option to slice up the chunk into smaller pieces.
  192. var entireSlice = this.buffer.slice(readStream.start, readStream.pos);
  193. var offset = 0;
  194. while (true) {
  195. var nextOffset = offset + this.maxChunkSize;
  196. if (nextOffset >= entireSlice.length) {
  197. // last chunk
  198. if (offset < entireSlice.length) {
  199. readStream.write(entireSlice.slice(offset, entireSlice.length));
  200. }
  201. break;
  202. }
  203. readStream.write(entireSlice.slice(offset, nextOffset));
  204. offset = nextOffset;
  205. }
  206. readStream.end();
  207. readStream.destroy = function() {
  208. readStream.destroyed = true;
  209. };
  210. return readStream;
  211. };
  212. BufferSlicer.prototype.createWriteStream = function(options) {
  213. var bufferSlicer = this;
  214. options = options || {};
  215. var writeStream = new Writable(options);
  216. writeStream.start = options.start || 0;
  217. writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
  218. writeStream.bytesWritten = 0;
  219. writeStream.pos = writeStream.start;
  220. writeStream.destroyed = false;
  221. writeStream._write = function(buffer, encoding, callback) {
  222. if (writeStream.destroyed) return;
  223. var end = writeStream.pos + buffer.length;
  224. if (end > writeStream.endOffset) {
  225. var err = new Error("maximum file length exceeded");
  226. err.code = 'ETOOBIG';
  227. writeStream.destroyed = true;
  228. callback(err);
  229. return;
  230. }
  231. buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);
  232. writeStream.bytesWritten += buffer.length;
  233. writeStream.pos = end;
  234. writeStream.emit('progress');
  235. callback();
  236. };
  237. writeStream.destroy = function() {
  238. writeStream.destroyed = true;
  239. };
  240. return writeStream;
  241. };
  242. BufferSlicer.prototype.ref = function() {
  243. this.refCount += 1;
  244. };
  245. BufferSlicer.prototype.unref = function() {
  246. this.refCount -= 1;
  247. if (this.refCount < 0) {
  248. throw new Error("invalid unref");
  249. }
  250. };
  251. function createFromBuffer(buffer, options) {
  252. return new BufferSlicer(buffer, options);
  253. }
  254. function createFromFd(fd, options) {
  255. return new FdSlicer(fd, options);
  256. }