Replace server Select loop with individual client threads.
This guarantees that any unexpected blocking calls due to network issues cannot stall the main server thread.
This commit is contained in:
@@ -10,21 +10,22 @@
|
||||
#endregion
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
|
||||
namespace OpenRA.Server
|
||||
{
|
||||
public class Connection
|
||||
public class Connection : IDisposable
|
||||
{
|
||||
public const int MaxOrderLength = 131072;
|
||||
|
||||
public readonly Socket Socket;
|
||||
public readonly List<byte> Data = new List<byte>();
|
||||
public readonly int PlayerIndex;
|
||||
public readonly string AuthToken;
|
||||
public readonly EndPoint EndPoint;
|
||||
|
||||
public long TimeSinceLastResponse => Game.RunTime - lastReceivedTime;
|
||||
public int MostRecentFrame { get; private set; }
|
||||
@@ -32,130 +33,143 @@ namespace OpenRA.Server
|
||||
public bool TimeoutMessageShown;
|
||||
public bool Validated;
|
||||
|
||||
ReceiveState state = ReceiveState.Header;
|
||||
int expectLength = 8;
|
||||
int frame = 0;
|
||||
long lastReceivedTime = 0;
|
||||
|
||||
public Connection(Socket socket, int playerIndex, string authToken)
|
||||
readonly BlockingCollection<byte[]> sendQueue = new BlockingCollection<byte[]>();
|
||||
|
||||
public Connection(Socket socket, int playerIndex, string authToken, Action<Connection, int, byte[]> onPacket, Action<Connection> onDisconnect)
|
||||
{
|
||||
Socket = socket;
|
||||
PlayerIndex = playerIndex;
|
||||
AuthToken = authToken;
|
||||
}
|
||||
EndPoint = socket.RemoteEndPoint;
|
||||
|
||||
public byte[] PopBytes(int n)
|
||||
{
|
||||
var result = Data.GetRange(0, n);
|
||||
Data.RemoveRange(0, n);
|
||||
return result.ToArray();
|
||||
}
|
||||
|
||||
bool ReadDataInner(Server server)
|
||||
{
|
||||
var rx = new byte[1024];
|
||||
var len = 0;
|
||||
|
||||
while (true)
|
||||
new Thread(SendReceiveLoop)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Poll the socket first to see if there's anything there.
|
||||
// This avoids the exception with SocketErrorCode == `SocketError.WouldBlock` thrown
|
||||
// from `socket.Receive(rx)`.
|
||||
if (!Socket.Poll(0, SelectMode.SelectRead)) break;
|
||||
|
||||
if ((len = Socket.Receive(rx)) > 0)
|
||||
Data.AddRange(rx.Take(len));
|
||||
else
|
||||
{
|
||||
if (len == 0)
|
||||
server.DropClient(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (SocketException e)
|
||||
{
|
||||
// This should no longer be needed with the socket.Poll call above.
|
||||
if (e.SocketErrorCode == SocketError.WouldBlock) break;
|
||||
|
||||
server.DropClient(this);
|
||||
Log.Write("server", "Dropping client {0} because reading the data failed: {1}", PlayerIndex, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
lastReceivedTime = Game.RunTime;
|
||||
TimeoutMessageShown = false;
|
||||
|
||||
return true;
|
||||
Name = $"Client communication ({EndPoint}",
|
||||
IsBackground = true
|
||||
}.Start((socket, onPacket, onDisconnect));
|
||||
}
|
||||
|
||||
public void ReadData(Server server)
|
||||
void SendReceiveLoop(object s)
|
||||
{
|
||||
if (ReadDataInner(server))
|
||||
var (socket, onPacket, onDisconnect) = (ValueTuple<Socket, Action<Connection, int, byte[]>, Action<Connection>>)s;
|
||||
socket.Blocking = false;
|
||||
socket.NoDelay = true;
|
||||
|
||||
var receiveBuffer = new byte[1024];
|
||||
var readBuffer = new List<byte>();
|
||||
var state = ReceiveState.Header;
|
||||
var expectLength = 8;
|
||||
var frame = 0;
|
||||
|
||||
try
|
||||
{
|
||||
while (Data.Count >= expectLength)
|
||||
while (true)
|
||||
{
|
||||
var bytes = PopBytes(expectLength);
|
||||
switch (state)
|
||||
// Wait up to 100ms for data to arrive before checking for data to send
|
||||
if (socket.Poll(100000, SelectMode.SelectRead))
|
||||
{
|
||||
case ReceiveState.Header:
|
||||
var read = socket.Receive(receiveBuffer);
|
||||
if (read == 0)
|
||||
{
|
||||
// Empty packet signals that the client has been dropped
|
||||
return;
|
||||
}
|
||||
|
||||
if (read > 0)
|
||||
{
|
||||
readBuffer.AddRange(receiveBuffer.Take(read));
|
||||
lastReceivedTime = Game.RunTime;
|
||||
TimeoutMessageShown = false;
|
||||
}
|
||||
|
||||
while (readBuffer.Count >= expectLength)
|
||||
{
|
||||
var bytes = readBuffer.GetRange(0, expectLength).ToArray();
|
||||
readBuffer.RemoveRange(0, expectLength);
|
||||
|
||||
switch (state)
|
||||
{
|
||||
expectLength = BitConverter.ToInt32(bytes, 0) - 4;
|
||||
frame = BitConverter.ToInt32(bytes, 4);
|
||||
state = ReceiveState.Data;
|
||||
|
||||
if (expectLength < 0 || expectLength > MaxOrderLength)
|
||||
case ReceiveState.Header:
|
||||
{
|
||||
server.DropClient(this);
|
||||
Log.Write("server", "Dropping client {0} for excessive order length = {1}", PlayerIndex, expectLength);
|
||||
return;
|
||||
expectLength = BitConverter.ToInt32(bytes, 0) - 4;
|
||||
frame = BitConverter.ToInt32(bytes, 4);
|
||||
state = ReceiveState.Data;
|
||||
|
||||
if (expectLength < 0 || expectLength > MaxOrderLength)
|
||||
{
|
||||
Log.Write("server", $"Closing socket connection to {EndPoint} because of excessive order length: {expectLength}");
|
||||
return;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case ReceiveState.Data:
|
||||
{
|
||||
if (MostRecentFrame < frame)
|
||||
MostRecentFrame = frame;
|
||||
|
||||
case ReceiveState.Data:
|
||||
onPacket(this, frame, bytes);
|
||||
expectLength = 8;
|
||||
state = ReceiveState.Header;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Client has been dropped by the server
|
||||
if (sendQueue.IsCompleted)
|
||||
return;
|
||||
|
||||
// Send all data immediately, we will block again on read
|
||||
while (sendQueue.TryTake(out var data, 0))
|
||||
{
|
||||
var start = 0;
|
||||
var length = data.Length;
|
||||
|
||||
// Non-blocking sends are free to send only part of the data
|
||||
while (start < length)
|
||||
{
|
||||
var sent = socket.Send(data, start, length - start, SocketFlags.None, out var error);
|
||||
if (error == SocketError.WouldBlock)
|
||||
{
|
||||
if (MostRecentFrame < frame)
|
||||
MostRecentFrame = frame;
|
||||
|
||||
server.DispatchOrders(this, frame, bytes);
|
||||
expectLength = 8;
|
||||
state = ReceiveState.Header;
|
||||
|
||||
break;
|
||||
Log.Write("server", "Non-blocking send of {0} bytes failed. Falling back to blocking send.", length - start);
|
||||
socket.Blocking = true;
|
||||
sent = socket.Send(data, start, length - start, SocketFlags.None);
|
||||
socket.Blocking = false;
|
||||
}
|
||||
else if (error != SocketError.Success)
|
||||
throw new SocketException((int)error);
|
||||
|
||||
start += sent;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SocketException e)
|
||||
{
|
||||
Log.Write("server", $"Closing socket connection to {EndPoint} because of socket error: {e}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
onDisconnect(this);
|
||||
socket.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
public void SendData(byte[] data)
|
||||
{
|
||||
var start = 0;
|
||||
var length = data.Length;
|
||||
|
||||
// Non-blocking sends are free to send only part of the data
|
||||
while (start < length)
|
||||
{
|
||||
var sent = Socket.Send(data, start, length - start, SocketFlags.None, out var error);
|
||||
if (error == SocketError.WouldBlock)
|
||||
{
|
||||
Log.Write("server", "Non-blocking send of {0} bytes failed. Falling back to blocking send.", length - start);
|
||||
Socket.Blocking = true;
|
||||
sent = Socket.Send(data, start, length - start, SocketFlags.None);
|
||||
Socket.Blocking = false;
|
||||
}
|
||||
else if (error != SocketError.Success)
|
||||
throw new SocketException((int)error);
|
||||
|
||||
start += sent;
|
||||
}
|
||||
sendQueue.Add(data);
|
||||
}
|
||||
|
||||
public EndPoint EndPoint => Socket.RemoteEndPoint;
|
||||
public void Dispose()
|
||||
{
|
||||
// Tell the sendReceiveThread that the socket should be closed
|
||||
sendQueue.CompleteAdding();
|
||||
}
|
||||
}
|
||||
|
||||
public enum ReceiveState { Header, Data }
|
||||
|
||||
Reference in New Issue
Block a user