Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'duplexify' in functional components in JavaScript. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
onCompleted: sinon.stub()
}
}
this.mocks.QueryExecutor = sinon.stub().returns(this.mocks.queryExecutor)
this.mocks.Responder = sinon.stub().returns(this.mocks.responder)
this.mocks.SubscribeCallbacks = sinon.stub().returns(this.mocks.subscribeCallbacks)
// proxyquire
this.DataHandler = proxyquire('../src/server/data-handler.js', {
'./active-subscriptions.js': this.mocks.activeSubscriptions,
'./query-executor.js': this.mocks.QueryExecutor,
'./responder.js': this.mocks.Responder,
'./subscribe-callbacks.js': this.mocks.SubscribeCallbacks
})
var toServer = through2.obj()
var toClient = through2.obj()
this.primusClient = duplexify.obj(toServer, toClient)
this.spark = duplexify.obj(toClient, toServer)
this.spark.id = 'sparkId'
})
emitError: true
}, options);
const forkStream = new ForkStream({
classifier: (file, cb) => cb(null, !!condition(file))
});
forkStream.a.pipe(conditionStream);
// merge-stream package cannot be updated because it emits the error
// from conditionStream to mergedStream
const mergedStream = mergeStream(forkStream.b, conditionStream);
const outStream = through2.obj();
mergedStream.pipe(outStream);
const duplexStream = duplexify.obj(forkStream, outStream);
if (options.emitError) {
conditionStream.on('error', err => duplexStream.emit('error', err));
}
return duplexStream;
}
};
? new NodeDefaultCryptographicMaterialsManager(cmm)
: cmm
const parseHeaderStream = new ParseHeaderStream(cmm)
const verifyStream = new VerifyStream({ maxBodySize })
const decipherStream = getDecipherStream()
/* pipeline will _either_ stream.destroy or the callback.
* decipherStream uses destroy to dispose the material.
* So I tack a pass though stream onto the end.
*/
pipeline(parseHeaderStream, verifyStream, decipherStream, new PassThrough(), (err: Error) => {
if (err) stream.emit('error', err)
})
const stream = new Duplexify(parseHeaderStream, decipherStream)
// Forward header events
parseHeaderStream
.once('MessageHeader', header => stream.emit('MessageHeader', header))
return stream
}
const getOrSetStream = (key, getStreamFn, opts = {}) => {
const proxy = duplexify()
const onError = (err) => {
proxy.destroy(err)
}
const getKey = opts.refresh
? () => Promise.reject(new KeyNotExistsError(key))
: () => get(key, { stream: true })
getKey()
.then((rs) => {
proxy.setReadable(rs)
// since already cached, we can fire 'finish' event
proxy.emit('finish')
})
.catch((err) => {
if (err instanceof KeyNotExistsError) {
cmm: KeyringNode|NodeMaterialsManager,
op: EncryptStreamInput = {}
): Duplex {
const { suiteId, encryptionContext = {}, frameLength = FRAME_LENGTH, plaintextLength } = op
/* Precondition: The frameLength must be less than the maximum frame size Node.js stream. */
needs(frameLength > 0 && Maximum.FRAME_SIZE >= frameLength, `frameLength out of bounds: 0 > frameLength >= ${Maximum.FRAME_SIZE}`)
/* If the cmm is a Keyring, wrap it with NodeDefaultCryptographicMaterialsManager. */
cmm = cmm instanceof KeyringNode
? new NodeDefaultCryptographicMaterialsManager(cmm)
: cmm
const suite = suiteId && new NodeAlgorithmSuite(suiteId)
const wrappingStream = new Duplexify()
cmm.getEncryptionMaterials({ suite, encryptionContext, plaintextLength })
.then(async (material) => {
const { dispose, getSigner } = getEncryptHelper(material)
const { getCipher, messageHeader, rawHeader } = getEncryptionInfo(material, frameLength)
wrappingStream.emit('MessageHeader', messageHeader)
const encryptStream = getFramedEncryptStream(getCipher, messageHeader, dispose, plaintextLength)
const signatureStream = new SignatureStream(getSigner)
pipeline(encryptStream, signatureStream)
wrappingStream.setReadable(signatureStream)
// Flush the rawHeader through the signatureStream
// defer piping, so consumer can attach event listeners
// otherwise we might lose events
process.nextTick(() => {
duplex.pipe(this._parser)
})
this._generator.on('error', this.emit.bind(this, 'error'))
this._parser.on('error', this.emit.bind(this, 'error'))
this.stream = duplex
duplex.on('error', this.emit.bind(this, 'error'))
duplex.on('close', this.emit.bind(this, 'close'))
Duplexify.call(this, this._generator, this._parser, { objectMode: true })
// MQTT.js basic default
if (opts.notData !== true) {
var that = this
this.once('data', function (connectPacket) {
that.setOptions(connectPacket)
that.on('data', emitPacket)
if (cb) {
cb()
}
that.emit('data', connectPacket)
})
}
}
GrpcService.prototype.requestWritableStream = function(protoOpts, reqOpts) {
var stream = protoOpts.stream = protoOpts.stream || duplexify.obj();
if (global.GCLOUD_SANDBOX_ENV) {
return stream;
}
var self = this;
if (!this.grpcCredentials) {
// We must establish an authClient to give to grpc.
this.getGrpcCredentials_(function(err, credentials) {
if (err) {
stream.destroy(err);
return;
}
self.grpcCredentials = credentials;
function createStream (script, opts_) {
var opts = opts_ || {}
var duplex = duplexify.obj(null, null)
function errback(err) {
if (err) duplex.destroy(err)
}
// Find the cscript binary. If we're on 64-bit Windows and 32-bit
// Node.js then prefer the native "Sysnative\cscript.exe", because
// otherwise we would be redirected to "SysWow64\cscript.exe" and
// then be unable to access the native registry (without resorting
// to the slower ExecMethod). See also win-detect-browsers#18.
wbin('cscript', { native: opts.native }, function(err, bin) {
if (err) return duplex.destroy(err)
var args = ['//Nologo', '//B', resolve(script)].concat(opts.args || [])
var child = execFile(bin, args, errback)
// special constructor treatment for native websockets in browsers, see
// https://github.com/maxogden/websocket-stream/issues/82
if (isNative && isBrowser) {
socket = new WS(target, protocols)
} else {
socket = new WS(target, protocols, options)
}
socket.binaryType = 'arraybuffer'
}
// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = duplexify.obj()
socket.onopen = onopen
}
stream.socket = socket
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
proxy.on('close', destroy)
var coerceToBuffer = !options.objectMode
function socketWriteNode(chunk, enc, next) {
// avoid errors, this never happens unless
// destroy() is called
function StreamChannels (opts, onchannel) {
if (!(this instanceof StreamChannels)) return new StreamChannels(opts, onchannel)
if (typeof opts === 'function') {
onchannel = opts
opts = null
}
if (!opts) opts = {}
duplexify.call(this)
var self = this
this.destroyed = false
this.limit = opts.limit || 1024
this.state = null // set by someone else. here for perf
this._outgoing = []
this._incoming = []
this._waiting = 0
this._encode = new Sink()
this._decode = lpstream.decode({allowEmpty: true, limit: opts.messageLimit || 5 * 1024 * 1024})
this._decode.on('data', parse)
this._keepAlive = 0
this._remoteKeepAlive = 0