Files
OpenRA/OpenRA.Game/Network/Connection.cs
RoosterDragon aa3024f1f8 Fix thread-safety and dispose issues in NetworkConnection.
- Calling Close() on a TcpClient is documented to also close the underlying sockets and streams for us. This means we can avoid also calling socket.Client.Close() and generating exceptions on mono.
- TcpClient is not thread-safe. However the NetworkStream returned by GetStream() is thread-safe for a single reader/single writer scenario. If we create and dispose the client on the calling thread, and pass the NetworkStream into the thread we spawn for reading, then we can avoid thread-safety issues incurred by trying to Close() the connection from another thread.
- The clean shutdown means we don't need to make the dodgy Thread.Abort() call as it will end normally, and that means we don't need a finalizer to ensure the thread is killed off.
- Refactor how receivedPackets work in EchoConnection to avoid lock(this).
- Mark connectionState and clientId as volatile since they are set from another thread.
2017-02-21 14:10:06 +00:00

247 lines
5.9 KiB
C#

#region Copyright & License Information
/*
* Copyright 2007-2017 The OpenRA Developers (see AUTHORS)
* This file is part of OpenRA, which is free software. It is made
* available to you under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version. For more
* information, see COPYING.
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using OpenRA.Server;
namespace OpenRA.Network
{
public enum ConnectionState
{
PreConnecting,
NotConnected,
Connecting,
Connected,
}
public interface IConnection : IDisposable
{
int LocalClientId { get; }
ConnectionState ConnectionState { get; }
void Send(int frame, List<byte[]> orders);
void SendImmediate(List<byte[]> orders);
void SendSync(int frame, byte[] syncData);
void Receive(Action<int, byte[]> packetFn);
}
class EchoConnection : IConnection
{
protected struct ReceivedPacket
{
public int FromClient;
public byte[] Data;
}
readonly List<ReceivedPacket> receivedPackets = new List<ReceivedPacket>();
public ReplayRecorder Recorder { get; private set; }
public virtual int LocalClientId
{
get { return 1; }
}
public virtual ConnectionState ConnectionState
{
get { return ConnectionState.PreConnecting; }
}
public virtual void Send(int frame, List<byte[]> orders)
{
var ms = new MemoryStream();
ms.Write(BitConverter.GetBytes(frame));
foreach (var o in orders)
ms.Write(o);
Send(ms.ToArray());
}
public virtual void SendImmediate(List<byte[]> orders)
{
var ms = new MemoryStream();
ms.Write(BitConverter.GetBytes(0));
foreach (var o in orders)
ms.Write(o);
Send(ms.ToArray());
}
public virtual void SendSync(int frame, byte[] syncData)
{
var ms = new MemoryStream();
ms.Write(BitConverter.GetBytes(frame));
ms.Write(syncData);
Send(ms.ToArray());
}
protected virtual void Send(byte[] packet)
{
if (packet.Length == 0)
throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = LocalClientId, Data = packet });
}
protected void AddPacket(ReceivedPacket packet)
{
lock (receivedPackets)
receivedPackets.Add(packet);
}
public virtual void Receive(Action<int, byte[]> packetFn)
{
ReceivedPacket[] packets;
lock (receivedPackets)
{
packets = receivedPackets.ToArray();
receivedPackets.Clear();
}
foreach (var p in packets)
{
packetFn(p.FromClient, p.Data);
if (Recorder != null)
Recorder.Receive(p.FromClient, p.Data);
}
}
public void StartRecording(Func<string> chooseFilename)
{
// If we have a previous recording then save/dispose it and start a new one.
if (Recorder != null)
Recorder.Dispose();
Recorder = new ReplayRecorder(chooseFilename);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && Recorder != null)
Recorder.Dispose();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
sealed class NetworkConnection : EchoConnection
{
readonly TcpClient tcp;
readonly List<byte[]> queuedSyncPackets = new List<byte[]>();
volatile ConnectionState connectionState = ConnectionState.Connecting;
volatile int clientId;
bool disposed;
public NetworkConnection(string host, int port)
{
try
{
tcp = new TcpClient(host, port) { NoDelay = true };
new Thread(NetworkConnectionReceive)
{
Name = GetType().Name + " " + host + ":" + port,
IsBackground = true
}.Start(tcp.GetStream());
}
catch
{
connectionState = ConnectionState.NotConnected;
}
}
void NetworkConnectionReceive(object networkStreamObject)
{
try
{
var networkStream = (NetworkStream)networkStreamObject;
var reader = new BinaryReader(networkStream);
var serverProtocol = reader.ReadInt32();
if (ProtocolVersion.Version != serverProtocol)
throw new InvalidOperationException(
"Protocol version mismatch. Server={0} Client={1}"
.F(serverProtocol, ProtocolVersion.Version));
clientId = reader.ReadInt32();
connectionState = ConnectionState.Connected;
for (;;)
{
var len = reader.ReadInt32();
var client = reader.ReadInt32();
var buf = reader.ReadBytes(len);
if (len == 0)
throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = client, Data = buf });
}
}
catch { }
finally
{
connectionState = ConnectionState.NotConnected;
}
}
public override int LocalClientId { get { return clientId; } }
public override ConnectionState ConnectionState { get { return connectionState; } }
public override void SendSync(int frame, byte[] syncData)
{
var ms = new MemoryStream();
ms.Write(BitConverter.GetBytes(frame));
ms.Write(syncData);
queuedSyncPackets.Add(ms.ToArray());
}
protected override void Send(byte[] packet)
{
base.Send(packet);
try
{
var ms = new MemoryStream();
ms.Write(BitConverter.GetBytes(packet.Length));
ms.Write(packet);
foreach (var q in queuedSyncPackets)
{
ms.Write(BitConverter.GetBytes(q.Length));
ms.Write(q);
base.Send(q);
}
queuedSyncPackets.Clear();
ms.WriteTo(tcp.GetStream());
}
catch (SocketException) { /* drop this on the floor; we'll pick up the disconnect from the reader thread */ }
catch (ObjectDisposedException) { /* ditto */ }
catch (InvalidOperationException) { /* ditto */ }
catch (IOException) { /* ditto */ }
}
protected override void Dispose(bool disposing)
{
if (disposed)
return;
disposed = true;
// Closing the stream will cause any reads on the receiving thread to throw.
// This will mark the connection as no longer connected and the thread will terminate cleanly.
if (tcp != null)
tcp.Close();
base.Dispose(disposing);
}
}
}