Remove packet byte wrangling from OrderManager.

This commit is contained in:
Paul Chote
2021-08-26 20:01:16 +01:00
committed by teinarss
parent 52b597d5d2
commit e389c00a11
6 changed files with 229 additions and 162 deletions

View File

@@ -31,10 +31,10 @@ namespace OpenRA.Network
public interface IConnection : IDisposable
{
int LocalClientId { get; }
void Send(int frame, List<byte[]> orders);
void SendImmediate(IEnumerable<byte[]> orders);
void SendSync(int frame, byte[] syncData);
void Receive(Action<int, byte[]> packetFn);
void Send(int frame, IEnumerable<Order> orders);
void SendImmediate(IEnumerable<Order> orders);
void SendSync(int frame, int syncHash, ulong defeatState);
void Receive(OrderManager orderManager);
}
public class EchoConnection : IConnection
@@ -50,32 +50,29 @@ namespace OpenRA.Network
public virtual int LocalClientId => 1;
public virtual void Send(int frame, List<byte[]> orders)
public virtual void Send(int frame, IEnumerable<Order> orders)
{
var ms = new MemoryStream();
ms.WriteArray(BitConverter.GetBytes(frame));
foreach (var o in orders)
ms.WriteArray(o);
ms.WriteArray(o.Serialize());
Send(ms.ToArray());
}
public virtual void SendImmediate(IEnumerable<byte[]> orders)
public virtual void SendImmediate(IEnumerable<Order> orders)
{
foreach (var o in orders)
{
var ms = new MemoryStream();
ms.WriteArray(BitConverter.GetBytes(0));
ms.WriteArray(o);
ms.WriteArray(o.Serialize());
Send(ms.ToArray());
}
}
public virtual void SendSync(int frame, byte[] syncData)
public virtual void SendSync(int frame, int syncHash, ulong defeatState)
{
var ms = new MemoryStream(4 + syncData.Length);
ms.WriteArray(BitConverter.GetBytes(frame));
ms.WriteArray(syncData);
Send(ms.GetBuffer());
Send(OrderIO.SerializeSync(frame, syncHash, defeatState));
}
protected virtual void Send(byte[] packet)
@@ -90,11 +87,22 @@ namespace OpenRA.Network
receivedPackets.Enqueue(packet);
}
public virtual void Receive(Action<int, byte[]> packetFn)
public virtual void Receive(OrderManager orderManager)
{
while (receivedPackets.TryDequeue(out var p))
{
packetFn(p.FromClient, p.Data);
if (OrderIO.TryParseDisconnect(p.Data, out var disconnectClient))
orderManager.ReceiveDisconnect(disconnectClient);
else if (OrderIO.TryParseSync(p.Data, out var syncFrame, out var syncHash, out var defeatState))
orderManager.ReceiveSync(syncFrame, syncHash, defeatState);
else if (OrderIO.TryParseOrderPacket(p.Data, out var ordersFrame, out var orders))
{
if (ordersFrame == 0)
orderManager.ReceiveImmediateOrders(p.FromClient, orders);
else
orderManager.ReceiveOrders(p.FromClient, ordersFrame, orders);
}
Recorder?.Receive(p.FromClient, p.Data);
}
}
@@ -247,12 +255,9 @@ namespace OpenRA.Network
public override int LocalClientId => clientId;
public ConnectionState ConnectionState => connectionState;
public override void SendSync(int frame, byte[] syncData)
public override void SendSync(int frame, int syncHash, ulong defeatState)
{
var ms = new MemoryStream(4 + syncData.Length);
ms.WriteArray(BitConverter.GetBytes(frame));
ms.WriteArray(syncData);
queuedSyncPackets.Add(ms.GetBuffer());
queuedSyncPackets.Add(OrderIO.SerializeSync(frame, syncHash, defeatState));
}
protected override void Send(byte[] packet)

View File

