Fix thread-safety and dispose issues in NetworkConnection.

- Calling Close() on a TcpClient is documented to also close the underlying sockets and streams for us. This means we can avoid also calling socket.Client.Close() and generating exceptions on mono.
- TcpClient is not thread-safe. However the NetworkStream returned by GetStream() is thread-safe for a single reader/single writer scenario. If we create and dispose the client on the calling thread, and pass the NetworkStream into the thread we spawn for reading, then we can avoid thread-safety issues incurred by trying to Close() the connection from another thread.
- The clean shutdown means we don't need to make the dodgy Thread.Abort() call as it will end normally, and that means we don't need a finalizer to ensure the thread is killed off.
- Refactor how receivedPackets work in EchoConnection to avoid lock(this).
- Mark connectionState and clientId as volatile since they are set from another thread.
This commit is contained in:
RoosterDragon
2017-02-04 15:25:25 +00:00
parent 79c95364fb
commit aa3024f1f8

View File

@@ -44,7 +44,7 @@ namespace OpenRA.Network
public byte[] Data; public byte[] Data;
} }
protected List<ReceivedPacket> receivedPackets = new List<ReceivedPacket>(); readonly List<ReceivedPacket> receivedPackets = new List<ReceivedPacket>();
public ReplayRecorder Recorder { get; private set; } public ReplayRecorder Recorder { get; private set; }
public virtual int LocalClientId public virtual int LocalClientId
@@ -87,17 +87,22 @@ namespace OpenRA.Network
{ {
if (packet.Length == 0) if (packet.Length == 0)
throw new NotImplementedException(); throw new NotImplementedException();
lock (this) AddPacket(new ReceivedPacket { FromClient = LocalClientId, Data = packet });
receivedPackets.Add(new ReceivedPacket { FromClient = LocalClientId, Data = packet }); }
protected void AddPacket(ReceivedPacket packet)
{
lock (receivedPackets)
receivedPackets.Add(packet);
} }
public virtual void Receive(Action<int, byte[]> packetFn) public virtual void Receive(Action<int, byte[]> packetFn)
{ {
List<ReceivedPacket> packets; ReceivedPacket[] packets;
lock (this) lock (receivedPackets)
{ {
packets = receivedPackets; packets = receivedPackets.ToArray();
receivedPackets = new List<ReceivedPacket>(); receivedPackets.Clear();
} }
foreach (var p in packets) foreach (var p in packets)
@@ -131,57 +136,65 @@ namespace OpenRA.Network
sealed class NetworkConnection : EchoConnection sealed class NetworkConnection : EchoConnection
{ {
TcpClient socket; readonly TcpClient tcp;
int clientId; readonly List<byte[]> queuedSyncPackets = new List<byte[]>();
ConnectionState connectionState = ConnectionState.Connecting; volatile ConnectionState connectionState = ConnectionState.Connecting;
Thread t; volatile int clientId;
bool disposed;
public NetworkConnection(string host, int port) public NetworkConnection(string host, int port)
{ {
t = new Thread(_ => try
{ {
try tcp = new TcpClient(host, port) { NoDelay = true };
new Thread(NetworkConnectionReceive)
{ {
socket = new TcpClient(host, port); Name = GetType().Name + " " + host + ":" + port,
socket.NoDelay = true; IsBackground = true
var reader = new BinaryReader(socket.GetStream()); }.Start(tcp.GetStream());
var serverProtocol = reader.ReadInt32(); }
catch
{
connectionState = ConnectionState.NotConnected;
}
}
if (ProtocolVersion.Version != serverProtocol) void NetworkConnectionReceive(object networkStreamObject)
throw new InvalidOperationException( {
"Protocol version mismatch. Server={0} Client={1}" try
.F(serverProtocol, ProtocolVersion.Version)); {
var networkStream = (NetworkStream)networkStreamObject;
var reader = new BinaryReader(networkStream);
var serverProtocol = reader.ReadInt32();
clientId = reader.ReadInt32(); if (ProtocolVersion.Version != serverProtocol)
connectionState = ConnectionState.Connected; throw new InvalidOperationException(
"Protocol version mismatch. Server={0} Client={1}"
.F(serverProtocol, ProtocolVersion.Version));
for (;;) clientId = reader.ReadInt32();
{ connectionState = ConnectionState.Connected;
var len = reader.ReadInt32();
var client = reader.ReadInt32(); for (;;)
var buf = reader.ReadBytes(len);
if (len == 0)
throw new NotImplementedException();
lock (this)
receivedPackets.Add(new ReceivedPacket { FromClient = client, Data = buf });
}
}
catch { }
finally
{ {
connectionState = ConnectionState.NotConnected; var len = reader.ReadInt32();
if (socket != null) var client = reader.ReadInt32();
socket.Close(); var buf = reader.ReadBytes(len);
if (len == 0)
throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = client, Data = buf });
} }
}) { IsBackground = true }; }
t.Start(); catch { }
finally
{
connectionState = ConnectionState.NotConnected;
}
} }
public override int LocalClientId { get { return clientId; } } public override int LocalClientId { get { return clientId; } }
public override ConnectionState ConnectionState { get { return connectionState; } } public override ConnectionState ConnectionState { get { return connectionState; } }
List<byte[]> queuedSyncPackets = new List<byte[]>();
public override void SendSync(int frame, byte[] syncData) public override void SendSync(int frame, byte[] syncData)
{ {
var ms = new MemoryStream(); var ms = new MemoryStream();
@@ -208,7 +221,7 @@ namespace OpenRA.Network
} }
queuedSyncPackets.Clear(); queuedSyncPackets.Clear();
ms.WriteTo(socket.GetStream()); ms.WriteTo(tcp.GetStream());
} }
catch (SocketException) { /* drop this on the floor; we'll pick up the disconnect from the reader thread */ } catch (SocketException) { /* drop this on the floor; we'll pick up the disconnect from the reader thread */ }
catch (ObjectDisposedException) { /* ditto */ } catch (ObjectDisposedException) { /* ditto */ }
@@ -216,25 +229,18 @@ namespace OpenRA.Network
catch (IOException) { /* ditto */ } catch (IOException) { /* ditto */ }
} }
bool disposed = false;
protected override void Dispose(bool disposing) protected override void Dispose(bool disposing)
{ {
if (disposed) if (disposed)
return; return;
disposed = true; disposed = true;
t.Abort(); // Closing the stream will cause any reads on the receiving thread to throw.
if (disposing) // This will mark the connection as no longer connected and the thread will terminate cleanly.
if (socket != null) if (tcp != null)
socket.Client.Close(); tcp.Close();
base.Dispose(disposing); base.Dispose(disposing);
} }
~NetworkConnection()
{
Dispose(false);
}
} }
} }