Wait for acks before resending file fragments

This commit is contained in:
Louis-Antoine 2020-05-20 16:21:18 +02:00
parent 06060c02d3
commit bf00955786
3 changed files with 48 additions and 6 deletions

View file

@ -326,6 +326,7 @@ typedef struct
{
UINT8 fileid;
UINT32 filesize;
UINT8 iteration;
UINT32 position;
UINT16 size;
UINT8 data[0]; // Size is variable using hardware_MAXPACKETLENGTH
@ -340,6 +341,7 @@ typedef struct
typedef struct
{
UINT8 fileid;
UINT8 iteration;
UINT8 numsegments;
fileacksegment_t segments[0];
} ATTRPACK fileack_pak;

View file

@ -76,10 +76,13 @@ typedef struct filetx_s
typedef struct filetran_s
{
filetx_t *txlist; // Linked list of all files for the node
UINT8 iteration;
UINT8 ackediteration;
UINT32 position; // The current position in the file
boolean *ackedfragments;
UINT32 ackedsize;
FILE *currentfile; // The file currently being sent/received
tic_t dontsenduntil;
} filetran_t;
static filetran_t transfer[MAXNETNODES];
@ -986,20 +989,36 @@ void FileSendTicker(void)
else // Sending RAM
transfer[i].currentfile = (FILE *)1; // Set currentfile to a non-null value to indicate that it is open
transfer[i].iteration = 1;
transfer[i].ackediteration = 0;
transfer[i].position = 0;
transfer[i].ackedsize = 0;
transfer[i].ackedfragments = calloc(f->size / FILEFRAGMENTSIZE + 1, sizeof(*transfer[i].ackedfragments));
if (!transfer[i].ackedfragments)
I_Error("FileSendTicker: No more memory\n");
transfer[i].dontsenduntil = 0;
}
// If the client hasn't acknowledged any fragment from the previous iteration,
// it is most likely because their acks haven't had enough time to reach the server
// yet, due to latency. In that case, we wait a little to avoid useless resend.
if (I_GetTime() < transfer[i].dontsenduntil)
continue;
// Find the first non-acknowledged fragment
while (transfer[i].ackedfragments[transfer[i].position / FILEFRAGMENTSIZE])
{
transfer[i].position += FILEFRAGMENTSIZE;
if (transfer[i].position >= f->size)
{
if (transfer[i].ackediteration < transfer[i].iteration)
transfer[i].dontsenduntil = I_GetTime() + TICRATE / 2;
transfer[i].position = 0;
transfer[i].iteration++;
}
}
// Build a packet containing a file fragment
@ -1016,6 +1035,7 @@ void FileSendTicker(void)
if (fread(p->data, 1, fragmentsize, transfer[i].currentfile) != fragmentsize)
I_Error("FileSendTicker: can't read %s byte on %s at %d because %s", sizeu1(fragmentsize), f->id.filename, transfer[i].position, M_FileError(transfer[i].currentfile));
}
p->iteration = transfer[i].iteration;
p->position = LONG(transfer[i].position);
p->fileid = f->fileid;
p->filesize = LONG(f->size);
@ -1026,7 +1046,13 @@ void FileSendTicker(void)
{ // Success
transfer[i].position = (UINT32)(transfer[i].position + fragmentsize);
if (transfer[i].position >= f->size)
{
if (transfer[i].ackediteration < transfer[i].iteration)
transfer[i].dontsenduntil = I_GetTime() + TICRATE / 2;
transfer[i].position = 0;
transfer[i].iteration++;
}
}
else
{ // Not sent for some odd reason, retry at next call
@ -1053,6 +1079,13 @@ void PT_FileAck(void)
return;
}
if (packet->iteration > trans->ackediteration)
{
trans->ackediteration = packet->iteration;
if (trans->ackediteration >= trans->iteration - 1)
trans->dontsenduntil = 0;
}
for (i = 0; i < packet->numsegments; i++)
{
fileacksegment_t *segment = &packet->segments[i];
@ -1114,10 +1147,12 @@ static void SendAckPacket(fileack_pak *packet, UINT8 fileid)
memset(packet, 0, sizeof(*packet) + 512);
}
static void AddFragmentToAckPacket(fileack_pak *packet, UINT32 fragmentpos, UINT8 fileid)
static void AddFragmentToAckPacket(fileack_pak *packet, UINT8 iteration, UINT32 fragmentpos, UINT8 fileid)
{
fileacksegment_t *segment = &packet->segments[packet->numsegments - 1];
packet->iteration = max(packet->iteration, iteration);
if (packet->numsegments == 0
|| fragmentpos < segment->start
|| fragmentpos - segment->start >= 32)
@ -1157,7 +1192,7 @@ void FileReceiveTicker(void)
for (j = 0; j < 2048; j++)
{
if (file->receivedfragments[file->ackresendposition])
AddFragmentToAckPacket(file->ackpacket, file->ackresendposition, i);
AddFragmentToAckPacket(file->ackpacket, file->iteration, file->ackresendposition, i);
file->ackresendposition++;
if (file->ackresendposition * file->fragmentsize >= file->totalsize)
@ -1207,6 +1242,7 @@ void PT_FileFragment(void)
file->status = FS_DOWNLOADING;
file->fragmentsize = fragmentsize;
file->iteration = 0;
file->ackpacket = calloc(1, sizeof(*file->ackpacket) + 512);
if (!file->ackpacket)
@ -1254,6 +1290,8 @@ void PT_FileFragment(void)
if (fragmentpos >= file->totalsize)
I_Error("Invalid file fragment\n");
file->iteration = max(file->iteration, netbuffer->u.filetxpak.iteration);
if (!file->receivedfragments[fragmentpos / fragmentsize]) // Not received yet
{
file->receivedfragments[fragmentpos / fragmentsize] = true;
@ -1264,7 +1302,7 @@ void PT_FileFragment(void)
I_Error("Can't write to %s: %s\n",filename, M_FileError(file->file));
file->currentsize += boundedfragmentsize;
AddFragmentToAckPacket(file->ackpacket, fragmentpos / fragmentsize, filenum);
AddFragmentToAckPacket(file->ackpacket, file->iteration, fragmentpos / fragmentsize, filenum);
// Finished?
if (file->currentsize == file->totalsize)
@ -1295,7 +1333,7 @@ void PT_FileFragment(void)
{
// If they are sending us the fragment again, it's probably because
// they missed our previous ack, so we must re-acknowledge it
AddFragmentToAckPacket(file->ackpacket, fragmentpos / fragmentsize, filenum);
AddFragmentToAckPacket(file->ackpacket, file->iteration, fragmentpos / fragmentsize, filenum);
}
}
else if (!file->justdownloaded)

View file

@ -40,16 +40,18 @@ typedef struct
UINT8 willsend; // Is the server willing to send it?
char filename[MAX_WADPATH];
UINT8 md5sum[16];
filestatus_t status; // The value returned by recsearch
boolean justdownloaded; // To prevent late fragments from causing an I_Error
// Used only for download
FILE *file;
boolean *receivedfragments;
UINT32 fragmentsize;
UINT8 iteration;
fileack_pak *ackpacket;
UINT32 currentsize;
UINT32 totalsize;
UINT32 ackresendposition; // Used when resuming downloads
filestatus_t status; // The value returned by recsearch
boolean justdownloaded; // To prevent late fragments from causing an I_Error
} fileneeded_t;
extern INT32 fileneedednum;