{1.4} Reduce lock contention in shared world relay

This commit is contained in:
lax1dude 2025-01-18 13:19:17 -08:00
parent 3288d05a9a
commit 713ab652f8
4 changed files with 107 additions and 88 deletions

View file

@ -17,7 +17,7 @@ package net.lax1dude.eaglercraft.v1_8.sp.relay.server;
*/
public class Constants {
public static final String versionName = "1.3";
public static final String versionName = "1.4";
public static final String versionBrand = "lax1dude";
public static final int protocolVersion = 1;

View file

@ -13,7 +13,10 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
@ -23,7 +26,7 @@ import net.lax1dude.eaglercraft.v1_8.sp.relay.pkt.*;
import net.lax1dude.eaglercraft.v1_8.sp.relay.server.RateLimiter.RateLimit;
/**
* Copyright (c) 2022-2024 lax1dude. All Rights Reserved.
* Copyright (c) 2022-2025 lax1dude. All Rights Reserved.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
@ -87,27 +90,19 @@ public class EaglerSPRelay extends WebSocketServer {
long millis = Util.millis();
pendingToClose.clear();
clientToClose.clear();
synchronized(pendingConnections) {
Iterator<Entry<WebSocket,PendingConnection>> itr = pendingConnections.entrySet().iterator();
while(itr.hasNext()) {
Entry<WebSocket,PendingConnection> etr = itr.next();
pendingConnections.entrySet().forEach((etr) -> {
if(millis - etr.getValue().openTime > 1000l) {
pendingToClose.add(etr.getKey());
itr.remove();
}
}
}
synchronized(clientConnections) {
Iterator<EaglerSPClient> itr = clientConnections.values().iterator();
while(itr.hasNext()) {
EaglerSPClient cl = itr.next();
});
clientConnections.values().forEach((cl) -> {
if(millis - cl.createdOn > 15000l) {
clientToClose.add(cl);
}
}
}
});
if(!pendingToClose.isEmpty()) {
for(WebSocket cl : pendingToClose) {
pendingConnections.remove(cl);
cl.close();
}
pendingToClose.clear();
@ -173,12 +168,17 @@ public class EaglerSPRelay extends WebSocketServer {
}
private static final Map<WebSocket,PendingConnection> pendingConnections = new HashMap<>();
private static final Map<String,EaglerSPClient> clientIds = new HashMap<>();
private static final Map<WebSocket,EaglerSPClient> clientConnections = new HashMap<>();
private static final Map<String,EaglerSPServer> serverCodes = new HashMap<>();
private static final Map<WebSocket,EaglerSPServer> serverConnections = new HashMap<>();
private static final ConcurrentMap<WebSocket,PendingConnection> pendingConnections = new ConcurrentHashMap<>();
private static final ConcurrentMap<WebSocket,EaglerSPClient> clientConnections = new ConcurrentHashMap<>();
private static final ConcurrentMap<WebSocket,EaglerSPServer> serverConnections = new ConcurrentHashMap<>();
private static final ReadWriteLock clientAddressSetsLock = new ReentrantReadWriteLock();
private static final Map<String,List<EaglerSPClient>> clientAddressSets = new HashMap<>();
private static final ReadWriteLock serverAddressSetsLock = new ReentrantReadWriteLock();
private static final Map<String,List<EaglerSPServer>> serverAddressSets = new HashMap<>();
@Override
@ -202,23 +202,23 @@ public class EaglerSPRelay extends WebSocketServer {
addr = arg0.getRemoteSocketAddress().getAddress().getHostAddress().toLowerCase();
}
int totalCons = 0;
synchronized(pendingConnections) {
Iterator<PendingConnection> pendingItr = pendingConnections.values().iterator();
while(pendingItr.hasNext()) {
if(pendingItr.next().address.equals(addr)) {
++totalCons;
int[] totalCons = new int[1];
pendingConnections.values().forEach((con) -> {
if(con.address.equals(addr)) {
++totalCons[0];
}
}
}
synchronized(clientAddressSets) {
});
clientAddressSetsLock.readLock().lock();
try {
List<EaglerSPClient> lst = clientAddressSets.get(addr);
if(lst != null) {
totalCons += lst.size();
totalCons[0] += lst.size();
}
}finally {
clientAddressSetsLock.readLock().unlock();
}
if(totalCons >= config.getConnectionsPerIP()) {
if(totalCons[0] >= config.getConnectionsPerIP()) {
logger.debug("[{}]: Too many connections are open on this address", (Object) arg0.getAttachment());
arg0.send(RelayPacketFEDisconnectClient.ratelimitPacketTooMany);
arg0.close();
@ -229,18 +229,13 @@ public class EaglerSPRelay extends WebSocketServer {
PendingConnection waiting = new PendingConnection(millis, addr);
logger.debug("[{}]: Connection opened", arg0.getRemoteSocketAddress());
synchronized(pendingConnections) {
pendingConnections.put(arg0, waiting);
}
}
@Override
public void onMessage(WebSocket arg0, ByteBuffer arg1) {
DataInputStream sid = new DataInputStream(new ByteBufferInputStream(arg1));
PendingConnection waiting;
synchronized(pendingConnections) {
waiting = pendingConnections.remove(arg0);
}
PendingConnection waiting = pendingConnections.remove(arg0);
try {
RelayPacket pkt = RelayPacket.readPacket(sid, EaglerSPRelay.logger);
if(waiting != null) {
@ -265,13 +260,16 @@ public class EaglerSPRelay extends WebSocketServer {
return;
}
boolean fuck = false;
synchronized(serverAddressSets) {
serverAddressSetsLock.readLock().lock();
try {
List<EaglerSPServer> lst = serverAddressSets.get(waiting.address);
if(lst != null) {
if(lst.size() >= config.getWorldsPerIP()) {
fuck = true;
}
}
}finally {
serverAddressSetsLock.readLock().unlock();
}
if(fuck) {
logger.debug("[{}]: Too many worlds are open on this address", (Object) arg0.getAttachment());
@ -310,16 +308,18 @@ public class EaglerSPRelay extends WebSocketServer {
arg0.send(RelayPacket.writePacket(ipkt, EaglerSPRelay.logger));
logger.debug("[{}][Relay -> Server] PKT 0x00: Assign join code: {}", (Object) arg0.getAttachment(), code);
synchronized(serverConnections) {
serverConnections.put(arg0, srv);
}
synchronized(serverAddressSets) {
serverAddressSetsLock.writeLock().lock();
try {
List<EaglerSPServer> lst = serverAddressSets.get(srv.serverAddress);
if(lst == null) {
lst = new ArrayList<>();
serverAddressSets.put(srv.serverAddress, lst);
}
lst.add(srv);
}finally {
serverAddressSetsLock.writeLock().unlock();
}
srv.send(new RelayPacket01ICEServers(EaglerSPRelayConfigRelayList.relayServers));
logger.debug("[{}][Relay -> Server] PKT 0x01: Send ICE server list to server", (Object) arg0.getAttachment());
@ -362,16 +362,17 @@ public class EaglerSPRelay extends WebSocketServer {
ipkt.connectionCode = id;
arg0.send(RelayPacket.writePacket(ipkt, EaglerSPRelay.logger));
srv.handleNewClient(cl);
synchronized(clientConnections) {
clientConnections.put(arg0, cl);
}
synchronized(clientAddressSets) {
clientAddressSetsLock.writeLock().lock();
try {
List<EaglerSPClient> lst = clientAddressSets.get(cl.address);
if(lst == null) {
lst = new ArrayList<>();
clientAddressSets.put(cl.address, lst);
}
lst.add(cl);
}finally {
clientAddressSetsLock.writeLock().unlock();
}
cl.send(new RelayPacket01ICEServers(EaglerSPRelayConfigRelayList.relayServers));
logger.debug("[{}][Relay -> Client] PKT 0x01: Send ICE server list to client", (Object) arg0.getAttachment());
@ -403,10 +404,7 @@ public class EaglerSPRelay extends WebSocketServer {
arg0.close();
}
}else {
EaglerSPServer srv;
synchronized(serverConnections) {
srv = serverConnections.get(arg0);
}
EaglerSPServer srv = serverConnections.get(arg0);
if(srv != null) {
if(!srv.handle(pkt)) {
logger.debug("[{}]: Server sent invalid packet: {}", (Object) arg0.getAttachment(), pkt.getClass().getSimpleName());
@ -415,10 +413,7 @@ public class EaglerSPRelay extends WebSocketServer {
arg0.close();
}
}else {
EaglerSPClient cl;
synchronized(clientConnections) {
cl = clientConnections.get(arg0);
}
EaglerSPClient cl = clientConnections.get(arg0);
if(cl != null) {
if(!cl.handle(pkt)) {
logger.debug("[{}]: Client sent invalid packet: {}", (Object) arg0.getAttachment(), pkt.getClass().getSimpleName());
@ -448,16 +443,14 @@ public class EaglerSPRelay extends WebSocketServer {
@Override
public void onClose(WebSocket arg0, int arg1, String arg2, boolean arg3) {
EaglerSPServer srv;
synchronized(serverConnections) {
srv = serverConnections.remove(arg0);
}
EaglerSPServer srv = serverConnections.remove(arg0);
if(srv != null) {
logger.debug("[{}]: Server closed, code: {}", (Object) arg0.getAttachment(), srv.code);
synchronized(serverCodes) {
serverCodes.remove(srv.code);
}
synchronized(serverAddressSets) {
serverAddressSetsLock.writeLock().lock();
try {
List<EaglerSPServer> lst = serverAddressSets.get(srv.serverAddress);
if(lst != null) {
lst.remove(srv);
@ -465,26 +458,26 @@ public class EaglerSPRelay extends WebSocketServer {
serverAddressSets.remove(srv.serverAddress);
}
}
}finally {
serverAddressSetsLock.writeLock().unlock();
}
ArrayList<EaglerSPClient> clientList;
synchronized(clientConnections) {
clientList = new ArrayList<>(clientConnections.values());
final ArrayList<EaglerSPClient> clientList = new ArrayList<>();
clientConnections.values().forEach((cl) -> {
if(cl.server == srv) {
clientList.add(cl);
}
});
Iterator<EaglerSPClient> itr = clientList.iterator();
while(itr.hasNext()) {
EaglerSPClient cl = itr.next();
if(cl.server == srv) {
logger.debug("[{}]: Disconnecting client: {} (id: {})", (Object) cl.socket.getAttachment(), cl.id);
cl.socket.close();
}
}
}else {
EaglerSPClient cl;
synchronized(clientConnections) {
cl = clientConnections.remove(arg0);
}
EaglerSPClient cl = clientConnections.remove(arg0);
if(cl != null) {
synchronized(clientAddressSets) {
clientAddressSetsLock.writeLock().lock();
try {
List<EaglerSPClient> lst = clientAddressSets.get(cl.address);
if(lst != null) {
lst.remove(cl);
@ -492,6 +485,8 @@ public class EaglerSPRelay extends WebSocketServer {
clientAddressSets.remove(cl.address);
}
}
}finally {
clientAddressSetsLock.writeLock().unlock();
}
logger.debug("[{}]: Client closed, id: {}", (Object) arg0.getAttachment(), cl.id);
synchronized(clientIds) {
@ -511,21 +506,36 @@ public class EaglerSPRelay extends WebSocketServer {
if(arg0 != null) arg0.close();
}
@Override
public void stop() throws InterruptedException {
// Handle internal WebSocketServer crashes
Thread killServer = new Thread(() -> {
try {
Thread.sleep(5000l);
}catch(InterruptedException ex) {
}
logger.error("WebSocketServer stopped, but the process is still running, calling System.exit to hopefully restart!");
System.exit(-1);
}, "Terminator");
killServer.setDaemon(true);
killServer.start();
super.stop();
}
private List<RelayPacket07LocalWorlds.LocalWorld> getLocalWorlds(String addr) {
List<RelayPacket07LocalWorlds.LocalWorld> lst = new ArrayList<>();
synchronized(serverAddressSets) {
serverAddressSetsLock.readLock().lock();
try {
List<EaglerSPServer> srvs = serverAddressSets.get(addr);
if(srvs != null) {
if(srvs.size() == 0) {
serverAddressSets.remove(addr);
}else {
if(srvs != null && srvs.size() > 0) {
for(EaglerSPServer s : srvs) {
if(!s.serverHidden) {
lst.add(new RelayPacket07LocalWorlds.LocalWorld(s.serverName, s.code));
}
}
}
}
}finally {
serverAddressSetsLock.readLock().unlock();
}
return lst;
}

View file

@ -73,7 +73,10 @@ public class EaglerSPServer {
public boolean handle(RelayPacket _packet) throws IOException {
if(_packet instanceof RelayPacket03ICECandidate) {
RelayPacket03ICECandidate packet = (RelayPacket03ICECandidate)_packet;
EaglerSPClient cl = clients.get(packet.peerId);
EaglerSPClient cl;
synchronized(clients) {
cl = clients.get(packet.peerId);
}
if(cl != null) {
if(LoginState.assertEquals(cl, LoginState.SENT_ICE_CANDIDATE)) {
cl.state = LoginState.RECIEVED_ICE_CANIDATE;
@ -87,7 +90,10 @@ public class EaglerSPServer {
return true;
}else if(_packet instanceof RelayPacket04Description) {
RelayPacket04Description packet = (RelayPacket04Description)_packet;
EaglerSPClient cl = clients.get(packet.peerId);
EaglerSPClient cl;
synchronized(clients) {
cl = clients.get(packet.peerId);
}
if(cl != null) {
if(LoginState.assertEquals(cl, LoginState.SENT_DESCRIPTION)) {
cl.state = LoginState.RECIEVED_DESCRIPTION;
@ -101,7 +107,10 @@ public class EaglerSPServer {
return true;
}else if(_packet instanceof RelayPacketFEDisconnectClient) {
RelayPacketFEDisconnectClient packet = (RelayPacketFEDisconnectClient)_packet;
EaglerSPClient cl = clients.get(packet.clientId);
EaglerSPClient cl;
synchronized(clients) {
cl = clients.get(packet.clientId);
}
if(cl != null) {
cl.handleServerDisconnectClient(packet);
EaglerSPRelay.logger.debug("[{}][Server -> Relay -> Client] PKT 0xFE: Disconnect: {}: {}", (Object) cl.socket.getAttachment(),