// FIXME: Store Promises in the peekQueue instead? Or would this make it more difficult to deal with concurrent peeks/reads? And should the peek spec be changed to account for this?
returnPromise.try(()=>{
returnonRequest();
}).then((result)=>{
peekQueue.push({type:"value",value:result});
}).catch((error)=>{
peekQueue.push({type:"error",error:error});
}).then(()=>{
// FIXME: What if there's an EndOfStream marker being produced? Or an abort? Or some other sort of failure? Shouldn't that return/produce something *other than* `true`?
returntrue;
});
}
});
},
read:functionproduceValue_simpleSource(){
returnPromise.try(()=>{
if(peekQueue.length>0){
/* FIXME: Move all this logic out into an itemBuffer abstraction of some sort (also useful in from-node-stream?) */
letitem=peekQueue.shift();
if(item.type==="value"){
returnitem.value;
}elseif(item.type==="error"){
throwitem.error;
}else{
thrownewError(`Found a queue item of type '${item.type}'; this is a bug in @ppstreams/simple-source, please report it`);
}
}else{
if(errorReason!=null){
if(errorReason===true){
thrownewAborted("Stream was aborted");
}elseif(errorReasoninstanceofError){
thrownewAborted(`Stream was aborted due to error: ${errorReason.message}`,{reason:errorReason});
}
}else{
returnonRequest();
}
}
});
},
abort:functionabort_simpleSource(reason){
returnPromise.try(()=>{
if(errorReason==null){
if(reason===true||reasoninstanceofError){
errorReason=reason;
if(onAbort!=null){
returnonAbort(reason);
}
}else{
thrownewError("You must specify a reason (either `true` or an Error object) when aborting a stream");
}
}else{
// FIXME: Require this behaviour in the spec? Or is there a composability-related reason to permit/require quietly ignoring this, to make it idempotent?