diff --git a/OpenRA.Game/Network/Connection.cs b/OpenRA.Game/Network/Connection.cs index e1159a6870..3c78002069 100644 --- a/OpenRA.Game/Network/Connection.cs +++ b/OpenRA.Game/Network/Connection.cs @@ -13,6 +13,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; @@ -37,101 +38,63 @@ namespace OpenRA.Network void Receive(OrderManager orderManager); } - public class EchoConnection : IConnection + public sealed class EchoConnection : IConnection { - protected struct ReceivedPacket + const int LocalClientId = 1; + readonly Queue<(int Frame, int SyncHash, ulong DefeatState)> sync = new Queue<(int, int, ulong)>(); + readonly Queue<(int Frame, OrderPacket Orders)> orders = new Queue<(int, OrderPacket)>(); + readonly Queue immediateOrders = new Queue(); + + int IConnection.LocalClientId => LocalClientId; + + void IConnection.Send(int frame, IEnumerable o) { - public int FromClient; - public byte[] Data; + orders.Enqueue((frame, new OrderPacket(o.ToArray()))); } - readonly ConcurrentQueue receivedPackets = new ConcurrentQueue(); - public ReplayRecorder Recorder { get; private set; } - - public virtual int LocalClientId => 1; - - public virtual void Send(int frame, IEnumerable orders) + void IConnection.SendImmediate(IEnumerable o) { - Send(OrderIO.SerializeOrders(frame, orders)); + immediateOrders.Enqueue(new OrderPacket(o.ToArray())); } - public virtual void SendImmediate(IEnumerable orders) + void IConnection.SendSync(int frame, int syncHash, ulong defeatState) { - Send(OrderIO.SerializeOrders(0, orders)); + sync.Enqueue((frame, syncHash, defeatState)); } - public virtual void SendSync(int frame, int syncHash, ulong defeatState) + void IConnection.Receive(OrderManager orderManager) { - Send(OrderIO.SerializeSync(frame, syncHash, defeatState)); + while (immediateOrders.TryDequeue(out var i)) + orderManager.ReceiveImmediateOrders(LocalClientId, i); + + while (orders.TryDequeue(out var o)) + orderManager.ReceiveOrders(LocalClientId, o); + + while (sync.TryDequeue(out var s)) + orderManager.ReceiveSync(s); } - protected virtual void Send(byte[] packet) - { - if (packet.Length == 0) - throw new NotImplementedException(); - AddPacket(new ReceivedPacket { FromClient = LocalClientId, Data = packet }); - } - - protected void AddPacket(ReceivedPacket packet) - { - receivedPackets.Enqueue(packet); - } - - public virtual void Receive(OrderManager orderManager) - { - while (receivedPackets.TryDequeue(out var p)) - { - if (OrderIO.TryParseDisconnect(p.Data, out var disconnectClient)) - orderManager.ReceiveDisconnect(disconnectClient); - else if (OrderIO.TryParseSync(p.Data, out var syncFrame, out var syncHash, out var defeatState)) - orderManager.ReceiveSync(syncFrame, syncHash, defeatState); - else if (OrderIO.TryParseOrderPacket(p.Data, out var ordersFrame, out var orders)) - { - if (ordersFrame == 0) - orderManager.ReceiveImmediateOrders(p.FromClient, orders); - else - orderManager.ReceiveOrders(p.FromClient, ordersFrame, orders); - } - - Recorder?.Receive(p.FromClient, p.Data); - } - } - - public void StartRecording(Func chooseFilename) - { - // If we have a previous recording then save/dispose it and start a new one. - Recorder?.Dispose(); - Recorder = new ReplayRecorder(chooseFilename); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - Recorder?.Dispose(); - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } + void IDisposable.Dispose() { } } - public sealed class NetworkConnection : EchoConnection + public sealed class NetworkConnection : IConnection { public readonly ConnectionTarget Target; + internal ReplayRecorder Recorder { get; private set; } + + readonly List queuedSyncPackets = new List(); + readonly Queue<(int Frame, int SyncHash, ulong DefeatState)> sentSync = new Queue<(int, int, ulong)>(); + readonly Queue<(int Frame, OrderPacket Orders)> sentOrders = new Queue<(int, OrderPacket)>(); + readonly Queue sentImmediateOrders = new Queue(); + readonly ConcurrentQueue<(int FromClient, byte[] Data)> receivedPackets = new ConcurrentQueue<(int, byte[])>(); TcpClient tcp; IPEndPoint endpoint; - readonly List queuedSyncPackets = new List(); + volatile ConnectionState connectionState = ConnectionState.Connecting; volatile int clientId; bool disposed; string errorMessage; - public IPEndPoint EndPoint => endpoint; - - public string ErrorMessage => errorMessage; - public NetworkConnection(ConnectionTarget target) { Target = target; @@ -228,7 +191,7 @@ namespace OpenRA.Network var buf = stream.ReadBytes(len); if (len == 0) throw new NotImplementedException(); - AddPacket(new ReceivedPacket { FromClient = client, Data = buf }); + receivedPackets.Enqueue((client, buf)); } } catch (Exception ex) @@ -242,18 +205,35 @@ namespace OpenRA.Network } } - public override int LocalClientId => clientId; - public ConnectionState ConnectionState => connectionState; + int IConnection.LocalClientId => clientId; - public override void SendSync(int frame, int syncHash, ulong defeatState) + void IConnection.Send(int frame, IEnumerable orders) { - queuedSyncPackets.Add(OrderIO.SerializeSync(frame, syncHash, defeatState)); + var o = new OrderPacket(orders.ToArray()); + sentOrders.Enqueue((frame, o)); + Send(o.Serialize(frame)); } - protected override void Send(byte[] packet) + void IConnection.SendImmediate(IEnumerable orders) { - base.Send(packet); + var o = new OrderPacket(orders.ToArray()); + sentImmediateOrders.Enqueue(o); + Send(o.Serialize(0)); + } + void IConnection.SendSync(int frame, int syncHash, ulong defeatState) + { + var sync = (frame, syncHash, defeatState); + sentSync.Enqueue(sync); + + // Send sync packets together with the next set of orders. + // This was originally explained as reducing network bandwidth + // (TCP overhead?), but the original discussions have been lost to time. + queuedSyncPackets.Add(OrderIO.SerializeSync(sync)); + } + + void Send(byte[] packet) + { try { var ms = new MemoryStream(); @@ -264,7 +244,6 @@ namespace OpenRA.Network { ms.WriteArray(BitConverter.GetBytes(q.Length)); ms.WriteArray(q); - base.Send(q); } queuedSyncPackets.Clear(); @@ -276,17 +255,80 @@ namespace OpenRA.Network catch (IOException) { /* ditto */ } } - protected override void Dispose(bool disposing) + void IConnection.Receive(OrderManager orderManager) + { + // Locally generated orders + while (sentImmediateOrders.TryDequeue(out var i)) + { + orderManager.ReceiveImmediateOrders(clientId, i); + Recorder?.Receive(clientId, i.Serialize(0)); + } + + while (sentSync.TryDequeue(out var s)) + { + orderManager.ReceiveSync(s); + Recorder?.Receive(clientId, OrderIO.SerializeSync(s)); + } + + while (sentOrders.TryDequeue(out var o)) + { + orderManager.ReceiveOrders(clientId, o); + Recorder?.Receive(clientId, o.Orders.Serialize(o.Frame)); + } + + // Orders from other players + while (receivedPackets.TryDequeue(out var p)) + { + if (OrderIO.TryParseDisconnect(p.Data, out var disconnectClient)) + orderManager.ReceiveDisconnect(disconnectClient); + else if (OrderIO.TryParseSync(p.Data, out var sync)) + orderManager.ReceiveSync(sync); + else if (OrderIO.TryParseOrderPacket(p.Data, out var orders)) + { + if (orders.Frame == 0) + orderManager.ReceiveImmediateOrders(p.FromClient, orders.Orders); + else + orderManager.ReceiveOrders(p.FromClient, orders); + } + else + throw new InvalidDataException($"Received unknown packet from client {p.FromClient} with length {p.Data.Length}"); + + Recorder?.Receive(p.FromClient, p.Data); + } + } + + public void StartRecording(Func chooseFilename) + { + // If we have a previous recording then save/dispose it and start a new one. + Recorder?.Dispose(); + Recorder = new ReplayRecorder(chooseFilename); + } + + public ConnectionState ConnectionState => connectionState; + + public IPEndPoint EndPoint => endpoint; + + public string ErrorMessage => errorMessage; + + void Dispose(bool disposing) { if (disposed) return; + disposed = true; // Closing the stream will cause any reads on the receiving thread to throw. // This will mark the connection as no longer connected and the thread will terminate cleanly. tcp?.Close(); - base.Dispose(disposing); + if (disposing) + Recorder?.Dispose(); + } + + void IDisposable.Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); } } } diff --git a/OpenRA.Game/Network/OrderIO.cs b/OpenRA.Game/Network/OrderIO.cs index d8ca90e162..df9581f0e2 100644 --- a/OpenRA.Game/Network/OrderIO.cs +++ b/OpenRA.Game/Network/OrderIO.cs @@ -69,25 +69,16 @@ namespace OpenRA.Network { static readonly OrderPacket NoOrders = new OrderPacket(Array.Empty()); - public static byte[] SerializeSync(int frame, int syncHash, ulong defeatState) + public static byte[] SerializeSync((int Frame, int SyncHash, ulong DefeatState) data) { var ms = new MemoryStream(4 + Order.SyncHashOrderLength); - ms.WriteArray(BitConverter.GetBytes(frame)); + ms.WriteArray(BitConverter.GetBytes(data.Frame)); ms.WriteByte((byte)OrderType.SyncHash); - ms.WriteArray(BitConverter.GetBytes(syncHash)); - ms.WriteArray(BitConverter.GetBytes(defeatState)); + ms.WriteArray(BitConverter.GetBytes(data.SyncHash)); + ms.WriteArray(BitConverter.GetBytes(data.DefeatState)); return ms.GetBuffer(); } - public static byte[] SerializeOrders(int frame, IEnumerable orders) - { - var ms = new MemoryStream(); - ms.WriteArray(BitConverter.GetBytes(frame)); - foreach (var o in orders) - ms.WriteArray(o.Serialize()); - return ms.ToArray(); - } - public static bool TryParseDisconnect(byte[] packet, out int clientId) { if (packet.Length == Order.DisconnectOrderLength + 4 && packet[4] == (byte)OrderType.Disconnect) @@ -100,43 +91,42 @@ namespace OpenRA.Network return false; } - public static bool TryParseSync(byte[] packet, out int frame, out int syncHash, out ulong defeatState) + public static bool TryParseSync(byte[] packet, out (int Frame, int SyncHash, ulong DefeatState) data) { if (packet.Length != 4 + Order.SyncHashOrderLength || packet[4] != (byte)OrderType.SyncHash) { - frame = syncHash = 0; - defeatState = 0; + data = (0, 0, 0); return false; } - frame = BitConverter.ToInt32(packet, 0); - syncHash = BitConverter.ToInt32(packet, 5); - defeatState = BitConverter.ToUInt64(packet, 9); + var frame = BitConverter.ToInt32(packet, 0); + var syncHash = BitConverter.ToInt32(packet, 5); + var defeatState = BitConverter.ToUInt64(packet, 9); + data = (frame, syncHash, defeatState); return true; } - public static bool TryParseOrderPacket(byte[] packet, out int frame, out OrderPacket orders) + public static bool TryParseOrderPacket(byte[] packet, out (int Frame, OrderPacket Orders) data) { // Not a valid packet if (packet.Length < 4) { - frame = 0; - orders = null; + data = (0, null); return false; } // Wrong packet type if (packet.Length >= 5 && (packet[4] == (byte)OrderType.Disconnect || packet[4] == (byte)OrderType.SyncHash)) { - frame = 0; - orders = null; + data = (0, null); return false; } - frame = BitConverter.ToInt32(packet, 0); + var frame = BitConverter.ToInt32(packet, 0); // PERF: Skip empty order frames, often per client each frame - orders = packet.Length > 4 ? new OrderPacket(new MemoryStream(packet, 4, packet.Length - 4)) : NoOrders; + var orders = packet.Length > 4 ? new OrderPacket(new MemoryStream(packet, 4, packet.Length - 4)) : NoOrders; + data = (frame, orders); return true; } } diff --git a/OpenRA.Game/Network/OrderManager.cs b/OpenRA.Game/Network/OrderManager.cs index 9825f7ea98..de8581dc8f 100644 --- a/OpenRA.Game/Network/OrderManager.cs +++ b/OpenRA.Game/Network/OrderManager.cs @@ -137,19 +137,19 @@ namespace OpenRA.Network pendingOrders.Remove(clientIndex); } - public void ReceiveSync(int frame, int syncHash, ulong defeatState) + public void ReceiveSync((int Frame, int SyncHash, ulong DefeatState) sync) { // HACK: The shellmap relies on ticking a disposed OM if (disposed && World.Type != WorldType.Shellmap) return; - if (syncForFrame.TryGetValue(frame, out var s)) + if (syncForFrame.TryGetValue(sync.Frame, out var s)) { - if (s.SyncHash != syncHash || s.DefeatState != defeatState) - OutOfSync(frame); + if (s.SyncHash != sync.SyncHash || s.DefeatState != sync.DefeatState) + OutOfSync(sync.Frame); } else - syncForFrame.Add(frame, (syncHash, defeatState)); + syncForFrame.Add(sync.Frame, (sync.SyncHash, sync.DefeatState)); } public void ReceiveImmediateOrders(int clientId, OrderPacket orders) @@ -168,16 +168,16 @@ namespace OpenRA.Network } } - public void ReceiveOrders(int clientId, int frame, OrderPacket orders) + public void ReceiveOrders(int clientId, (int Frame, OrderPacket Orders) orders) { // HACK: The shellmap relies on ticking a disposed OM if (disposed && World.Type != WorldType.Shellmap) return; if (pendingOrders.TryGetValue(clientId, out var queue)) - queue.Enqueue((frame, orders)); + queue.Enqueue((orders.Frame, orders.Orders)); else - Log.Write("debug", $"Received packet from disconnected client '{clientId}'"); + throw new InvalidDataException($"Received packet from disconnected client '{clientId}'"); } void ReceiveAllOrdersAndCheckSync() diff --git a/OpenRA.Game/Network/ReplayConnection.cs b/OpenRA.Game/Network/ReplayConnection.cs index 3c39ee7fa8..6dbbcf9093 100644 --- a/OpenRA.Game/Network/ReplayConnection.cs +++ b/OpenRA.Game/Network/ReplayConnection.cs @@ -31,8 +31,6 @@ namespace OpenRA.Network readonly int orderLatency; int ordersFrame; - public int LocalClientId => -1; - public readonly int TickCount; public readonly int FinalGameTick; public readonly bool IsValid; @@ -70,9 +68,9 @@ namespace OpenRA.Network if (frame == 0) { // Parse replay metadata from orders stream - if (OrderIO.TryParseOrderPacket(packet, out _, out var orders)) + if (OrderIO.TryParseOrderPacket(packet, out var orders)) { - foreach (var o in orders.GetOrders(null)) + foreach (var o in orders.Orders.GetOrders(null)) { if (o.OrderString == "StartGame") IsValid = true; @@ -127,10 +125,10 @@ namespace OpenRA.Network } // Do nothing: ignore locally generated orders - public void Send(int frame, IEnumerable orders) { } - public void SendImmediate(IEnumerable orders) { } + void IConnection.Send(int frame, IEnumerable orders) { } + void IConnection.SendImmediate(IEnumerable orders) { } - public void SendSync(int frame, int syncHash, ulong defeatState) + void IConnection.SendSync(int frame, int syncHash, ulong defeatState) { sync.Enqueue((frame, syncHash, defeatState)); @@ -138,13 +136,10 @@ namespace OpenRA.Network ordersFrame = frame + orderLatency; } - public void Receive(OrderManager orderManager) + void IConnection.Receive(OrderManager orderManager) { while (sync.Count != 0) - { - var (syncFrame, syncHash, defeatState) = sync.Dequeue(); - orderManager.ReceiveSync(syncFrame, syncHash, defeatState); - } + orderManager.ReceiveSync(sync.Dequeue()); while (chunks.Count != 0 && chunks.Peek().Frame <= ordersFrame) { @@ -152,19 +147,23 @@ namespace OpenRA.Network { if (OrderIO.TryParseDisconnect(o.Packet, out var disconnectClient)) orderManager.ReceiveDisconnect(disconnectClient); - else if (OrderIO.TryParseSync(o.Packet, out var syncFrame, out var syncHash, out var defeatState)) - orderManager.ReceiveSync(syncFrame, syncHash, defeatState); - else if (OrderIO.TryParseOrderPacket(o.Packet, out var frame, out var orders)) + else if (OrderIO.TryParseSync(o.Packet, out var sync)) + orderManager.ReceiveSync(sync); + else if (OrderIO.TryParseOrderPacket(o.Packet, out var orders)) { - if (frame == 0) - orderManager.ReceiveImmediateOrders(o.ClientId, orders); + if (orders.Frame == 0) + orderManager.ReceiveImmediateOrders(o.ClientId, orders.Orders); else - orderManager.ReceiveOrders(o.ClientId, frame, orders); + orderManager.ReceiveOrders(o.ClientId, orders); } + else + throw new InvalidDataException($"Received unknown packet from client {o.ClientId} with length {o.Packet.Length}"); } } } - public void Dispose() { } + int IConnection.LocalClientId => -1; + + void IDisposable.Dispose() { } } } diff --git a/OpenRA.Game/Network/ReplayRecorder.cs b/OpenRA.Game/Network/ReplayRecorder.cs index d9e2bb823b..1d8bd589d5 100644 --- a/OpenRA.Game/Network/ReplayRecorder.cs +++ b/OpenRA.Game/Network/ReplayRecorder.cs @@ -29,10 +29,10 @@ namespace OpenRA.Network static bool IsGameStart(byte[] data) { - if (!OrderIO.TryParseOrderPacket(data, out var frame, out var orders)) + if (!OrderIO.TryParseOrderPacket(data, out var orders)) return false; - return frame == 0 && orders.GetOrders(null).Any(o => o.OrderString == "StartGame"); + return orders.Frame == 0 && orders.Orders.GetOrders(null).Any(o => o.OrderString == "StartGame"); } public ReplayRecorder(Func chooseFilename) diff --git a/OpenRA.Game/World.cs b/OpenRA.Game/World.cs index 203796e666..04e65485fc 100644 --- a/OpenRA.Game/World.cs +++ b/OpenRA.Game/World.cs @@ -289,10 +289,8 @@ namespace OpenRA gameInfo.DisabledSpawnPoints = OrderManager.LobbyInfo.DisabledSpawnPoints; - var rc = (OrderManager.Connection as EchoConnection)?.Recorder; - - if (rc != null) - rc.Metadata = new ReplayMetadata(gameInfo); + if (OrderManager.Connection is NetworkConnection nc && nc.Recorder != null) + nc.Recorder.Metadata = new ReplayMetadata(gameInfo); } public void SetWorldOwner(Player p)