Rework IConnection implementations:

* EchoConnection is now a trivial buffer that stores
  and repeats orders directly without serialization.

* NetworkConnection no longer subclasses EchoConnection,
  and now also caches local orders without serialization.

* Replay recording was moved to NetworkConnection
  (it is never used on EchoConnection).
This commit is contained in:
Paul Chote
2021-08-29 15:15:54 +01:00
committed by abcdefg30
parent 408f30b5cd
commit 6421c17515
6 changed files with 169 additions and 140 deletions

View File

@@ -13,6 +13,7 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
@@ -37,101 +38,63 @@ namespace OpenRA.Network
void Receive(OrderManager orderManager); void Receive(OrderManager orderManager);
} }
public class EchoConnection : IConnection public sealed class EchoConnection : IConnection
{ {
protected struct ReceivedPacket const int LocalClientId = 1;
readonly Queue<(int Frame, int SyncHash, ulong DefeatState)> sync = new Queue<(int, int, ulong)>();
readonly Queue<(int Frame, OrderPacket Orders)> orders = new Queue<(int, OrderPacket)>();
readonly Queue<OrderPacket> immediateOrders = new Queue<OrderPacket>();
int IConnection.LocalClientId => LocalClientId;
void IConnection.Send(int frame, IEnumerable<Order> o)
{ {
public int FromClient; orders.Enqueue((frame, new OrderPacket(o.ToArray())));
public byte[] Data;
} }
readonly ConcurrentQueue<ReceivedPacket> receivedPackets = new ConcurrentQueue<ReceivedPacket>(); void IConnection.SendImmediate(IEnumerable<Order> o)
public ReplayRecorder Recorder { get; private set; }
public virtual int LocalClientId => 1;
public virtual void Send(int frame, IEnumerable<Order> orders)
{ {
Send(OrderIO.SerializeOrders(frame, orders)); immediateOrders.Enqueue(new OrderPacket(o.ToArray()));
} }
public virtual void SendImmediate(IEnumerable<Order> orders) void IConnection.SendSync(int frame, int syncHash, ulong defeatState)
{ {
Send(OrderIO.SerializeOrders(0, orders)); sync.Enqueue((frame, syncHash, defeatState));
} }
public virtual void SendSync(int frame, int syncHash, ulong defeatState) void IConnection.Receive(OrderManager orderManager)
{ {
Send(OrderIO.SerializeSync(frame, syncHash, defeatState)); while (immediateOrders.TryDequeue(out var i))
orderManager.ReceiveImmediateOrders(LocalClientId, i);
while (orders.TryDequeue(out var o))
orderManager.ReceiveOrders(LocalClientId, o);
while (sync.TryDequeue(out var s))
orderManager.ReceiveSync(s);
} }
protected virtual void Send(byte[] packet) void IDisposable.Dispose() { }
{
if (packet.Length == 0)
throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = LocalClientId, Data = packet });
} }
protected void AddPacket(ReceivedPacket packet) public sealed class NetworkConnection : IConnection
{
receivedPackets.Enqueue(packet);
}
public virtual void Receive(OrderManager orderManager)
{
while (receivedPackets.TryDequeue(out var p))
{
if (OrderIO.TryParseDisconnect(p.Data, out var disconnectClient))
orderManager.ReceiveDisconnect(disconnectClient);
else if (OrderIO.TryParseSync(p.Data, out var syncFrame, out var syncHash, out var defeatState))
orderManager.ReceiveSync(syncFrame, syncHash, defeatState);
else if (OrderIO.TryParseOrderPacket(p.Data, out var ordersFrame, out var orders))
{
if (ordersFrame == 0)
orderManager.ReceiveImmediateOrders(p.FromClient, orders);
else
orderManager.ReceiveOrders(p.FromClient, ordersFrame, orders);
}
Recorder?.Receive(p.FromClient, p.Data);
}
}
public void StartRecording(Func<string> chooseFilename)
{
// If we have a previous recording then save/dispose it and start a new one.
Recorder?.Dispose();
Recorder = new ReplayRecorder(chooseFilename);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
Recorder?.Dispose();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
public sealed class NetworkConnection : EchoConnection
{ {
public readonly ConnectionTarget Target; public readonly ConnectionTarget Target;
internal ReplayRecorder Recorder { get; private set; }
readonly List<byte[]> queuedSyncPackets = new List<byte[]>();
readonly Queue<(int Frame, int SyncHash, ulong DefeatState)> sentSync = new Queue<(int, int, ulong)>();
readonly Queue<(int Frame, OrderPacket Orders)> sentOrders = new Queue<(int, OrderPacket)>();
readonly Queue<OrderPacket> sentImmediateOrders = new Queue<OrderPacket>();
readonly ConcurrentQueue<(int FromClient, byte[] Data)> receivedPackets = new ConcurrentQueue<(int, byte[])>();
TcpClient tcp; TcpClient tcp;
IPEndPoint endpoint; IPEndPoint endpoint;
readonly List<byte[]> queuedSyncPackets = new List<byte[]>();
volatile ConnectionState connectionState = ConnectionState.Connecting; volatile ConnectionState connectionState = ConnectionState.Connecting;
volatile int clientId; volatile int clientId;
bool disposed; bool disposed;
string errorMessage; string errorMessage;
public IPEndPoint EndPoint => endpoint;
public string ErrorMessage => errorMessage;
public NetworkConnection(ConnectionTarget target) public NetworkConnection(ConnectionTarget target)
{ {
Target = target; Target = target;
@@ -228,7 +191,7 @@ namespace OpenRA.Network
var buf = stream.ReadBytes(len); var buf = stream.ReadBytes(len);
if (len == 0) if (len == 0)
throw new NotImplementedException(); throw new NotImplementedException();
AddPacket(new ReceivedPacket { FromClient = client, Data = buf }); receivedPackets.Enqueue((client, buf));
} }
} }
catch (Exception ex) catch (Exception ex)
@@ -242,18 +205,35 @@ namespace OpenRA.Network
} }
} }
public override int LocalClientId => clientId; int IConnection.LocalClientId => clientId;
public ConnectionState ConnectionState => connectionState;
public override void SendSync(int frame, int syncHash, ulong defeatState) void IConnection.Send(int frame, IEnumerable<Order> orders)
{ {
queuedSyncPackets.Add(OrderIO.SerializeSync(frame, syncHash, defeatState)); var o = new OrderPacket(orders.ToArray());
sentOrders.Enqueue((frame, o));
Send(o.Serialize(frame));
} }
protected override void Send(byte[] packet) void IConnection.SendImmediate(IEnumerable<Order> orders)
{ {
base.Send(packet); var o = new OrderPacket(orders.ToArray());
sentImmediateOrders.Enqueue(o);
Send(o.Serialize(0));
}
void IConnection.SendSync(int frame, int syncHash, ulong defeatState)
{
var sync = (frame, syncHash, defeatState);
sentSync.Enqueue(sync);
// Send sync packets together with the next set of orders.
// This was originally explained as reducing network bandwidth
// (TCP overhead?), but the original discussions have been lost to time.
queuedSyncPackets.Add(OrderIO.SerializeSync(sync));
}
void Send(byte[] packet)
{
try try
{ {
var ms = new MemoryStream(); var ms = new MemoryStream();
@@ -264,7 +244,6 @@ namespace OpenRA.Network
{ {
ms.WriteArray(BitConverter.GetBytes(q.Length)); ms.WriteArray(BitConverter.GetBytes(q.Length));
ms.WriteArray(q); ms.WriteArray(q);
base.Send(q);
} }
queuedSyncPackets.Clear(); queuedSyncPackets.Clear();
@@ -276,17 +255,80 @@ namespace OpenRA.Network
catch (IOException) { /* ditto */ } catch (IOException) { /* ditto */ }
} }
protected override void Dispose(bool disposing) void IConnection.Receive(OrderManager orderManager)
{
// Locally generated orders
while (sentImmediateOrders.TryDequeue(out var i))
{
orderManager.ReceiveImmediateOrders(clientId, i);
Recorder?.Receive(clientId, i.Serialize(0));
}
while (sentSync.TryDequeue(out var s))
{
orderManager.ReceiveSync(s);
Recorder?.Receive(clientId, OrderIO.SerializeSync(s));
}
while (sentOrders.TryDequeue(out var o))
{
orderManager.ReceiveOrders(clientId, o);
Recorder?.Receive(clientId, o.Orders.Serialize(o.Frame));
}
// Orders from other players
while (receivedPackets.TryDequeue(out var p))
{
if (OrderIO.TryParseDisconnect(p.Data, out var disconnectClient))
orderManager.ReceiveDisconnect(disconnectClient);
else if (OrderIO.TryParseSync(p.Data, out var sync))
orderManager.ReceiveSync(sync);
else if (OrderIO.TryParseOrderPacket(p.Data, out var orders))
{
if (orders.Frame == 0)
orderManager.ReceiveImmediateOrders(p.FromClient, orders.Orders);
else
orderManager.ReceiveOrders(p.FromClient, orders);
}
else
throw new InvalidDataException($"Received unknown packet from client {p.FromClient} with length {p.Data.Length}");
Recorder?.Receive(p.FromClient, p.Data);
}
}
public void StartRecording(Func<string> chooseFilename)
{
// If we have a previous recording then save/dispose it and start a new one.
Recorder?.Dispose();
Recorder = new ReplayRecorder(chooseFilename);
}
public ConnectionState ConnectionState => connectionState;
public IPEndPoint EndPoint => endpoint;
public string ErrorMessage => errorMessage;
void Dispose(bool disposing)
{ {
if (disposed) if (disposed)
return; return;
disposed = true; disposed = true;
// Closing the stream will cause any reads on the receiving thread to throw. // Closing the stream will cause any reads on the receiving thread to throw.
// This will mark the connection as no longer connected and the thread will terminate cleanly. // This will mark the connection as no longer connected and the thread will terminate cleanly.
tcp?.Close(); tcp?.Close();
base.Dispose(disposing); if (disposing)
Recorder?.Dispose();
}
void IDisposable.Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
} }
} }
} }

