/** @module selectiveRepeat */
import {
AbstractARQReceiver,
AbstractWindowedSender
} from "./abstractNodes.js";
import Packet from "./Packet.js";
/**
* A class that implements the sender of *selective-repeat*.
* @extends AbstractWindowedSender
*/
export class SRSender extends AbstractWindowedSender{
/**
* Creates a new SRSender instance.
* @param {Object} options - The constructor options.
* @param {SRReceiver} options.receiver - The *selective-repeat* receiver.
* @param {number} options.timeout - The time to wait for an acknowledgment
* from the receiver.
* @param {Channel} options.channel - The channel through which to send the
* packets.
* @param {number} options.windowSize - The size of the sliding window.
*/
constructor({
receiver,
timeout,
channel,
windowSize
}){
super({
receiver,
timeout,
channel,
windowSize
});
}
/**
* Tests whether a received packet is correct or not. A packet is correct if
* it's an acknowledgment, it's not corrupted and, the acknowledged sequence
* was sent and has not been confirmed already, or the acknowledgment is
* cumulative and the acknowledged sequence is in the current window of sent
* packets.
* @param {Packet} packet - The packet to test.
* @returns {boolean} - true if the packet is correct, false otherwise.
* @protected
* @override
*/
_checkReceivedPkt(packet){
// A sequence that was sent and has not been confirmed already has a
// timeout.
const {isCorrupted, isAck, isCAck, ackNum} = packet;
return !isCorrupted && isAck && (isCAck && this._isInExpectedRange(ackNum)
|| this._isTimeoutSet(ackNum));
}
/**
* Processes a correctly received packet, confirming the acknowledged
* sequence and all previous unacknowledged sent sequences if the packet is
* a cumulative ack. Moves the window if the base sequence of the window is
* confirmed.
* @param {Packet} packet - The packet to process.
* @protected
* @override
*/
_processExpectedPkt(packet){
const {ackNum, isCAck} = packet;
const {base, nextSeqNum} = this;
let seqNum = isCAck ? base : ackNum;
const nextToAckNum = this._nextSequence(ackNum)
for(; seqNum!=nextToAckNum; seqNum=this._nextSequence(seqNum)){
if(this._isTimeoutSet(seqNum)){
this._unsetTimeout(seqNum);
this.onPktConfirmed(seqNum);
}
}
if(ackNum == base || isCAck){
// Move the window
let newBase = base;
while(newBase != nextSeqNum && !this._isTimeoutSet(newBase))
newBase = this._nextSequence(newBase);
this._setBase(newBase);
}
}
/**
* Sends a packet and sets its timeout.
* @protected
* @override
*/
_processSending(){
const {nextSeqNum} = this;
this._sendPkt(new Packet({
sender: this,
receiver: this._receiver,
seqNum: nextSeqNum
}));
this._setTimeout(nextSeqNum);
this._nextSeqNum = this._nextSequence(nextSeqNum);
}
/**
* Resends the packet corresponding to seqNum and resets its timeout.
* @param {number} seqNum - The sequence number for which the timeout passed.
* @protected
* @override
*/
_onTimeout(seqNum){
this._sendPkt(new Packet({
sender: this,
receiver: this._receiver,
seqNum,
wasReSent: true
}));
this._setTimeout(seqNum);
}
}
/**
* A class that implements the receiver of *selective-repeat*.
* @extends AbstractARQReceiver
*/
export class SRReceiver extends AbstractARQReceiver{
/**
* @member {number} - The base sequence number of the window.
* @private
*/
_base = 0;
/**
* @member {number} - The size of the sliding window.
* @private
*/
_windowSize;
/**
* @member {boolean} - If it's true, then the instance will use cumulative
* acks.
* @private
*/
_useCAck;
/**
* @member {Array} - An array of booleans were _buffer[seqNum] is true or
* false if seqNum was received or not respectively.
* @private
*/
_buffer;
/**
* Creates a new SRReceiver instance.
* @param {Object} options - The constructor options.
* @param {Channel} options.channel - The channel through which to send the
* acknowledgments.
* @param {number} options.windowSize - The size of the sliding window.
* @param {boolean} [options.useCAck=false] - true if the instance should send
* cumulative acknowledgments.
*/
constructor({
channel,
windowSize,
useCAck = false
}){
const seqNumLimit = windowSize * 2;
super({
channel,
seqNumLimit
});
this._windowSize = windowSize
this._useCAck = useCAck;
this._buffer = new Array(seqNumLimit);
for(let i=0; i<seqNumLimit; i++)
this._buffer[i] = false;
}
/**
* @member {number} - The base sequence number of the sliding window.
* @readonly
*/
get base(){ return this._base; }
/**
* A callback that is called every time the sliding window is moved.
* @param {number} spaces - The number of spaces that the window was moved.
*/
onWindowMoved(spaces){
return;
}
/**
* Sets the base sequence of the window, moving it and therefore calling
* onWindowMoved().
* @param {number} newBase - The new base sequence of the window.
* @private
*/
_setBase(newBase){
let spaces = newBase - this._base;
if(newBase < this._base)
spaces += this._windowSize * 2
this._base = newBase;
this.onWindowMoved(spaces);
}
/**
* Tests whether seqNum is in the window or not.
* @param {number} seqNum - The sequence to test.
* @returns {boolean} - true if seqNum is in the window, false otherwise.
* @private
*/
_isInWindow(seqNum){
const {base} = this;
const windowLimit = (base + this._windowSize) % (this._windowSize * 2);
if(base < windowLimit)
return base <= seqNum && seqNum < windowLimit;
else if(base > windowLimit)
return !(windowLimit <= seqNum && seqNum < base);
return false;
}
/**
* Tests whether a received packet is correct or not. A packet is correct if
* its sequence is in the window and was not already received.
* @param {Packet} packet - The packet to test.
* @returns {boolean} - true if the packet is correct, false otherwise.
* @protected
* @override
*/
_checkReceivedPkt(packet){
const {isCorrupted, seqNum} = packet;
return !isCorrupted && !this._buffer[seqNum] &&
this._isInWindow(seqNum);
}
/**
* Processes a correctly received packet, moving the window if its sequence
* is the base sequence of the window, and sending the corresponding
* acknowledgment.
* @param {Packet} packet - The packet to process.
* @protected
* @override
*/
_processExpectedPkt(packet){
const {seqNum} = packet;
let {base} = this;
this._buffer[seqNum] = true;
let isCAck = false;
let ackNum = seqNum;
if(seqNum == base){
// Move the window
while(this._buffer[base]){
this._buffer[base] = false;
base = this._nextSequence(base);
}
this._setBase(base);
if(this._useCAck){
isCAck = true;
ackNum = this._previousSequence(base);
}
}
this._sendPkt(new Packet({
sender: this,
receiver: packet.sender,
isAck: true,
isCAck,
ackNum
}));
}
/**
* Processes an incorrectly received packet, sending an acknowledgment when
* it corresponds.
* @param {Packet} packet - The packet to process.
* @protected
* @override
*/
_processUnexpectedPkt(packet){
// Process packets that are corrupted, or are already in the buffer or
// are not in the window
const options = {
sender: this,
receiver: packet.sender,
isAck: true
};
const isInBuffer = this._buffer[packet.seqNum];
if(this._useCAck){
if(isInBuffer){
// The packet is a duplicate
options.ackNum = packet.seqNum;
}else{
// The packet is not in the buffer and, it is corrupted or is
// not in the window
options.isCAck = true;
options.ackNum = this._previousSequence(this._base);
}
this._sendPkt(new Packet(options));
}else{
if(isInBuffer || !this._isInWindow(packet.seqNum)){
// The packet is a duplicate or is older than the window
options.ackNum = packet.seqNum;
this._sendPkt(new Packet(options));
}
// If the packet is in the window, but not in the buffer and is
// corrupted, then we do nothing
}
}
}