@@ -372,11 +372,16 @@ namespace OpenRA
case TargetType.Terrain:
if (fields.HasField(OrderFields.TargetIsCell))
{
w.Write(Target.SerializableCell.Value);
w.Write(Target.SerializableCell.Value.Bits);
w.Write((byte)Target.SerializableSubCell);
}
else
w.Write(Target.SerializablePos);
{
w.Write(Target.SerializablePos.X);
w.Write(Target.SerializablePos.Y);
w.Write(Target.SerializablePos.Z);
}
break;
}
}
@@ -392,7 +397,7 @@ namespace OpenRA
}
if (fields.HasField(OrderFields.ExtraLocation))
w.Write(ExtraLocation);
w.Write(ExtraLocation.Bits);
if (fields.HasField(OrderFields.ExtraData))
w.Write(ExtraData);

View File

@@ -9,70 +9,126 @@
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
namespace OpenRA.Network
{
public static class OrderIO
public class OrderPacket
{
static readonly List<Order> EmptyOrderList = new List<Order>(0);
public static List<Order> ToOrderList(this byte[] bytes, World world)
readonly Order[] orders;
readonly MemoryStream data;
public OrderPacket(Order[] orders)
{
// PERF: Skip empty order frames, often per client each frame
if (bytes.Length == 4)
return EmptyOrderList;
this.orders = orders;
data = null;
}
var ms = new MemoryStream(bytes, 4, bytes.Length - 4);
var reader = new BinaryReader(ms);
var ret = new List<Order>();
while (ms.Position < ms.Length)
public OrderPacket(MemoryStream data)
{
orders = null;
this.data = data;
}
public IEnumerable<Order> GetOrders(World world)
{
return orders ?? ParseData(world);
}
IEnumerable<Order> ParseData(World world)
{
if (data == null)
yield break;
// Order deserialization depends on the current world state,
// so must be deferred until we are ready to consume them.
var reader = new BinaryReader(data);
while (data.Position < data.Length)
{
var o = Order.Deserialize(world, reader);
if (o != null)
ret.Add(o);
yield return o;
}
}
return ret;
}
public static byte[] SerializeSync(int sync, ulong defeatState)
public byte[] Serialize(int frame)
{
var ms = new MemoryStream(Order.SyncHashOrderLength);
using (var writer = new BinaryWriter(ms))
{
writer.Write((byte)OrderType.SyncHash);
writer.Write(sync);
writer.Write(defeatState);
if (data != null)
return data.ToArray();
var ms = new MemoryStream();
ms.WriteArray(BitConverter.GetBytes(frame));
foreach (var o in orders)
ms.WriteArray(o.Serialize());
return ms.ToArray();
}
}
public static class OrderIO
{
static readonly OrderPacket NoOrders = new OrderPacket(Array.Empty<Order>());
public static byte[] SerializeSync(int frame, int syncHash, ulong defeatState)
{
var ms = new MemoryStream(4 + Order.SyncHashOrderLength);
ms.WriteArray(BitConverter.GetBytes(frame));
ms.WriteByte((byte)OrderType.SyncHash);
ms.WriteArray(BitConverter.GetBytes(syncHash));
ms.WriteArray(BitConverter.GetBytes(defeatState));
return ms.GetBuffer();
}
public static int2 ReadInt2(this BinaryReader r)
public static bool TryParseDisconnect(byte[] packet, out int clientId)
{
var x = r.ReadInt32();
var y = r.ReadInt32();
return new int2(x, y);
if (packet.Length == Order.DisconnectOrderLength + 4 && packet[4] == (byte)OrderType.Disconnect)
{
clientId = BitConverter.ToInt32(packet, 5);
return true;
}
public static void Write(this BinaryWriter w, int2 p)
{
w.Write(p.X);
w.Write(p.Y);
clientId = 0;
return false;
}
public static void Write(this BinaryWriter w, CPos cell)
public static bool TryParseSync(byte[] packet, out int frame, out int syncHash, out ulong defeatState)
{
w.Write(cell.Bits);
if (packet.Length != 4 + Order.SyncHashOrderLength || packet[4] != (byte)OrderType.SyncHash)
{
frame = syncHash = 0;
defeatState = 0;
return false;
}
public static void Write(this BinaryWriter w, WPos pos)
frame = BitConverter.ToInt32(packet, 0);
syncHash = BitConverter.ToInt32(packet, 5);
defeatState = BitConverter.ToUInt64(packet, 9);
return true;
}
public static bool TryParseOrderPacket(byte[] packet, out int frame, out OrderPacket orders)
{
w.Write(pos.X);
w.Write(pos.Y);
w.Write(pos.Z);
// Not a valid packet
if (packet.Length < 4)
{
frame = 0;
orders = null;
return false;
}
// Wrong packet type
if (packet.Length >= 5 && (packet[4] == (byte)OrderType.Disconnect || packet[4] == (byte)OrderType.SyncHash))
{
frame = 0;
orders = null;
return false;
}
frame = BitConverter.ToInt32(packet, 0);
// PERF: Skip empty order frames, often per client each frame
orders = packet.Length > 4 ? new OrderPacket(new MemoryStream(packet, 4, packet.Length - 4)) : NoOrders;
return true;
}
}
}

View File

@@ -13,7 +13,6 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using OpenRA.Primitives;
using OpenRA.Support;
using OpenRA.Widgets;
@@ -22,8 +21,8 @@ namespace OpenRA.Network
public sealed class OrderManager : IDisposable
{
readonly SyncReport syncReport;
readonly Dictionary<int, Queue<byte[]>> pendingPackets = new Dictionary<int, Queue<byte[]>>();
readonly Dictionary<int, Queue<(int Frame, OrderPacket Orders)>> pendingOrders = new Dictionary<int, Queue<(int, OrderPacket)>>();
readonly Dictionary<int, (int SyncHash, ulong DefeatState)> syncForFrame = new Dictionary<int, (int, ulong)>();
public Session LobbyInfo = new Session();
public Session.Client LocalClient => LobbyInfo.ClientWithIndex(Connection.LocalClientId);
@@ -78,7 +77,7 @@ namespace OpenRA.Network
foreach (var client in LobbyInfo.Clients)
if (!client.IsBot)
pendingPackets.Add(client.Index, new Queue<byte[]>());
pendingOrders.Add(client.Index, new Queue<(int, OrderPacket)>());
// Generating sync reports is expensive, so only do it if we have
// other players to compare against if a desync did occur
@@ -90,7 +89,7 @@ namespace OpenRA.Network
if (GameSaveLastFrame < 0)
for (var i = NetFrameNumber; i <= FramesAhead; i++)
Connection.Send(i, new List<byte[]>());
Connection.Send(i, Array.Empty<Order>());
}
public OrderManager(IConnection conn)
@@ -125,37 +124,41 @@ namespace OpenRA.Network
void SendImmediateOrders()
{
if (localImmediateOrders.Count != 0 && GameSaveLastFrame < NetFrameNumber + FramesAhead)
Connection.SendImmediate(localImmediateOrders.Select(o => o.Serialize()));
Connection.SendImmediate(localImmediateOrders);
localImmediateOrders.Clear();
}
void ReceiveAllOrdersAndCheckSync()
{
Connection.Receive(
(clientId, packet) =>
public void ReceiveDisconnect(int clientIndex)
{
// HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap)
return;
var frame = BitConverter.ToInt32(packet, 0);
if (packet.Length == Order.DisconnectOrderLength + 4 && packet[4] == (byte)OrderType.Disconnect)
{
pendingPackets.Remove(BitConverter.ToInt32(packet, 5));
}
else if (packet.Length > 4 && packet[4] == (byte)OrderType.SyncHash)
{
if (packet.Length != 4 + Order.SyncHashOrderLength)
{
Log.Write("debug", $"Dropped sync order with length {packet.Length}. Expected length {4 + Order.SyncHashOrderLength}.");
return;
pendingOrders.Remove(clientIndex);
}
CheckSync(packet);
}
else if (frame == 0)
public void ReceiveSync(int frame, int syncHash, ulong defeatState)
{
foreach (var o in packet.ToOrderList(World))
// HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap)
return;
if (syncForFrame.TryGetValue(frame, out var s))
{
if (s.SyncHash != syncHash || s.DefeatState != defeatState)
OutOfSync(frame);
}
else
syncForFrame.Add(frame, (syncHash, defeatState));
}
public void ReceiveImmediateOrders(int clientId, OrderPacket orders)
{
// HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap)
return;
foreach (var o in orders.GetOrders(World))
{
UnitOrders.ProcessOrder(this, World, clientId, o);
@@ -164,35 +167,25 @@ namespace OpenRA.Network
return;
}
}
else
public void ReceiveOrders(int clientId, int frame, OrderPacket orders)
{
if (pendingPackets.TryGetValue(clientId, out var queue))
queue.Enqueue(packet);
// HACK: The shellmap relies on ticking a disposed OM
if (disposed && World.Type != WorldType.Shellmap)
return;
if (pendingOrders.TryGetValue(clientId, out var queue))
queue.Enqueue((frame, orders));
else
Log.Write("debug", $"Received packet from disconnected client '{clientId}'");
}
});
}
Dictionary<int, byte[]> syncForFrame = new Dictionary<int, byte[]>();
void CheckSync(byte[] packet)
void ReceiveAllOrdersAndCheckSync()
{
var frame = BitConverter.ToInt32(packet, 0);
if (syncForFrame.TryGetValue(frame, out var existingSync))
{
if (packet.Length != existingSync.Length)
OutOfSync(frame);
else
for (var i = 0; i < packet.Length; i++)
if (packet[i] != existingSync[i])
OutOfSync(frame);
}
else
syncForFrame.Add(frame, packet);
Connection.Receive(this);
}
bool IsReadyForNextFrame => GameStarted && pendingPackets.All(p => p.Value.Count > 0);
bool IsReadyForNextFrame => GameStarted && pendingOrders.All(p => p.Value.Count > 0);
int SuggestedTimestep
{
@@ -218,7 +211,7 @@ namespace OpenRA.Network
if (GameSaveLastFrame < NetFrameNumber + FramesAhead)
{
Connection.Send(NetFrameNumber + FramesAhead, localOrders.Select(o => o.Serialize()).ToList());
Connection.Send(NetFrameNumber + FramesAhead, localOrders);
localOrders.Clear();
}
}
@@ -227,20 +220,19 @@ namespace OpenRA.Network
{
var clientOrders = new List<ClientOrder>();
foreach (var (clientId, clientPackets) in pendingPackets)
foreach (var (clientId, frameOrders) in pendingOrders)
{
// The IsReadyForNextFrame check above guarantees that all clients have sent a packet
var frameData = clientPackets.Dequeue();
var (frameNumber, orders) = frameOrders.Dequeue();
// Orders are synchronised by sending an initial FramesAhead set of empty packets
// and then making sure that we enqueue and process exactly one packet for each player each tick.
// This may change in the future, so sanity check that the orders are for the frame we expect
// and crash early instead of risking desyncs.
var frameNumber = BitConverter.ToInt32(frameData, 0);
if (frameNumber != NetFrameNumber)
throw new InvalidDataException($"Attempted to process orders from client {clientId} for frame {frameNumber} on frame {NetFrameNumber}");
foreach (var order in frameData.ToOrderList(World))
foreach (var order in orders.GetOrders(World))
{
UnitOrders.ProcessOrder(this, World, clientId, order);
clientOrders.Add(new ClientOrder { Client = clientId, Order = order });
@@ -254,10 +246,10 @@ namespace OpenRA.Network
if (World.Players[i].WinState == WinState.Lost)
defeatState |= 1UL << i;
Connection.SendSync(NetFrameNumber, OrderIO.SerializeSync(World.SyncHash(), defeatState));
Connection.SendSync(NetFrameNumber, World.SyncHash(), defeatState);
}
else
Connection.SendSync(NetFrameNumber, OrderIO.SerializeSync(0, 0));
Connection.SendSync(NetFrameNumber, 0, 0);
if (generateSyncReport)
using (new PerfSample("sync_report"))
@@ -287,7 +279,7 @@ namespace OpenRA.Network
{
// Check whether or not we will be ready for a tick next frame
// We don't need to include ourselves in the equation because we can always generate orders this frame
shouldTick = pendingPackets.All(p => p.Key == Connection.LocalClientId || p.Value.Count > 0);
shouldTick = pendingOrders.All(p => p.Key == Connection.LocalClientId || p.Value.Count > 0);
// Send orders only if we are currently ready, this prevents us sending orders too soon if we are
// stalling

View File

@@ -25,20 +25,14 @@ namespace OpenRA.Network
public (int ClientId, byte[] Packet)[] Packets;
}
Queue<Chunk> chunks = new Queue<Chunk>();
Queue<byte[]> sync = new Queue<byte[]>();
readonly Queue<Chunk> chunks = new Queue<Chunk>();
readonly Queue<(int Frame, int SyncHash, ulong DefeatState)> sync = new Queue<(int, int, ulong)>();
readonly Dictionary<int, int> lastClientsFrame = new Dictionary<int, int>();
readonly int orderLatency;
int ordersFrame;
Dictionary<int, int> lastClientsFrame = new Dictionary<int, int>();
public int LocalClientId => -1;
public IPEndPoint EndPoint => throw new NotSupportedException("A replay connection doesn't have an endpoint");
public string ErrorMessage => null;
public readonly int TickCount;
public readonly int FinalGameTick;
public readonly bool IsValid;
@@ -76,8 +70,9 @@ namespace OpenRA.Network
if (frame == 0)
{
// Parse replay metadata from orders stream
var orders = packet.ToOrderList(null);
foreach (var o in orders)
if (OrderIO.TryParseOrderPacket(packet, out _, out var orders))
{
foreach (var o in orders.GetOrders(null))
{
if (o.OrderString == "StartGame")
IsValid = true;
@@ -85,6 +80,7 @@ namespace OpenRA.Network
LobbyInfo = Session.Deserialize(o.TargetString);
}
}
}
else
{
// Regular order - finalize the chunk
@@ -131,28 +127,42 @@ namespace OpenRA.Network
}
// Do nothing: ignore locally generated orders
public void Send(int frame, List<byte[]> orders) { }
public void SendImmediate(IEnumerable<byte[]> orders) { }
public void Send(int frame, IEnumerable<Order> orders) { }
public void SendImmediate(IEnumerable<Order> orders) { }
public void SendSync(int frame, byte[] syncData)
public void SendSync(int frame, int syncHash, ulong defeatState)
{
var ms = new MemoryStream(4 + syncData.Length);
ms.WriteArray(BitConverter.GetBytes(frame));
ms.WriteArray(syncData);
sync.Enqueue(ms.GetBuffer());
sync.Enqueue((frame, syncHash, defeatState));
// Store the current frame so Receive() can return the next chunk of orders.
ordersFrame = frame + orderLatency;
}
public void Receive(Action<int, byte[]> packetFn)
public void Receive(OrderManager orderManager)
{
while (sync.Count != 0)
packetFn(LocalClientId, sync.Dequeue());
{
var (syncFrame, syncHash, defeatState) = sync.Dequeue();
orderManager.ReceiveSync(syncFrame, syncHash, defeatState);
}
while (chunks.Count != 0 && chunks.Peek().Frame <= ordersFrame)
{
foreach (var o in chunks.Dequeue().Packets)
packetFn(o.ClientId, o.Packet);
{
if (OrderIO.TryParseDisconnect(o.Packet, out var disconnectClient))
orderManager.ReceiveDisconnect(disconnectClient);
else if (OrderIO.TryParseSync(o.Packet, out var syncFrame, out var syncHash, out var defeatState))
orderManager.ReceiveSync(syncFrame, syncHash, defeatState);
else if (OrderIO.TryParseOrderPacket(o.Packet, out var frame, out var orders))
{
if (frame == 0)
orderManager.ReceiveImmediateOrders(o.ClientId, orders);
else
orderManager.ReceiveOrders(o.ClientId, frame, orders);
}
}
}
}
public void Dispose() { }

View File

@@ -29,11 +29,10 @@ namespace OpenRA.Network
static bool IsGameStart(byte[] data)
{
if (data.Length > 4 && (data[4] == (byte)OrderType.Disconnect || data[4] == (byte)OrderType.SyncHash))
if (!OrderIO.TryParseOrderPacket(data, out var frame, out var orders))
return false;
var frame = BitConverter.ToInt32(data, 0);
return frame == 0 && data.ToOrderList(null).Any(o => o.OrderString == "StartGame");
return frame == 0 && orders.GetOrders(null).Any(o => o.OrderString == "StartGame");
}
public ReplayRecorder(Func<string> chooseFilename)