View File

@@ -69,25 +69,16 @@ namespace OpenRA.Network
{ {
static readonly OrderPacket NoOrders = new OrderPacket(Array.Empty<Order>()); static readonly OrderPacket NoOrders = new OrderPacket(Array.Empty<Order>());
public static byte[] SerializeSync(int frame, int syncHash, ulong defeatState) public static byte[] SerializeSync((int Frame, int SyncHash, ulong DefeatState) data)
{ {
var ms = new MemoryStream(4 + Order.SyncHashOrderLength); var ms = new MemoryStream(4 + Order.SyncHashOrderLength);
ms.WriteArray(BitConverter.GetBytes(frame)); ms.WriteArray(BitConverter.GetBytes(data.Frame));
ms.WriteByte((byte)OrderType.SyncHash); ms.WriteByte((byte)OrderType.SyncHash);
ms.WriteArray(BitConverter.GetBytes(syncHash)); ms.WriteArray(BitConverter.GetBytes(data.SyncHash));
ms.WriteArray(BitConverter.GetBytes(defeatState)); ms.WriteArray(BitConverter.GetBytes(data.DefeatState));
return ms.GetBuffer(); return ms.GetBuffer();
} }
public static byte[] SerializeOrders(int frame, IEnumerable<Order> orders)
{
var ms = new MemoryStream();
ms.WriteArray(BitConverter.GetBytes(frame));
foreach (var o in orders)
ms.WriteArray(o.Serialize());
return ms.ToArray();
}
public static bool TryParseDisconnect(byte[] packet, out int clientId) public static bool TryParseDisconnect(byte[] packet, out int clientId)
{ {
if (packet.Length == Order.DisconnectOrderLength + 4 && packet[4] == (byte)OrderType.Disconnect) if (packet.Length == Order.DisconnectOrderLength + 4 && packet[4] == (byte)OrderType.Disconnect)
@@ -100,43 +91,42 @@ namespace OpenRA.Network
return false; return false;
} }
public static bool TryParseSync(byte[] packet, out int frame, out int syncHash, out ulong defeatState) public static bool TryParseSync(byte[] packet, out (int Frame, int SyncHash, ulong DefeatState) data)
{ {
if (packet.Length != 4 + Order.SyncHashOrderLength || packet[4] != (byte)OrderType.SyncHash) if (packet.Length != 4 + Order.SyncHashOrderLength || packet[4] != (byte)OrderType.SyncHash)
{ {
frame = syncHash = 0; data = (0, 0, 0);
defeatState = 0;
return false; return false;
} }
frame = BitConverter.ToInt32(packet, 0); var frame = BitConverter.ToInt32(packet, 0);
syncHash = BitConverter.ToInt32(packet, 5); var syncHash = BitConverter.ToInt32(packet, 5);
defeatState = BitConverter.ToUInt64(packet, 9); var defeatState = BitConverter.ToUInt64(packet, 9);
data = (frame, syncHash, defeatState);
return true; return true;
} }
public static bool TryParseOrderPacket(byte[] packet, out int frame, out OrderPacket orders) public static bool TryParseOrderPacket(byte[] packet, out (int Frame, OrderPacket Orders) data)
{ {
// Not a valid packet // Not a valid packet
if (packet.Length < 4) if (packet.Length < 4)
{ {
frame = 0; data = (0, null);
orders = null;
return false; return false;
} }
// Wrong packet type // Wrong packet type
if (packet.Length >= 5 && (packet[4] == (byte)OrderType.Disconnect || packet[4] == (byte)OrderType.SyncHash)) if (packet.Length >= 5 && (packet[4] == (byte)OrderType.Disconnect || packet[4] == (byte)OrderType.SyncHash))
{ {
frame = 0; data = (0, null);
orders = null;
return false; return false;
} }
frame = BitConverter.ToInt32(packet, 0); var frame = BitConverter.ToInt32(packet, 0);
// PERF: Skip empty order frames, often per client each frame // PERF: Skip empty order frames, often per client each frame
orders = packet.Length > 4 ? new OrderPacket(new MemoryStream(packet, 4, packet.Length - 4)) : NoOrders; var orders = packet.Length > 4 ? new OrderPacket(new MemoryStream(packet, 4, packet.Length - 4)) : NoOrders;
data = (frame, orders);
return true; return true;
} }
} }

