Merge pull request #12694 from RoosterDragon/fixup-network

Fix thread-safety and dispose issues in NetworkConnection.
This commit is contained in:
Paul Chote
2017-02-23 21:04:56 +00:00
committed by GitHub

View File

@@ -44,7 +44,7 @@ namespace OpenRA.Network
public byte[] Data;
}
protected List<ReceivedPacket> receivedPackets = new List<ReceivedPacket>();
readonly List<ReceivedPacket> receivedPackets = new List<ReceivedPacket>();
public ReplayRecorder Recorder { get; private set; }
public virtual int LocalClientId
@@ -87,17 +87,22 @@ namespace OpenRA.Network
{
if (packet.Length == 0)
throw new NotImplementedException();
lock (this)
receivedPackets.Add(new ReceivedPacket { FromClient = LocalClientId, Data = packet });
AddPacket(new ReceivedPacket { FromClient = LocalClientId, Data = packet });
}
protected void AddPacket(ReceivedPacket packet)
{
lock (receivedPackets)
receivedPackets.Add(packet);
}
public virtual void Receive(Action<int, byte[]> packetFn)
{
List<ReceivedPacket> packets;
lock (this)
ReceivedPacket[] packets;
lock (receivedPackets)
{
packets = receivedPackets;
receivedPackets = new List<ReceivedPacket>();
packets = receivedPackets.ToArray();
receivedPackets.Clear();
}
foreach (var p in packets)
@@ -131,57 +136,65 @@ namespace OpenRA.Network
sealed class NetworkConnection : EchoConnection
{
TcpClient socket;
int clientId;
ConnectionState connectionState = ConnectionState.Connecting;
Thread t;
readonly TcpClient tcp;
readonly List<byte[]> queuedSyncPackets = new List<byte[]>();
volatile ConnectionState connectionState = ConnectionState.Connecting;
volatile int clientId;
bool disposed;
public NetworkConnection(string host, int port)
{
t = new Thread(_ =>
try
{
try
tcp = new TcpClient(host, port) { NoDelay = true };
new Thread(NetworkConnectionReceive)
{
socket = new TcpClient(host, port);
socket.NoDelay = true;
var reader = new BinaryReader(socket.GetStream());
var serverProtocol = reader.ReadInt32();
Name = GetType().Name + " " + host + ":" + port,
IsBackground = true
}.Start(tcp.GetStream());
}
catch
{
connectionState = ConnectionState.NotConnected;
}
}
if (ProtocolVersion.Version != serverProtocol)
throw new InvalidOperationException(
"Protocol version mismatch. Server={0} Client={1}"
.F(serverProtocol, ProtocolVersion.Version));
void NetworkConnectionReceive(object networkStreamObject)
{
try
{
var networkStream = (NetworkStream)networkStreamObject;
var reader = new BinaryReader(networkStream);
var serverProtocol = reader.ReadInt32();
clientId = reader.ReadInt32();
connectionState = ConnectionState.Connected;
if (ProtocolVersion.Version != serverProtocol)
throw new InvalidOperationException(
"Protocol version mismatch. Server={0} Client={1}"
.F(serverProtocol, ProtocolVersion.Version));
for (;;)
{
var len = reader.ReadInt32();
var client = reader.ReadInt32();
var buf = reader.ReadBytes(len);
if (len == 0)
throw new NotImplementedException();
lock (this)
receivedPackets.Add(new ReceivedPacket { FromClient = client, Data = buf });
}
}
catch { }
finally
clientId = reader.ReadInt32();
connectionState = ConnectionState.Connected;
for (;;)
{
connectionState = ConnectionState.NotConnected;
if (socket != null)
socket.Close();
var len = reader.ReadInt32();
var client = reader.ReadInt32();
var buf = reader.ReadBytes(len);
if (len == 0)
throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = client, Data = buf });
}
}) { IsBackground = true };
t.Start();
}
catch { }
finally
{
connectionState = ConnectionState.NotConnected;
}
}
public override int LocalClientId { get { return clientId; } }
public override ConnectionState ConnectionState { get { return connectionState; } }
List<byte[]> queuedSyncPackets = new List<byte[]>();
public override void SendSync(int frame, byte[] syncData)
{
var ms = new MemoryStream();
@@ -208,7 +221,7 @@ namespace OpenRA.Network
}
queuedSyncPackets.Clear();
ms.WriteTo(socket.GetStream());
ms.WriteTo(tcp.GetStream());
}
catch (SocketException) { /* drop this on the floor; we'll pick up the disconnect from the reader thread */ }
catch (ObjectDisposedException) { /* ditto */ }
@@ -216,25 +229,18 @@ namespace OpenRA.Network
catch (IOException) { /* ditto */ }
}
bool disposed = false;
protected override void Dispose(bool disposing)
{
if (disposed)
return;
disposed = true;
t.Abort();
if (disposing)
if (socket != null)
socket.Client.Close();
// Closing the stream will cause any reads on the receiving thread to throw.
// This will mark the connection as no longer connected and the thread will terminate cleanly.
if (tcp != null)
tcp.Close();
base.Dispose(disposing);
}
~NetworkConnection()
{
Dispose(false);
}
}
}