csgo-2018-source/engine/net_steamsocketmgr.cpp

1192 lines
37 KiB
C++
Raw Normal View History

2021-07-25 12:11:47 +08:00
//========= Copyright <20> 1996-2008, Valve Corporation, All rights reserved. ============//
//
// Purpose: "Steam" based pseudo socket support (PC only)
//
// $NoKeywords: $
//
//=============================================================================//
#include "net_ws_headers.h"
#include "tier0/vprof.h"
#include "sv_ipratelimit.h"
#if IsPlatformWindows()
#else
#include <poll.h>
#endif
#if !defined(_X360) && !defined(NO_STEAM) && !defined(DEDICATED)
#define USE_STEAM_SOCKETS
#endif
ConVar net_threaded_socket_recovery_time( "net_threaded_socket_recovery_time", "60", FCVAR_RELEASE, "Number of seconds over which the threaded socket pump algorithm will fully recover client ratelimit." );
ConVar net_threaded_socket_recovery_rate( "net_threaded_socket_recovery_rate", "6400", FCVAR_RELEASE, "Number of packets per second that threaded socket pump algorithm allows from client." );
ConVar net_threaded_socket_burst_cap( "net_threaded_socket_burst_cap",
#ifdef DEDICATED
"256",
#else
"1024",
#endif
FCVAR_RELEASE, "Max number of packets per burst beyond which threaded socket pump algorithm will start dropping packets." );
struct net_threaded_buffer_t
{
int len;
byte buf[ NET_MAX_MESSAGE ];
inline byte *Base() { return buf + len; }
inline int Capacity() { return sizeof( buf ) - len; }
inline byte *MoveAppend( net_threaded_buffer_t *pOther )
{
if ( pOther->len > Capacity() )
return NULL;
byte *pBase = Base();
Q_memcpy( pBase, pOther->buf, pOther->len );
len += pOther->len;
pOther->len = 0;
return pBase;
}
};
CTSPool<net_threaded_buffer_t> g_NetThreadedBuffers;
netadr_t g_NetAdrRatelimited;
int32 g_numRatelimitedPackets = -100;
class CThreadedSocketQueue
{
public:
CThreadedSocketQueue() : m_mapSocketThreads( DefLessFunc( int ) ) {}
~CThreadedSocketQueue()
{
m_mapSocketThreads.PurgeAndDeleteElements();
}
static bool ShouldUseSocketsThreaded()
{
static bool s_bThreaded = !CommandLine()->FindParm( "-nothreadedsockets" );
return s_bThreaded;
}
int recvfrom( int s, char * buf, int len, struct sockaddr * from )
{
CSocketThread *pThread = GetSocketThread( s );
return pThread ? pThread->recvfrom( buf, len, from ) : 0;
}
void EnableThreadedRecv( int s, int nsSock, bool bEnable )
{
CSocketThread *pThread = GetSocketThread( s, nsSock, bEnable );
( void ) ( pThread );
}
void CloseSocket( int s )
{
CUtlMap< int, CSocketThread * >::IndexType_t idx = m_mapSocketThreads.Find( s );
if ( idx != m_mapSocketThreads.InvalidIndex() )
{
CSocketThread *pThread = m_mapSocketThreads.Element( idx );
delete pThread;
m_mapSocketThreads.RemoveAt( idx );
}
}
private:
class CSocketThread
{
public:
explicit CSocketThread( int s, int nsSock ) : m_s( s ), m_nsSock( nsSock ), m_hThread( NULL ), m_pDataQueueBufferCollect( NULL )
{
#if IsPlatformWindows()
m_wsaEvents[0] = ::WSACreateEvent();
if ( !m_wsaEvents[0] )
Error( "WSACreateEvent failed\n" );
m_wsaEvents[1] = ::WSACreateEvent();
if ( !m_wsaEvents[1] )
Error( "WSACreateEvent failed\n" );
int ret = ::WSAEventSelect( m_s, m_wsaEvents[ 1 ], FD_READ );
if ( ret )
Error( "WSAEventSelect failed\n" );
#else
Q_memset( m_sockSignalPipe, -1, sizeof( m_sockSignalPipe ) );
int ret = socketpair( PF_LOCAL, SOCK_STREAM, 0, m_sockSignalPipe ); // 0=main; 1=pump
if ( ret )
Error( "socketpair failed!\n" );
#endif
// Create the thread and start socket processing
m_hThread = CreateSimpleThread( CallbackThreadProc, this );
if ( !m_hThread )
Error( "socket thread failed!\n" );
}
~CSocketThread()
{
// Kill the thread!
#if IsPlatformWindows()
::WSASetEvent( m_wsaEvents[0] );
#else
write( m_sockSignalPipe[0], "", 1 ); // write one zero byte to wake the thread
#endif
// wait for it to die
ThreadJoin( m_hThread );
ReleaseThreadHandle( m_hThread );
m_hThread = NULL;
// Shutdown resources
#if IsPlatformWindows()
::WSACloseEvent( m_wsaEvents[0] );
::WSACloseEvent( m_wsaEvents[1] );
#else
close( m_sockSignalPipe[0] );
close( m_sockSignalPipe[1] );
#endif
// Purge all packet data blocks
m_tslstDataQueue.Purge();
// Return all the buffers back into the pool
do
{
if ( m_pDataQueueBufferCollect )
{
g_NetThreadedBuffers.PutObject( m_pDataQueueBufferCollect );
m_pDataQueueBufferCollect = NULL;
}
} while ( m_tslstBuffers.PopItem( &m_pDataQueueBufferCollect ) );
}
int recvfrom( char * buf, int len, struct sockaddr * from )
{
//
// Called on application main thread
//
ReceivedData_t data;
if ( !m_tslstDataQueue.PopItem( &data ) )
return 0;
Q_memcpy( buf, data.buf, MIN( len, data.len ) );
Q_memcpy( from, &data.from, sizeof( data.from ) );
//
// We are about to return this data, free up allocated buffers up to this
//
if ( !m_pDataQueueBufferCollect )
{
m_tslstBuffers.PopItem( &m_pDataQueueBufferCollect );
}
if ( ( data.buf >= m_pDataQueueBufferCollect->buf ) && ( data.buf < m_pDataQueueBufferCollect->buf + sizeof( m_pDataQueueBufferCollect->buf ) ) )
;// The returned data was still referencing the head of scratch buffers
else
{ // The returned data is ahead in the list, previous buffer can go back in the pool
g_NetThreadedBuffers.PutObject( m_pDataQueueBufferCollect );
m_pDataQueueBufferCollect = NULL;
}
return data.len;
}
private:
struct CPerNetChanRatelimit_t
{
double m_dblNetTimeMark;
int32 m_numPackets;
int32 m_numRatelimited;
};
static uintp CallbackThreadProc( void *pvParam ) { reinterpret_cast<CSocketThread*>(pvParam)->ThreadProc(); return 0; }
void ThreadProc()
{
// Where are we getting new data?
net_threaded_buffer_t *pThreadBufferSyscall = NULL;
net_threaded_buffer_t *pThreadBufferCollect = NULL;
struct sockaddr from;
int fromlen = sizeof( from );
netadr_t adrt;
adrt.SetType( NA_IP );
adrt.Clear();
extern volatile int g_NetChannelsRefreshCounter;
int pumpNetChannelsRefreshCounter = -1;
CUtlVector< struct sockaddr > arrNetChans;
typedef CUtlMap< uint64, CPerNetChanRatelimit_t, int, CDefLess< uint64 > > MapPerClientRatelimit_t;
MapPerClientRatelimit_t mapPerClientRatelimit;
#if IsPlatformWindows()
const DWORD cWsaEvents = Q_ARRAYSIZE( m_wsaEvents );
#else
const int numSyscallPollFDs = 2;
struct pollfd syscallPollFDs[ numSyscallPollFDs ];
Q_memset( syscallPollFDs, 0, sizeof( syscallPollFDs ) );
syscallPollFDs[ 0 ].fd = m_s;
syscallPollFDs[ 0 ].events = POLLIN;
syscallPollFDs[ 1 ].fd = m_sockSignalPipe[1];
syscallPollFDs[ 1 ].events = POLLIN;
#endif
volatile double &vdblNetTimeForThread = net_time;
for ( ;; )
{
if ( !pThreadBufferSyscall )
{
// Need a new buffer
net_threaded_buffer_t *pbuf = g_NetThreadedBuffers.GetObject();
pbuf->len = 0;
m_tslstBuffers.PushItem( pbuf );
pThreadBufferSyscall = pbuf;
}
// Recv socket data
int ret = ::recvfrom( m_s, ( char* ) pThreadBufferSyscall->buf, sizeof( pThreadBufferSyscall->buf ), 0, &from, (socklen_t*)&fromlen );
if ( ret <= 0 )
{
// Efficiently sleep while we wait for next packet
#if IsPlatformWindows()
DWORD dwWsaWaitResult = ::WSAWaitForMultipleEvents( cWsaEvents, m_wsaEvents, FALSE, WSA_INFINITE, FALSE );
if ( dwWsaWaitResult == WSA_WAIT_EVENT_0 )
return; // Finish the socket pump thread due to event signal from destructor
WSANETWORKEVENTS wsaNetworkEvents;
::WSAEnumNetworkEvents( m_s, m_wsaEvents[1], &wsaNetworkEvents ); // Reset socket read event
#else
ret = ::poll( syscallPollFDs, numSyscallPollFDs, -1 ); // Sleep indefinitely until data available
if ( syscallPollFDs[ 1 ].revents )
return; // Finish the socket reading thread if socketpair pump end of the pipe has incoming signal
#endif
continue;
}
if ( ret > 0 )
{
// Is this a connectionless packet?
if ( 0xFFFFFFFF == * ( uint32 * ) pThreadBufferSyscall->buf )
{
adrt.SetFromSockadr( &from );
if ( !CheckConnectionLessRateLimits( adrt ) )
ret = 0;
}
else
{
// Check if we need to refresh our netchans for this socket type
if ( pumpNetChannelsRefreshCounter != g_NetChannelsRefreshCounter )
{
pumpNetChannelsRefreshCounter = g_NetChannelsRefreshCounter;
arrNetChans.RemoveAll();
extern void NET_FindAllNetChannelAddresses( int socket, CUtlVector< struct sockaddr > &arrNetChans );
NET_FindAllNetChannelAddresses( m_nsSock, arrNetChans );
// Since we just recomputed net channels, use this opportunity to expire obsolete ratelimits for old clients
FOR_EACH_MAP_FAST( mapPerClientRatelimit, idxPerClientRateLimit )
{
CPerNetChanRatelimit_t &pncrt = mapPerClientRatelimit.Element( idxPerClientRateLimit );
double dblTimeSinceTimeMark = ( vdblNetTimeForThread - pncrt.m_dblNetTimeMark );
if ( dblTimeSinceTimeMark > net_threaded_socket_recovery_time.GetFloat() )
mapPerClientRatelimit.RemoveAt( idxPerClientRateLimit );
}
}
// This is a connection-oriented packet, must have a netchan
bool bNetChanAvailable = false;
for ( int k = 0; k < arrNetChans.Count(); ++ k )
{
struct sockaddr &sockAddressNetChan = arrNetChans[k];
if (
!Q_memcmp( &((struct sockaddr_in*)&sockAddressNetChan)->sin_addr.s_addr, &((struct sockaddr_in*)&from)->sin_addr.s_addr, sizeof( ((struct sockaddr_in*)&from)->sin_addr.s_addr ) )
&& ( ((struct sockaddr_in*)&sockAddressNetChan)->sin_port == ((struct sockaddr_in*)&from)->sin_port )
)
{
bNetChanAvailable = true;
//
// Track ratelimit on this netchan
//
uint64 uiRatelimitKey = ( uint64( ((struct sockaddr_in*)&from)->sin_addr.s_addr ) << 32 ) | uint32( ((struct sockaddr_in*)&from)->sin_port );
MapPerClientRatelimit_t::IndexType_t idxPerClientRateLimit = mapPerClientRatelimit.Find( uiRatelimitKey );
if ( idxPerClientRateLimit == mapPerClientRatelimit.InvalidIndex() )
{
CPerNetChanRatelimit_t pncrt;
Q_memset( &pncrt, 0, sizeof( pncrt ) );
pncrt.m_dblNetTimeMark = vdblNetTimeForThread;
pncrt.m_numPackets = 1;
mapPerClientRatelimit.Insert( uiRatelimitKey, pncrt );
}
else
{
CPerNetChanRatelimit_t &pncrt = mapPerClientRatelimit.Element( idxPerClientRateLimit );
double dblTimeSinceTimeMark = ( vdblNetTimeForThread - pncrt.m_dblNetTimeMark );
if ( ( dblTimeSinceTimeMark > net_threaded_socket_recovery_time.GetFloat() ) ||
( dblTimeSinceTimeMark < 0 ) )
{
pncrt.m_numPackets = 0;
}
else if ( dblTimeSinceTimeMark > 0 )
{
int32 numPacketsToRecover = dblTimeSinceTimeMark * net_threaded_socket_recovery_rate.GetFloat();
pncrt.m_numPackets = MAX( pncrt.m_numPackets - numPacketsToRecover, 0 );
}
if ( pncrt.m_numPackets > net_threaded_socket_burst_cap.GetInt() )
{
bNetChanAvailable = false;
++ pncrt.m_numRatelimited;
if ( pncrt.m_numRatelimited > g_numRatelimitedPackets + 5 )
{
g_NetAdrRatelimited.SetFromSockadr( &from ); // remember last ratelimited address for logging purposes
g_numRatelimitedPackets = pncrt.m_numRatelimited;
}
}
else
{
pncrt.m_dblNetTimeMark = vdblNetTimeForThread;
++ pncrt.m_numPackets;
pncrt.m_numRatelimited = 0;
}
}
break;
}
}
if ( !bNetChanAvailable )
ret = 0;
}
}
if ( ret > 0 )
{
ReceivedData_t recvData;
recvData.buf = NULL;
recvData.len = ret;
Q_memcpy( &recvData.from, &from, sizeof( recvData.from ) );
// Check if we still have more room in pending recv buffer
pThreadBufferSyscall->len = ret;
if ( byte *pbMoveAppend = pThreadBufferCollect ? pThreadBufferCollect->MoveAppend( pThreadBufferSyscall ) : NULL )
{
recvData.buf = pbMoveAppend;
}
else
{
pThreadBufferCollect = pThreadBufferSyscall;
pThreadBufferSyscall = NULL;
recvData.buf = pThreadBufferCollect->buf;
}
m_tslstDataQueue.PushItem( recvData );
}
}
}
struct ReceivedData_t
{
byte *buf;
int len;
struct sockaddr from;
};
//
// Thread object data members
//
private:
int const m_s; // [const] accessed by pump thread in ::recvfrom
int const m_nsSock; // [const] accessed by pump thread doing netchans lookup
ThreadHandle_t m_hThread; // Thread handle, accessed by main thread
CTSQueue< ReceivedData_t > m_tslstDataQueue; // FIFO - actual data packets pumped from socket, thread-safe access on both threads
CTSQueue< net_threaded_buffer_t * > m_tslstBuffers; // FIFO - buffers storing data pumped from socket, multiple packets can be stored in one memory chunk, thread-safe access on both threads
net_threaded_buffer_t *m_pDataQueueBufferCollect; // Main thread tracking when collect buffer can be returned to global pool
#if IsPlatformWindows()
WSAEVENT m_wsaEvents[2];
#else
int m_sockSignalPipe[2];
#endif
};
CSocketThread * GetSocketThread( int s, int nsSock = -1, bool bRequired = false )
{
CUtlMap< int, CSocketThread * >::IndexType_t idx = m_mapSocketThreads.Find( s );
if ( idx != m_mapSocketThreads.InvalidIndex() )
return m_mapSocketThreads.Element( idx );
if ( bRequired )
{
CSocketThread *pNew = new CSocketThread( s, nsSock );
m_mapSocketThreads.Insert( s, pNew );
return pNew;
}
return NULL;
}
CUtlMap< int, CSocketThread * > m_mapSocketThreads;
};
CThreadedSocketQueue g_ThreadedSocketQueue;
void On_NET_ProcessSocket_Start( int hUDP, int sock )
{
if ( g_numRatelimitedPackets > 0 )
{ // Spew about ratelimit on the main thread
ConMsg( "Net channel ratelimit exceeded for %s: %d packets rejected.\n", g_NetAdrRatelimited.ToString(), g_numRatelimitedPackets );
g_NetAdrRatelimited.Clear();
g_numRatelimitedPackets = -100;
}
}
void On_NET_ProcessSocket_End( int hUDP, int sock )
{
if ( CThreadedSocketQueue::ShouldUseSocketsThreaded() )
g_ThreadedSocketQueue.EnableThreadedRecv( hUDP, sock, true );
}
#if defined( USE_STEAM_SOCKETS )
#include "cl_steamauth.h"
#include "tier1/tokenset.h"
#include "utlmap.h"
// matchmaking
#include "matchmaking/imatchframework.h"
#include "matchmaking/iplayer.h"
#include "matchmaking/imatchtitle.h"
#include "matchmaking/mm_helpers.h"
// for INetSupport defines
#include "engine/inetsupport.h"
#include "server.h"
ConVar net_steamcnx_debug( "net_steamcnx_debug", "0", 0, "Show debug spew for steam based connections, 2 shows all network traffic for steam sockets." );
static ConVar net_steamcnx_enabled( "net_steamcnx_enabled", "1", FCVAR_RELEASE, "Use steam connections on listen server as a fallback, 2 forces use of steam connections instead of raw UDP." );
static ConVar net_steamcnx_allowrelay( "net_steamcnx_allowrelay", "1", FCVAR_RELEASE | FCVAR_ARCHIVE, "Allow steam connections to attempt to use relay servers as fallback (best if specified on command line: +net_steamcnx_allowrelay 1)" );
#define STEAM_CNX_COLOR Color( 255, 255, 100, 255 )
extern ConVar cl_timeout;
static const tokenset_t< ESocketIndex_t > s_SocketIndexMap[] =
{
{ "NS_CLIENT", NS_CLIENT },
{ "NS_SERVER", NS_SERVER },
#ifdef _X360
{ "NS_X360_SYSTEMLINK", NS_X360_SYSTEMLINK },
{ "NS_X360_LOBBY", NS_X360_LOBBY },
{ "NS_X360_TEAMLINK", NS_X360_TEAMLINK },
#endif
{ "NS_HLTV", NS_HLTV },
{ "NS_HLTV1", NS_HLTV1 },
{ NULL, ( ESocketIndex_t )-1 }
};
static const tokenset_t< EP2PSessionError > s_EP2PSessionErrorIndexMap[] =
{
{ "None", k_EP2PSessionErrorNone },
{ "Not running app", k_EP2PSessionErrorNotRunningApp }, // target is not running the same game
{ "No rights to app", k_EP2PSessionErrorNoRightsToApp }, // local user doesn't own the app that is running
{ "User not logged in", k_EP2PSessionErrorDestinationNotLoggedIn }, // target user isn't connected to Steam
{ "Timeout", k_EP2PSessionErrorTimeout }
};
// Why are there two Steam P2P channels instead of one client/server channel?
//
// We use a client receive channel and a server receive channel to simulate sockets. When a user is running a listen server, ::recvfrom will be called
// simultaneously by both the server & client objects. If we were only using one channel, we would need to parse each packet received on that channel,
// determine if really intended for the callers socket, and potentially store if for another socket.
// code in this file only handles two types of sockets
static inline bool IsSteamSocketType( ESocketIndex_t eSocketType )
{
return (eSocketType == NS_CLIENT || eSocketType == NS_SERVER);
}
// assumes you have already called IsSteamSocketType
static inline int GetChannelForSocketType( ESocketIndex_t eSocketType )
{
return (eSocketType == NS_CLIENT) ? INetSupport::SP2PC_RECV_CLIENT : INetSupport::SP2PC_RECV_SERVER;
}
// each virtual socket we have open to another user
class CSteamSocket
{
public:
explicit CSteamSocket( const CSteamID &steamIdRemote, ESocketIndex_t eSocketType, const netadr_t &addr );
const CSteamID &GetSteamID() const { return m_steamID; }
ESocketIndex_t GetSocketType() const { return m_eSocketType; }
const netadr_t &GetNetAddress() const { return m_addr; }
inline ESocketIndex_t GetRemoteSocketType() const
{
return ( m_eSocketType == NS_CLIENT ) ? NS_SERVER : NS_CLIENT;
}
inline int GetRemoteChannel() const
{
return GetChannelForSocketType( GetRemoteSocketType() );
}
inline int GetLocalChannel() const
{
return GetChannelForSocketType( m_eSocketType );
}
private:
CSteamID m_steamID; // SteamID of other user
ESocketIndex_t m_eSocketType; // The socket type this connection was created on
netadr_t m_addr; // The fake net address we have returned for this user
};
CSteamSocket::CSteamSocket( const CSteamID &steamIdRemote, ESocketIndex_t eSocketType, const netadr_t &addr ) :
m_steamID( steamIdRemote ),
m_eSocketType( eSocketType ),
m_addr( addr )
{
}
class CSteamSocketMgr : public ISteamSocketMgr
{
public:
CSteamSocketMgr();
~CSteamSocketMgr();
virtual void Init() OVERRIDE;
virtual void Shutdown() OVERRIDE;
virtual ISteamSocketMgr::ESteamCnxType GetCnxType();
virtual void OpenSocket( int s, int nModule, int nSetPort, int nDefaultPort, const char *pName, int nProtocol, bool bTryAny ) OVERRIDE;
virtual void CloseSocket( int s, int nModule ) OVERRIDE;
virtual int sendto( int s, const char * buf, int len, int flags, const ns_address &to ) OVERRIDE;
virtual int recvfrom( int s, char * buf, int len, int flags, ns_address *from ) OVERRIDE;
virtual uint64 GetSteamIDForRemote( const ns_address &remote ) OVERRIDE;
// client connection state
STEAM_CALLBACK( CSteamSocketMgr, OnP2PSessionRequest, P2PSessionRequest_t, m_callbackP2PSessionRequest );
STEAM_CALLBACK( CSteamSocketMgr, OnP2PSessionConnectFail, P2PSessionConnectFail_t, m_callbackP2PSessionConnectFail );
CSteamSocket *InitiateConnection( ESocketIndex_t eSocketType, const CSteamID &steamID, const byte *data, size_t len );
void DestroyConnection( ESocketIndex_t eSocketType, const CSteamID &steamID );
void PrintStatus();
private:
CSteamSocket *CreateConnection( ESocketIndex_t eSocketType, const CSteamID &steamID );
void DestroyConnection( CSteamSocket *pSocket );
bool GetTypeForSocket( int s, ESocketIndex_t *peType );
netadr_t GenerateRemoteAddress();
CSteamSocket *FindSocketForAddress( const ns_address &adr );
CSteamSocket *FindSocketForUser( ESocketIndex_t eSocketType, const CSteamID &steamID );
bool IsValid() const
{
return m_bInitialized && Steam3Client().SteamNetworking();
}
// For Remote clients
CUtlVector< CSteamSocket * > m_vecRemoteSockets;
CUtlMap< netadr_t, CSteamSocket * > m_mapAdrToSteamSocket;
CUtlMap< int, ESocketIndex_t > m_mapSocketToESocketType;
int m_nNextRemoteAddress;
bool m_bInitialized;
};
CSteamSocketMgr::CSteamSocketMgr() :
m_bInitialized( false ),
m_nNextRemoteAddress( 1 ),
m_mapAdrToSteamSocket( 0, 0, DefLessFunc( netadr_t ) ),
m_mapSocketToESocketType( 0, 0, DefLessFunc( int ) ),
m_callbackP2PSessionRequest( this, &CSteamSocketMgr::OnP2PSessionRequest ),
m_callbackP2PSessionConnectFail( this, &CSteamSocketMgr::OnP2PSessionConnectFail )
{
}
CSteamSocketMgr::~CSteamSocketMgr()
{
}
void CSteamSocketMgr::Init()
{
m_bInitialized = true;
if ( Steam3Client().SteamNetworking() )
Steam3Client().SteamNetworking()->AllowP2PPacketRelay( net_steamcnx_allowrelay.GetBool() );
}
void CSteamSocketMgr::Shutdown()
{
if ( !IsValid() )
return;
// Destroy remote sockets
FOR_EACH_VEC_BACK( m_vecRemoteSockets, i )
{
CSteamSocket *pSocket = m_vecRemoteSockets[i];
// this will delete pSocket
DestroyConnection( pSocket );
}
m_vecRemoteSockets.RemoveAll();
Assert( m_mapAdrToSteamSocket.Count() == 0 );
m_mapAdrToSteamSocket.RemoveAll();
m_mapSocketToESocketType.RemoveAll();
m_bInitialized = false;
}
netadr_t CSteamSocketMgr::GenerateRemoteAddress()
{
netadr_t ret( m_nNextRemoteAddress++, STEAM_CNX_PORT );
return ret;
}
void CSteamSocketMgr::OpenSocket( int s, int nModule, int nSetPort, int nDefaultPort, const char *pName, int nProtocol, bool bTryAny )
{
if ( !IsValid() )
return;
ESocketIndex_t eSocketType = ESocketIndex_t( nModule );
if ( !IsSteamSocketType( eSocketType ) )
return;
// make sure we dont have a socket for this type
FOR_EACH_MAP_FAST( m_mapSocketToESocketType, i )
{
if ( m_mapSocketToESocketType[i] == eSocketType )
{
AssertMsg1( false, "Already have a socket for this type: %s", s_SocketIndexMap->GetNameByToken( eSocketType ) );
return;
}
}
// save socket
m_mapSocketToESocketType.InsertOrReplace( s, eSocketType );
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Opened Steam Socket %s ( socket %d )\n", s_SocketIndexMap->GetNameByToken( eSocketType ), s );
}
}
void CSteamSocketMgr::CloseSocket( int s, int nModule )
{
if ( g_ThreadedSocketQueue.ShouldUseSocketsThreaded() )
g_ThreadedSocketQueue.CloseSocket( s );
if ( !IsValid() )
return;
ESocketIndex_t eSocketType = ESocketIndex_t( nModule );
if ( !IsSteamSocketType( eSocketType ) )
return;
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Closed Steam Socket %s\n", s_SocketIndexMap->GetNameByToken( eSocketType ) );
}
FOR_EACH_VEC_BACK( m_vecRemoteSockets, i )
{
CSteamSocket *pSocket = m_vecRemoteSockets[i];
if ( pSocket->GetSocketType() == eSocketType )
DestroyConnection( pSocket );
}
FOR_EACH_MAP( m_mapSocketToESocketType, i )
{
if ( m_mapSocketToESocketType[ i ] == eSocketType )
{
m_mapSocketToESocketType.RemoveAt( i );
break;
}
}
}
CSteamSocket *CSteamSocketMgr::InitiateConnection( ESocketIndex_t eSocketTypeFrom, const CSteamID &steamID, const byte *data, size_t len )
{
CSteamSocket *pSocket = CreateConnection( eSocketTypeFrom, steamID );
if ( !pSocket )
return NULL;
// don't have to wait for a connection to be established.. just send the packet
if ( !Steam3Client().SteamNetworking()->SendP2PPacket( pSocket->GetSteamID(), data, len, k_EP2PSendReliable, pSocket->GetRemoteChannel() ) )
{
DestroyConnection( eSocketTypeFrom, steamID );
return NULL;
}
return pSocket;
}
CSteamSocket *CSteamSocketMgr::CreateConnection( ESocketIndex_t eSocketType, const CSteamID &steamID )
{
if ( !IsValid() )
return NULL;
// if we already have a socket for this user, return that
CSteamSocket *pSocket = FindSocketForUser( eSocketType, steamID );
if ( pSocket )
return pSocket;
netadr_t adrRemote = GenerateRemoteAddress();
ConColorMsg( STEAM_CNX_COLOR, "Generated %s for %llx\n", adrRemote.ToString(), steamID.ConvertToUint64() );
// create
pSocket = new CSteamSocket( steamID, eSocketType, adrRemote );
m_mapAdrToSteamSocket.Insert( adrRemote, pSocket );
m_vecRemoteSockets.AddToTail( pSocket );
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Created %s connection to %llx\n", s_SocketIndexMap->GetNameByToken( eSocketType ), steamID.ConvertToUint64() );
}
return pSocket;
}
void CSteamSocketMgr::DestroyConnection( CSteamSocket *pSocket )
{
if ( !IsValid() || !pSocket )
return;
// remove from address map & vector
m_mapAdrToSteamSocket.Remove( pSocket->GetNetAddress() );
m_vecRemoteSockets.FindAndFastRemove( pSocket );
// we can close both channels with this user, as we can only talk to his server or client. If their client is talking to our server,
// our client shouldn't be talking to their server
Steam3Client().SteamNetworking()->CloseP2PChannelWithUser( pSocket->GetSteamID(), pSocket->GetLocalChannel() );
Steam3Client().SteamNetworking()->CloseP2PChannelWithUser( pSocket->GetSteamID(), pSocket->GetRemoteChannel() );
// log
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Destroyed %s connection to %llx\n", s_SocketIndexMap->GetNameByToken( pSocket->GetSocketType() ), pSocket->GetSteamID().ConvertToUint64() );
}
// done with socket
delete pSocket;
}
void CSteamSocketMgr::DestroyConnection( ESocketIndex_t eSocketType, const CSteamID &steamID )
{
if ( !IsValid() )
return;
CSteamSocket *pSocket = FindSocketForUser( eSocketType, steamID );
DestroyConnection( pSocket );
}
CSteamSocket *CSteamSocketMgr::FindSocketForAddress( const ns_address &adr )
{
// !FIXME! Eventually we probably should actually use SteamID P2P address type for this,
// and not assign virtual addresses.
if ( !adr.IsType<netadr_t>() )
{
Assert( false );
return nullptr;
}
int idx = m_mapAdrToSteamSocket.Find( adr.AsType<netadr_t>() );
if ( idx == m_mapAdrToSteamSocket.InvalidIndex() )
return NULL;
return m_mapAdrToSteamSocket[ idx ];
}
CSteamSocket *CSteamSocketMgr::FindSocketForUser( ESocketIndex_t eSocketType, const CSteamID &steamID )
{
FOR_EACH_VEC( m_vecRemoteSockets, i )
{
CSteamSocket *pSocket = m_vecRemoteSockets[i];
if ( pSocket->GetSteamID() == steamID && pSocket->GetSocketType() == eSocketType )
return pSocket;
}
return NULL;
}
void CSteamSocketMgr::OnP2PSessionRequest( P2PSessionRequest_t *pParam )
{
#ifndef DEDICATED
// on listen servers, don't accept connections from others if they aren't in our matchmaking session
if ( !g_pMatchFramework || !g_pMatchFramework->GetMatchSession() )
return;
if ( GetBaseLocalClient().IsConnected() && !sv.IsActive() )
return;
if ( !SessionMembersFindPlayer( g_pMatchFramework->GetMatchSession()->GetSessionSettings(), pParam->m_steamIDRemote.ConvertToUint64() ) )
return;
#endif
// accept all connections
Steam3Client().SteamNetworking()->AcceptP2PSessionWithUser( pParam->m_steamIDRemote );
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Accepted P2P connection with %llx\n", pParam->m_steamIDRemote.ConvertToUint64() );
}
}
void CSteamSocketMgr::OnP2PSessionConnectFail( P2PSessionConnectFail_t *pParam )
{
// log disconnect
if ( net_steamcnx_debug.GetBool() )
{
const char *pchP2PError = s_EP2PSessionErrorIndexMap->GetNameByToken( (EP2PSessionError)pParam->m_eP2PSessionError );
ConColorMsg( STEAM_CNX_COLOR, "Received connection fail for user %llx %s\n", pParam->m_steamIDRemote.ConvertToUint64(), pchP2PError );
}
// close all connections to this user
FOR_EACH_VEC_BACK( m_vecRemoteSockets, i )
{
CSteamSocket *pSocket = m_vecRemoteSockets[i];
if ( pSocket->GetSteamID() != pParam->m_steamIDRemote )
continue;
DestroyConnection( pSocket );
}
}
int CSteamSocketMgr::sendto( int s, const char * buf, int len, int flags, const ns_address &to )
{
if ( !to.IsType<netadr_t>() )
{
Warning( "WARNING: sendto: don't know how to send to non-IP address '%s'\n", ns_address_render( to ).String() );
Assert( false );
return -1; // return socket_error
}
if ( IsValid() )
{
CSteamSocket *pSteamSocket = FindSocketForAddress( to );
if ( pSteamSocket )
{
Steam3Client().SteamNetworking()->SendP2PPacket( pSteamSocket->GetSteamID(), buf, len, k_EP2PSendUnreliable, pSteamSocket->GetRemoteChannel() );
if ( net_steamcnx_debug.GetInt() >= 3 )
{
P2PSessionState_t p2pSessionState;
Q_memset( &p2pSessionState, 0, sizeof( p2pSessionState ) );
bool bSuccess = Steam3Client().SteamNetworking()->GetP2PSessionState( pSteamSocket->GetSteamID(), &p2pSessionState );
ESocketIndex_t eType = NS_INVALID;
GetTypeForSocket( s, &eType );
ConColorMsg( STEAM_CNX_COLOR, " Send to %llx %u bytes on %s (status %s - %s)\n",
pSteamSocket->GetSteamID().ConvertToUint64(),
len,
s_SocketIndexMap->GetNameByToken( eType ),
bSuccess ? "true" : "false",
p2pSessionState.m_bConnectionActive ? "connected" :"not connected" );
}
return len;
}
else if ( to.AsType<netadr_t>().GetPort() == STEAM_CNX_PORT )
{
if ( net_steamcnx_debug.GetInt() >= 1 )
{
ConColorMsg( STEAM_CNX_COLOR, " Attempted to send %u bytes on unknown steam socket address %s\n", len, ns_address_render( to ).String() );
}
return len;
}
}
if ( OnlyUseSteamSockets() )
{
Warning( "WARNING: sendto: CSteamSocketMgr isn't initialized and we aren't falling back to our own sockets\n");
return -1; // return socket_error
}
// Plain old socket send
sockaddr sadr;
to.AsType<netadr_t>().ToSockadr( &sadr );
return ::sendto( s, buf, len, flags, &sadr, sizeof(sadr) );
}
bool CSteamSocketMgr::GetTypeForSocket( int s, ESocketIndex_t *peType )
{
int i = m_mapSocketToESocketType.Find( s );
if ( i == m_mapSocketToESocketType.InvalidIndex() )
return false;
*peType = m_mapSocketToESocketType[i];
return true;
}
int CSteamSocketMgr::recvfrom( int s, char * buf, int len, int flags, ns_address *from )
{
// Check non-steam socket first
if ( !OnlyUseSteamSockets() )
{
sockaddr sadrfrom;
socklen_t fromlen = sizeof(sadrfrom);
int iret = ( g_ThreadedSocketQueue.ShouldUseSocketsThreaded() )
? g_ThreadedSocketQueue.recvfrom( s, buf, len, &sadrfrom )
: ::recvfrom( s, buf, len, flags, &sadrfrom, &fromlen );
if ( iret > 0 )
{
from->SetFromSockadr( &sadrfrom );
return iret;
}
}
if ( !IsValid() )
return 0;
// need to get data by socket type
ESocketIndex_t eSocketType = NS_INVALID;
if ( !GetTypeForSocket( s, &eSocketType ) )
return 0;
//
// IPC-Steam performance optimization: don't do any IPC to P2P sockets API calls
// if the session settings indicate that there can be no P2P communication
//
switch ( eSocketType )
{
case NS_CLIENT:
// We can only be receiving P2P communication if we are a client of another
// listen server, make sure that the session has the right data
if ( sv.IsDedicated() )
return 0;
if ( sv.IsActive() )
return 0;
// Otherwise check how many players are connected to our session, if nobody is connected
// then do no P2P communication with nobody
if ( !g_pMatchFramework || !g_pMatchFramework->GetMatchSession() ||
( g_pMatchFramework->GetMatchSession()->GetSessionSettings()->GetInt( "members/numMachines", 0 ) < 2 ) ||
Q_stricmp( g_pMatchFramework->GetMatchSession()->GetSessionSettings()->GetString( "server/server" ), "listen" ) )
return 0;
break;
case NS_SERVER:
// Dedicated servers don't do P2P communication
if ( sv.IsDedicated() )
return 0;
// If we are not running a listen server then there shouldn't be any P2P communication
if ( !sv.IsActive() )
return 0;
// Otherwise check how many players are connected to our session, if nobody is connected
// then do no P2P communication with nobody
if ( !g_pMatchFramework || !g_pMatchFramework->GetMatchSession() ||
( g_pMatchFramework->GetMatchSession()->GetSessionSettings()->GetInt( "members/numMachines", 0 ) < 2 ) )
return 0;
break;
default:
// There can be no P2P communication on any other socket type
return 0;
}
uint32 cubMsg = 0;
CSteamID steamIDRemote;
// if no data to read, will return false
if ( !Steam3Client().SteamNetworking()->ReadP2PPacket( buf, len, &cubMsg, &steamIDRemote, GetChannelForSocketType( eSocketType ) ) || cubMsg == 0 )
return 0;
// We have the SteamID for a user who sent us a packet on the channel, but we dont know on what channel the sender is waiting for a response.
// We could add the response channel to each packet, but because clients only communicate with servers, and vice versa, we can assume
// that the sender is our opposite. This conversion is done in CSteamSocket::GetTargetSocketType()
// could be a new connection from this user.. add if necessary
CSteamSocket *pSocket = FindSocketForUser( eSocketType, steamIDRemote );
if ( !pSocket )
{
pSocket = CreateConnection( eSocketType, steamIDRemote );
Assert( pSocket );
}
// got data.. update params
*from = ns_address( pSocket->GetNetAddress() );
if ( net_steamcnx_debug.GetInt() >= 3 )
{
ConColorMsg( STEAM_CNX_COLOR, " Received from %llx %u bytes on %s\n", steamIDRemote.ConvertToUint64(), cubMsg, s_SocketIndexMap->GetNameByToken( eSocketType ) );
}
return cubMsg;
}
ISteamSocketMgr::ESteamCnxType CSteamSocketMgr::GetCnxType()
{
return (ISteamSocketMgr::ESteamCnxType)clamp( net_steamcnx_enabled.GetInt(), (int)ESCT_NEVER, (int)ESCT_MAXTYPE - 1 );
}
uint64 CSteamSocketMgr::GetSteamIDForRemote( const ns_address &remote )
{
const CSteamSocket *pSocket = FindSocketForAddress( remote );
if ( pSocket )
{
return pSocket->GetSteamID().ConvertToUint64();
}
return 0ull;
}
void CSteamSocketMgr::PrintStatus()
{
ConColorMsg( STEAM_CNX_COLOR, "SteamSocketMgr Status\n" );
if ( !IsValid() )
{
ConColorMsg( STEAM_CNX_COLOR, " Invalid (no Steam3Client API?)\n" );
return;
}
// print socket info
ConColorMsg( STEAM_CNX_COLOR, " %d connections\n", m_vecRemoteSockets.Count() );
FOR_EACH_VEC( m_vecRemoteSockets, i )
{
CSteamSocket *pSocket = m_vecRemoteSockets[i];
P2PSessionState_t p2pSessionState;
if ( !Steam3Client().SteamNetworking()->GetP2PSessionState( pSocket->GetSteamID(), &p2pSessionState ) )
{
ConColorMsg( STEAM_CNX_COLOR, " %d %llx, failed to get session state\n", i, pSocket->GetSteamID().ConvertToUint64() );
continue;
}
ConColorMsg( STEAM_CNX_COLOR, " %d %llx, type(%s), psuedoAddr(%s), connected(%s), connecting(%s), relay(%s), bytesQueued(%d), packetsQueued(%d), lasterror(%s)\n",
i,
pSocket->GetSteamID().ConvertToUint64(),
s_SocketIndexMap->GetNameByToken( pSocket->GetSocketType() ),
pSocket->GetNetAddress().ToString(),
p2pSessionState.m_bConnectionActive ? "yes" : "no",
p2pSessionState.m_bConnecting ? "yes" : "no",
p2pSessionState.m_bUsingRelay ? "yes" : "no",
p2pSessionState.m_nBytesQueuedForSend,
p2pSessionState.m_nPacketsQueuedForSend,
s_EP2PSessionErrorIndexMap->GetNameByToken( (EP2PSessionError)p2pSessionState.m_eP2PSessionError ) );
}
}
#else
// For LINUX it's basically all stubbed
#ifdef _PS3
ASSERT_INVARIANT( sizeof( int ) == sizeof( socklen_t ) );
#endif
class CSteamSocketMgr : public ISteamSocketMgr
{
public:
virtual void Init() {}
virtual void Shutdown() {}
ISteamSocketMgr::ESteamCnxType GetCnxType() { return ESCT_NEVER; }
virtual void OpenSocket( int s, int nModule, int nSetPort, int nDefaultPort, const char *pName, int nProtocol, bool bTryAny ) {}
virtual void CloseSocket( int s, int nModule )
{
if ( g_ThreadedSocketQueue.ShouldUseSocketsThreaded() )
g_ThreadedSocketQueue.CloseSocket( s );
}
virtual int sendto( int s, const char * buf, int len, int flags, const ns_address &to ) OVERRIDE
{
if ( to.IsType<netadr_t>() )
{
sockaddr sadr;
to.AsType<netadr_t>().ToSockadr( &sadr );
return ::sendto( s, buf, len, flags, &sadr, sizeof(sadr) );
}
AssertMsg1( false, "Tried to send to non-IP address '%s'", ns_address_render( to ).String() );
return -1;
}
virtual int recvfrom( int s, char * buf, int len, int flags, ns_address *from ) OVERRIDE
{
sockaddr sadrfrom;
socklen_t fromlen = sizeof(sadrfrom);
int iret = ( g_ThreadedSocketQueue.ShouldUseSocketsThreaded() )
? g_ThreadedSocketQueue.recvfrom( s, buf, len, &sadrfrom )
: ::recvfrom( s, buf, len, flags, &sadrfrom, &fromlen );
if ( iret > 0 )
from->SetFromSockadr( &sadrfrom );
return iret;
}
virtual uint64 GetSteamIDForRemote( const ns_address &remote )
{
return 0ull;
}
void PrintStatus()
{
}
};
#endif
CSteamSocketMgr g_SteamSocketMgr;
ISteamSocketMgr *g_pSteamSocketMgr = &g_SteamSocketMgr;
netadr_t NET_InitiateSteamConnection(int sock, uint64 uSteamID, const char *format, ...)
{
netadr_t adr;
#if defined( USE_STEAM_SOCKETS )
if ( uSteamID == 0ull )
{
Warning( "NET_InitiateSteamConnection called with uSteamID == 0\n" );
return adr;
}
va_list argptr;
char string[ MAX_ROUTABLE_PAYLOAD ];
va_start( argptr, format );
Q_vsnprintf( string, sizeof( string ), format, argptr );
va_end( argptr );
int length = Q_strlen( string );
CUtlBuffer sendBuf;
sendBuf.PutUnsignedInt( (unsigned int)-1 );
sendBuf.Put( string, length );
CSteamID steamID;
steamID.SetFromUint64( uSteamID );
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Initiate %llx\n", uSteamID );
}
CSteamSocket *pSocket = g_SteamSocketMgr.InitiateConnection( (ESocketIndex_t)sock, steamID, (const byte *)sendBuf.Base(), sendBuf.TellPut() );
if ( !pSocket )
{
Warning( "NET_InitiateSteamConnection failed to create a socket\n" );
return adr;
}
adr = pSocket->GetNetAddress();
#endif
return adr;
}
void NET_TerminateSteamConnection( int sock, uint64 uSteamID )
{
#if defined( USE_STEAM_SOCKETS )
if ( uSteamID == 0ull )
return;
if ( net_steamcnx_debug.GetBool() )
{
ConColorMsg( STEAM_CNX_COLOR, "Terminate %llx\n", uSteamID );
}
g_SteamSocketMgr.DestroyConnection( (ESocketIndex_t)sock, uSteamID );
#endif
}
CON_COMMAND( net_steamcnx_status, "Print status of steam connection sockets." )
{
g_SteamSocketMgr.PrintStatus();
}