View File

@@ -137,19 +137,19 @@ namespace OpenRA.Network
pendingOrders.Remove(clientIndex); pendingOrders.Remove(clientIndex);
} }
public void ReceiveSync(int frame, int syncHash, ulong defeatState) public void ReceiveSync((int Frame, int SyncHash, ulong DefeatState) sync)
{ {
// HACK: The shellmap relies on ticking a disposed OM // HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap) if (disposed && World.Type != WorldType.Shellmap)
return; return;
if (syncForFrame.TryGetValue(frame, out var s)) if (syncForFrame.TryGetValue(sync.Frame, out var s))
{ {
if (s.SyncHash != syncHash || s.DefeatState != defeatState) if (s.SyncHash != sync.SyncHash || s.DefeatState != sync.DefeatState)
OutOfSync(frame); OutOfSync(sync.Frame);
} }
else else
syncForFrame.Add(frame, (syncHash, defeatState)); syncForFrame.Add(sync.Frame, (sync.SyncHash, sync.DefeatState));
} }
public void ReceiveImmediateOrders(int clientId, OrderPacket orders) public void ReceiveImmediateOrders(int clientId, OrderPacket orders)
@@ -168,16 +168,16 @@ namespace OpenRA.Network
} }
} }
public void ReceiveOrders(int clientId, int frame, OrderPacket orders) public void ReceiveOrders(int clientId, (int Frame, OrderPacket Orders) orders)
{ {
// HACK: The shellmap relies on ticking a disposed OM // HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap) if (disposed && World.Type != WorldType.Shellmap)
return; return;
if (pendingOrders.TryGetValue(clientId, out var queue)) if (pendingOrders.TryGetValue(clientId, out var queue))
queue.Enqueue((frame, orders)); queue.Enqueue((orders.Frame, orders.Orders));
else else
Log.Write("debug", $"Received packet from disconnected client '{clientId}'"); throw new InvalidDataException($"Received packet from disconnected client '{clientId}'");
} }
void ReceiveAllOrdersAndCheckSync() void ReceiveAllOrdersAndCheckSync()

View File

@@ -31,8 +31,6 @@ namespace OpenRA.Network
readonly int orderLatency; readonly int orderLatency;
int ordersFrame; int ordersFrame;
public int LocalClientId => -1;
public readonly int TickCount; public readonly int TickCount;
public readonly int FinalGameTick; public readonly int FinalGameTick;
public readonly bool IsValid; public readonly bool IsValid;
@@ -70,9 +68,9 @@ namespace OpenRA.Network
if (frame == 0) if (frame == 0)
{ {
// Parse replay metadata from orders stream // Parse replay metadata from orders stream
if (OrderIO.TryParseOrderPacket(packet, out _, out var orders)) if (OrderIO.TryParseOrderPacket(packet, out var orders))
{ {
foreach (var o in orders.GetOrders(null)) foreach (var o in orders.Orders.GetOrders(null))
{ {
if (o.OrderString == "StartGame") if (o.OrderString == "StartGame")
IsValid = true; IsValid = true;
@@ -127,10 +125,10 @@ namespace OpenRA.Network
} }
// Do nothing: ignore locally generated orders // Do nothing: ignore locally generated orders
public void Send(int frame, IEnumerable<Order> orders) { } void IConnection.Send(int frame, IEnumerable<Order> orders) { }
public void SendImmediate(IEnumerable<Order> orders) { } void IConnection.SendImmediate(IEnumerable<Order> orders) { }
public void SendSync(int frame, int syncHash, ulong defeatState) void IConnection.SendSync(int frame, int syncHash, ulong defeatState)
{ {
sync.Enqueue((frame, syncHash, defeatState)); sync.Enqueue((frame, syncHash, defeatState));
@@ -138,13 +136,10 @@ namespace OpenRA.Network
ordersFrame = frame + orderLatency; ordersFrame = frame + orderLatency;
} }
public void Receive(OrderManager orderManager) void IConnection.Receive(OrderManager orderManager)
{ {
while (sync.Count != 0) while (sync.Count != 0)
{ orderManager.ReceiveSync(sync.Dequeue());
var (syncFrame, syncHash, defeatState) = sync.Dequeue();
orderManager.ReceiveSync(syncFrame, syncHash, defeatState);
}
while (chunks.Count != 0 && chunks.Peek().Frame <= ordersFrame) while (chunks.Count != 0 && chunks.Peek().Frame <= ordersFrame)
{ {
@@ -152,19 +147,23 @@ namespace OpenRA.Network
{ {
if (OrderIO.TryParseDisconnect(o.Packet, out var disconnectClient)) if (OrderIO.TryParseDisconnect(o.Packet, out var disconnectClient))
orderManager.ReceiveDisconnect(disconnectClient); orderManager.ReceiveDisconnect(disconnectClient);
else if (OrderIO.TryParseSync(o.Packet, out var syncFrame, out var syncHash, out var defeatState)) else if (OrderIO.TryParseSync(o.Packet, out var sync))
orderManager.ReceiveSync(syncFrame, syncHash, defeatState); orderManager.ReceiveSync(sync);
else if (OrderIO.TryParseOrderPacket(o.Packet, out var frame, out var orders)) else if (OrderIO.TryParseOrderPacket(o.Packet, out var orders))
{ {
if (frame == 0) if (orders.Frame == 0)
orderManager.ReceiveImmediateOrders(o.ClientId, orders); orderManager.ReceiveImmediateOrders(o.ClientId, orders.Orders);
else else
orderManager.ReceiveOrders(o.ClientId, frame, orders); orderManager.ReceiveOrders(o.ClientId, orders);
} }
else
throw new InvalidDataException($"Received unknown packet from client {o.ClientId} with length {o.Packet.Length}");
} }
} }
} }
public void Dispose() { } int IConnection.LocalClientId => -1;
void IDisposable.Dispose() { }
} }
} }

View File

@@ -29,10 +29,10 @@ namespace OpenRA.Network
static bool IsGameStart(byte[] data) static bool IsGameStart(byte[] data)
{ {
if (!OrderIO.TryParseOrderPacket(data, out var frame, out var orders)) if (!OrderIO.TryParseOrderPacket(data, out var orders))
return false; return false;
return frame == 0 && orders.GetOrders(null).Any(o => o.OrderString == "StartGame"); return orders.Frame == 0 && orders.Orders.GetOrders(null).Any(o => o.OrderString == "StartGame");
} }
public ReplayRecorder(Func<string> chooseFilename) public ReplayRecorder(Func<string> chooseFilename)

View File

@@ -289,10 +289,8 @@ namespace OpenRA
gameInfo.DisabledSpawnPoints = OrderManager.LobbyInfo.DisabledSpawnPoints; gameInfo.DisabledSpawnPoints = OrderManager.LobbyInfo.DisabledSpawnPoints;
var rc = (OrderManager.Connection as EchoConnection)?.Recorder; if (OrderManager.Connection is NetworkConnection nc && nc.Recorder != null)
nc.Recorder.Metadata = new ReplayMetadata(gameInfo);
if (rc != null)
rc.Metadata = new ReplayMetadata(gameInfo);
} }
public void SetWorldOwner(Player p) public void SetWorldOwner(Player p)