658 lines
20 KiB
658 lines
20 KiB
//========= Copyright (c), Valve Corporation, All rights reserved. ============//
#include "client_pch.h"
#include "demostreamhttp.h"
#include "cl_steamauth.h"
#include "tier1/keyvaluesjson.h"
#include "tier0/memalloc.h"
#include "cl_demo.h"
#include "sv_steamauth.h"
#include "engine_gcmessages.pb.h"
static ISteamHTTP *s_pSteamHTTP = NULL;
ConVar demo_debug( "demo_debug", "0", 0, "Demo debug info." );
ConVar tv_playcast_origin_auth( "tv_playcast_origin_auth", "", FCVAR_RELEASE | FCVAR_HIDDEN, "Get request X-Origin-Auth string" );
ConVar tv_playcast_max_rcvage( "tv_playcast_max_rcvage", "15", FCVAR_RELEASE | FCVAR_HIDDEN );
ConVar tv_playcast_max_rtdelay( "tv_playcast_max_rtdelay", "55", FCVAR_RELEASE | FCVAR_HIDDEN );
ConVar tv_playcast_delay_prediction( "tv_playcast_delay_prediction", "1", FCVAR_RELEASE );
CDemoStreamHttp::CDemoStreamHttp() :
m_nState( STATE_IDLE ),
m_pStreamSignup( NULL ),
m_pClient( NULL ),
m_bSyncFromGc( false ),
m_flBroadcastKeyframeInterval( 3 )
V_memset( &m_SyncResponse, 0, sizeof( m_SyncResponse ) );
m_dSyncTimeoutEnd = -1;
void CDemoStreamHttp::StartStreaming( const char *pUrl, SyncParams_t syncParams )
if ( !PrepareForStreaming( pUrl ) )
m_SyncParams = syncParams;
SendSync( );
bool CDemoStreamHttp::PrepareForStreaming( const char * pUrl )
if ( pUrl[ 0 ] == 'g' && pUrl[ 1 ] == 'c' && pUrl[ 2 ] == '-' )
m_Url = pUrl + 3;
m_bSyncFromGc = true;
m_Url = pUrl;
m_bSyncFromGc = false;
s_pSteamHTTP = Steam3Client().SteamHTTP();
if ( !s_pSteamHTTP )
s_pSteamHTTP = Steam3Server().SteamHTTP();
if ( !s_pSteamHTTP )
DevMsg( "Cannot get Steam HTTP interface\n" );
return false ;
DevMsg( "Broadcast: Synchronizing stream\n" );
m_nState = STATE_SYNC;
return true;
// this is only called in special cases for debugging, to play back stale contents
void CDemoStreamHttp::StartStreamingCached( const char *pUrl, int nFragment)
if ( !PrepareForStreaming( pUrl ) )
m_nState = STATE_START;
// guess parameters of the stream
int nStartTick = 1;
m_nDemoProtocol = 4; // DEMO_PROTOCOL == 4 is where I started writing this
int nSignupFragment = 0;
m_SyncParams.m_nStartFragment = 0;
m_SyncResponse.flTicksPerSecond = 128;
m_SyncResponse.flKeyframeInterval = 3;
m_SyncResponse.flRealTimeDelay = 0;
m_SyncResponse.flReceiveAge = 0;
m_SyncResponse.nFragment = nFragment;
m_SyncResponse.nSignupFragment = nSignupFragment;
m_SyncResponse.nStartTick = nStartTick;
m_SyncResponse.dPlatTimeReceived = Plat_FloatTime();
SendGet( CFmtStr( "/%d/start", nSignupFragment ), new CStartRequest( ) );
BeginBuffering( nFragment );
void CDemoStreamHttp::SendSync( int nResync )
Assert( m_nState == STATE_SYNC );
if ( m_bSyncFromGc )
GotvHttpStreamId_t params = GetStreamId( m_Url );
DevMsg( "Requesting sync from GC, start fragment %d match id %llu instance %d\n", m_SyncParams.m_nStartFragment, params.m_nMatchId, params.m_nInstanceId );
CEngineGotvSyncPacket msg;
msg.set_match_id( params.m_nMatchId );
msg.set_instance_id( params.m_nInstanceId );
if ( m_SyncParams.m_nStartFragment > 0 )
msg.set_currentfragment( m_SyncParams.m_nStartFragment );
g_ClientDLL->EngineGotvSyncPacket( &msg );
m_dSyncTimeoutEnd = Plat_FloatTime() + 10;
char request[ 128 ];
m_SyncParams.PrintSyncRequest( request, sizeof( request ) );
DevMsg( "Requesting sync from relay %s\n", request );
SendGet( request, new CSyncRequest( m_SyncParams, nResync ) );
void CDemoStreamHttp::Resync( )
m_nState = STATE_SYNC;
SendSync( 1 );
void CDemoStreamHttp::Update()
if ( m_nState == STATE_SYNC && m_bSyncFromGc && m_dSyncTimeoutEnd > 0 && m_dSyncTimeoutEnd < Plat_FloatTime() )
if ( m_nState == STATE_RANDOM_WAIT_AND_SYNC && m_bSyncFromGc && m_dSyncTimeoutEnd > 0 && m_dSyncTimeoutEnd < Plat_FloatTime() )
m_nState = STATE_SYNC;
SendSync( 0 );
// result from "/start" request
void CDemoStreamHttp::OnStart( HTTPRequestHandle hRequest )
m_pStreamSignup = MakeBuffer( hRequest );
m_nStreamSignupFragment = m_SyncResponse.nSignupFragment;
if ( !m_pStreamSignup )
DevMsg( "Broadcast failed to start: cannot retrieve startup packet data\n" );
DevMsg( "Received signup fragment %d\n", m_SyncResponse.nSignupFragment );
if ( m_pClient )
m_pClient->OnDemoStreamStart( GetStreamStartReference(), 0 );
void CDemoStreamHttp::OnFragmentRequestSuccess( HTTPRequestHandle hRequest, int nFragment, FragmentTypeEnum_t nType )
Fragment_t &fragment = Fragment( nFragment );
fragment.ClearStreaming( nType );
if ( Buffer_t *pBuf = MakeBuffer( hRequest ) )
fragment.SetField( nType, pBuf );
DevMsg( "Broadcast playback failed to retrieve %s frame of fragment %d", AsString( nType ), nFragment ); // TODO: implement fault-tolerant recovery; we can request the next fragment's full frame
void CDemoStreamHttp::OnFragmentRequestFailure( EHTTPStatusCode nErrorCode, int nFragment, FragmentTypeEnum_t nType )
Fragment_t &fragment = Fragment( nFragment );
fragment.ClearStreaming( nType );
// TODO: Retry streaming gracefully, implement timeouts, skip fragment and download the next full frame if needed
bool CDemoStreamHttp::OnEngineGotvSyncPacket( const CEngineGotvSyncPacket *pPkt )
GotvHttpStreamId_t streamId = GetStreamId( m_Url );
if ( streamId.m_nMatchId != pPkt->match_id() || streamId.m_nInstanceId != pPkt->instance_id() )
Warning( "Ignoring unexpected sync from gc, match %llu:%d, expected %llu:%d\n", pPkt->match_id(), pPkt->instance_id(), streamId.m_nMatchId, streamId.m_nInstanceId );
return false;
if ( m_nState != STATE_SYNC && m_nState != STATE_RANDOM_WAIT_AND_SYNC ) // we should be waiting for a sync in some way. In case of WAIT_AND_SYNC, maybe GC will send us a packet even though we didn't ask for it, while we're waiting to re-ask for a sync..
Warning( "Ignoring unexpected sync from gc, match %llu:%d\n", pPkt->match_id(), pPkt->instance_id() );
return false;
if ( !pPkt->has_tick() )
// the packet is empty, which means: wait for a few seconds
float flDelay = pPkt->rtdelay() * RandomFloat( 0.5f, 1.5f );
m_dSyncTimeoutEnd = Plat_FloatTime() + flDelay;
DevMsg( "Waiting %.2f seconds\n", flDelay );
return true; // we actually successfully processed the packet
m_nDemoProtocol = 4;
m_SyncResponse.flTicksPerSecond = pPkt->tickrate();
m_SyncResponse.flKeyframeInterval = pPkt->has_keyframe_interval() ? pPkt->keyframe_interval() : 3.0f;
m_SyncResponse.nStartTick = pPkt->tick();
m_SyncResponse.flRealTimeDelay = pPkt->rtdelay();
m_SyncResponse.flReceiveAge = pPkt->rcvage();
m_SyncResponse.nFragment = pPkt->currentfragment();
m_SyncResponse.nSignupFragment = pPkt->signupfragment();
m_SyncResponse.dPlatTimeReceived = Plat_FloatTime();
return OnSync( 0 );
// result from "/sync" request arrived
bool CDemoStreamHttp::OnSync( const char *pBuffer, int nBufferSize, int nResync )
if ( m_nState != STATE_SYNC )
Warning( "Ignoring unexpected sync, %d bytes, resync %d\n", nBufferSize, nResync );
return false;
KeyValuesJSONParser json( pBuffer, nBufferSize );
if ( KeyValues *pSync = json.ParseFile() )
m_SyncResponse.flKeyframeInterval = pSync->GetFloat( "keyframe_interval", 3.0f );
m_SyncResponse.nStartTick = pSync->GetInt( "tick", -1 );
m_SyncResponse.flRealTimeDelay = pSync->GetFloat( "rtdelay", 0 );
m_SyncResponse.flReceiveAge = pSync->GetFloat( "rcvage", 0 );
m_SyncResponse.nFragment = pSync->GetInt( "fragment", 1 );
m_SyncResponse.nSignupFragment = pSync->GetInt( "signup_fragment", 0 );
m_SyncResponse.flTicksPerSecond = pSync->GetInt( "tps", 0 );
m_SyncResponse.dPlatTimeReceived = Plat_FloatTime();
m_nDemoProtocol = pSync->GetInt( "protocol", 4 ); // DEMO_PROTOCOL == 4 is where I started writing this
delete pSync; pSync = NULL;
return OnSync( nResync );
DevMsg( "Broadcast sync: malformed response: %s\n", pBuffer );
return false;
bool CDemoStreamHttp::OnSync( int nResync )
if ( nResync && !IsDebug() && m_SyncResponse.flReceiveAge > tv_playcast_max_rcvage.GetFloat() && m_SyncResponse.flRealTimeDelay > tv_playcast_max_rtdelay.GetFloat() )
DevMsg( "Broadcast resync %d: the stream seems to have stopped (rcvage %.1f, rtdelay %.1f)\n", nResync, m_SyncResponse.flReceiveAge, m_SyncResponse.flRealTimeDelay );
return false;
else if ( /*nTick < 0 || nEndTick < nTick || flSkip < 0 ||*/ m_SyncResponse.nFragment < m_SyncResponse.nSignupFragment || m_SyncResponse.nSignupFragment < 0 )
DevMsg( "Broadcast m_SyncResponse: unexpected response. fragment %d must be at/after start fragment %d\n", m_SyncResponse.nFragment, m_SyncResponse.nSignupFragment );
return false;
DevMsg( "Broadcast: Buffering stream tick %d fragment %d signup fragment %d\n", m_SyncResponse.nStartTick, m_SyncResponse.nSignupFragment, m_SyncResponse.nSignupFragment );
m_nState = STATE_START;
m_dSyncTimeoutEnd = -1;
m_flBroadcastKeyframeInterval = m_SyncResponse.flKeyframeInterval;
if ( nResync )
if ( !m_pClient )
DevMsg( "Broadcast resync failed: Client not connected to Stream\n" );
return false;
if ( m_SyncResponse.nSignupFragment == m_nStreamSignupFragment )
Assert( m_pStreamSignup.IsValid() && m_pClient );
m_pClient->OnDemoStreamStart( GetStreamStartReference(), nResync );
if ( !m_pClient->OnDemoStreamRestarting() )
return false;
DevMsg( "Resync %d response requires full stream restart because signup fragment changed from %d to %d\n", nResync, m_nStreamSignupFragment, m_SyncResponse.nSignupFragment );
SendGet( CFmtStr( "/%d/start", m_SyncResponse.nSignupFragment ), new CStartRequest( ) );
Assert( !m_pStreamSignup ); // when we're restarting, we don't need the start fragment, we already initialized
SendGet( CFmtStr( "/%d/start", m_SyncResponse.nSignupFragment ), new CStartRequest( ) );
if ( nResync || !tv_playcast_delay_prediction.GetBool() )
int nFragment = m_SyncResponse.nFragment;
BeginBuffering( nFragment );
return true;
void CDemoStreamHttp::BeginBuffering( int nFragment )
RequestFragment( nFragment, FRAGMENT_FULL );
for ( int i = 0; i <= 4; ++i )
RequestFragment( nFragment + i, FRAGMENT_DELTA );
void CDemoStreamHttp::RequestFragment( int nFragment, FragmentTypeEnum_t nType )
Fragment_t &fragment = Fragment( nFragment );
if ( !fragment.GetField( nType ) && !fragment.IsStreaming(nType) )
fragment.SetStreaming( nType );
SendGet( CFmtStr( nType == FRAGMENT_FULL ? "/%d/full" : "/%d/delta", nFragment ), new CFragmentRequest( nFragment, nType ) );
void CDemoStreamHttp::ReleaseFragment( int nFragment )
UtlHashHandle_t it = m_FragmentCache.Find( nFragment );
if ( it != m_FragmentCache.InvalidHandle() )
m_FragmentCache[ it ].ResetBuffers();
m_FragmentCache.RemoveByHandle( it );
GotvHttpStreamId_t CDemoStreamHttp::GetStreamId( const char *pUrl )
GotvHttpStreamId_t out;
if ( !pUrl || !*pUrl )
return out;
const char *p = pUrl + V_strlen( pUrl ) - 1;
if ( !V_isdigit( *p ) )
return out;
out.m_nInstanceId = *p - '0';
if ( *p != 'i' )
return out;
out.m_nMatchId = 0;
uint64 digitPlace = 1;
while ( ( --p ) >= pUrl && V_isdigit( *p ) )
if ( out.m_nMatchId > uint64( -1ll ) / 10 )
break; // the number doesn't fit 64 bit, error out
out.m_nMatchId += ( *p - '0' ) * digitPlace;
digitPlace *= 10;
if ( *p != '/' )
// invalid matchid
out.m_nMatchId = 0;
return out;
IDemoStreamClient::DemoStreamReference_t CDemoStreamHttp::GetStreamStartReference( bool bLagCompensation /*= false */ )
IDemoStreamClient::DemoStreamReference_t start;
start.nTick = m_SyncResponse.nStartTick;
start.nFragment = m_SyncResponse.nFragment;
if ( bLagCompensation )
float flSkipSeconds = ( Plat_FloatTime() - m_SyncResponse.dPlatTimeReceived + m_SyncResponse.flReceiveAge );
if ( flSkipSeconds >= 0 && flSkipSeconds < 90 ) // if it's not too suspiciously long interval, we can try to compensate for it
int nTotalSkipTicks = int( flSkipSeconds * m_SyncResponse.flTicksPerSecond );
int nTicksPerFragment = int( m_SyncResponse.flKeyframeInterval * m_SyncResponse.flTicksPerSecond );
start.nSkipTicks = nTotalSkipTicks % nTicksPerFragment;
start.nTick += ( nTotalSkipTicks - start.nSkipTicks );
start.nFragment += nTotalSkipTicks / nTicksPerFragment;
start.nSkipTicks = 0;
start.nSkipTicks = int( m_SyncResponse.flReceiveAge * m_SyncResponse.flTicksPerSecond ); // Maybe GC should send rtdelay - desired_delay instead?
return start;
void CDemoStreamHttp::StopStreaming()
if ( m_nState != STATE_IDLE )
m_nState = STATE_IDLE;
if ( m_pClient )
m_dSyncTimeoutEnd = -1;
while ( m_PendingRequests.Count() )
m_pStreamSignup = NULL; // delete start Buffer_t
FOR_EACH_HASHTABLE( m_FragmentCache, it )
m_FragmentCache.Element( it ).ResetBuffers();
CDemoStreamHttp::Buffer_t * CDemoStreamHttp::GetFragmentBuffer( int nFragment , FragmentTypeEnum_t nFragmentType )
UtlHashHandle_t hFind = m_FragmentCache.Find( nFragment );
if ( hFind == m_FragmentCache.InvalidHandle() )
return NULL;
return m_FragmentCache[ hFind ].GetField( nFragmentType );
void CDemoStreamHttp::CSyncRequest::OnSuccess( const HTTPRequestCompleted_t * pResponse )
uint32 nBodySize;
if( !s_pSteamHTTP->GetHTTPResponseBodySize( pResponse->m_hRequest, &nBodySize ) || nBodySize >= 1024 )
DevMsg( "Broadcast sync: response buffer overflow (%d bytes)\n", nBodySize );
char *pResponseBuffer = StackAlloc( char, nBodySize + 1 );
if ( !s_pSteamHTTP->GetHTTPResponseBodyData( pResponse->m_hRequest, ( uint8* )pResponseBuffer, nBodySize ) )
DevMsg( "Broadcast sync: cannot read response body\n" );
pResponseBuffer[ nBodySize ] = '\0';
m_pParent->OnSync( pResponseBuffer, nBodySize, m_nResync );
void CDemoStreamHttp::CSyncRequest::OnFailure( const HTTPRequestCompleted_t * pResponse )
if ( !m_nResync || m_nResync > 5 )
CPendingRequest::OnFailure( pResponse );
DevMsg( "%d stream resync failed\n", m_nResync );
m_pParent->SendSync( m_nResync + 1 ); // retry a couple times
void CDemoStreamHttp::CPendingRequest::OnFailure( const HTTPRequestCompleted_t * pResponse )
if ( !pResponse )
DevMsg( "Broadcast IO error. Please try again later.\n" );
DevMsg( "Broadcast Streaming error %d\n", pResponse->m_eStatusCode );
void CDemoStreamHttp::CStartRequest::OnSuccess( const HTTPRequestCompleted_t * pResponse )
m_pParent->OnStart( pResponse->m_hRequest );
CDemoStreamHttp::Buffer_t * CDemoStreamHttp::MakeBuffer( HTTPRequestHandle hRequest )
uint32 nBodySize;
if ( !s_pSteamHTTP->GetHTTPResponseBodySize( hRequest, &nBodySize ) )
return NULL;
uint8 *pMemory = new uint8[ sizeof( Buffer_t ) + nBodySize + 1 ];
if ( !s_pSteamHTTP->GetHTTPResponseBodyData( hRequest, pMemory + sizeof( Buffer_t ), nBodySize ) )
delete[] pMemory;
return NULL;
pMemory[ sizeof( Buffer_t ) + nBodySize ] = '\0'; // in case we need to receive and parse some text-only packets in the future
Buffer_t* pBuffer = ( Buffer_t* )pMemory;
pBuffer->m_nRefCount = 0;
pBuffer->m_nSize = nBodySize;
return pBuffer;
void CDemoStreamHttp::SendGet( const char *pPath, CPendingRequest *pRequest )
HTTPRequestHandle hRequest = s_pSteamHTTP->CreateHTTPRequest( k_EHTTPMethodGET, m_Url + pPath );
s_pSteamHTTP->SetHTTPRequestNetworkActivityTimeout( hRequest, 30 );
const char *pOriginAuth = tv_playcast_origin_auth.GetString();
if ( pOriginAuth && *pOriginAuth )
if ( !s_pSteamHTTP->SetHTTPRequestHeaderValue( hRequest, "X-Origin-Auth", pOriginAuth ) )
Warning( "Cannot set http X-Origin-Auth for %s\n", pPath );
SteamAPICall_t hCall;
bool bSentOk = s_pSteamHTTP->SendHTTPRequest( hRequest, &hCall );
pRequest->Init( this, hRequest, hCall );
if ( bSentOk && hCall )
SteamAPI_RegisterCallResult( pRequest, hCall );
s_pSteamHTTP->ReleaseHTTPRequest( hRequest );
DevMsg( "Broadcast streaming: unexpected failure getting %s\n", pPath );
pRequest->OnFailure( NULL ); // IO failure
CDemoStreamHttp::Fragment_t & CDemoStreamHttp::Fragment( int nFragment )
UtlHashHandle_t it = m_FragmentCache.Insert( nFragment );
return m_FragmentCache[ it ];
CDemoStreamHttp::CPendingRequest::CPendingRequest() :
m_pParent( NULL ),
m_hCall( k_uAPICallInvalid )
m_iCallback = HTTPRequestCompleted_t::k_iCallback;
void CDemoStreamHttp::CPendingRequest::Init( CDemoStreamHttp *pParent, HTTPRequestHandle hRequest, SteamAPICall_t hCall )
m_pParent = pParent;
m_hRequest = hRequest;
m_hCall = hCall;
pParent->m_PendingRequests.AddToTail( this );
void CDemoStreamHttp::CPendingRequest::Run( void *pvParam )
m_pParent->m_PendingRequests.FindAndFastRemove( this );
OnSuccess( ( HTTPRequestCompleted_t * )pvParam );
delete this;
void CDemoStreamHttp::CPendingRequest::Run( void *pvParam, bool bIOFailure, SteamAPICall_t hSteamAPICall )
m_pParent->m_PendingRequests.FindAndFastRemove( this );
if ( bIOFailure )
OnFailure( NULL );
EHTTPStatusCode nStatus = ( ( HTTPRequestCompleted_t * )pvParam )->m_eStatusCode;
Assert( ( ( HTTPRequestCompleted_t * )pvParam )->m_hRequest == m_hRequest );
if ( nStatus != k_EHTTPStatusCode200OK ) // we should always get a 200
OnFailure( ( HTTPRequestCompleted_t * )pvParam );
OnSuccess( ( HTTPRequestCompleted_t * )pvParam );
delete this;
void CDemoStreamHttp::CPendingRequest::Cancel()
SteamAPI_UnregisterCallResult( this, m_hCall );
s_pSteamHTTP->ReleaseHTTPRequest( m_hRequest );
m_pParent->m_PendingRequests.FindAndFastRemove( this );
OnFailure( NULL );
delete this;
void CDemoStreamHttp::CFragmentRequest::OnSuccess( const HTTPRequestCompleted_t * pResponse )
m_pParent->OnFragmentRequestSuccess( pResponse->m_hRequest, m_nFragment, m_nType );
static const char *s_pFragmentTypeName[ CDemoStreamHttp::FRAGMENT_COUNT ] =
"delta", "full"
void CDemoStreamHttp::CFragmentRequest::OnFailure( const HTTPRequestCompleted_t * pResponse )
if ( demo_debug.GetBool() )
DevMsg( "Failed to retrieve %s fragment %d\n", s_pFragmentTypeName[ m_nType ], m_nFragment );
m_pParent->OnFragmentRequestFailure( pResponse ? pResponse->m_eStatusCode : k_EHTTPStatusCodeInvalid, m_nFragment, m_nType );
// CPendingRequest::OnFailure( pResponse ); <-- the parent OnFail will stop streaming, and we don't want that because we'll retry this fragment or next for a few times before giving up
void CDemoStreamHttp::Fragment_t::ResetBuffers()
for ( int i = 0; i < FRAGMENT_COUNT; ++i )
if ( m_pField[ i ] )
Buffer_t::Release( m_pField[ i ] );
m_pField[ i ] = NULL;
void CDemoStreamHttp::Fragment_t::SetField( FragmentTypeEnum_t nFragment, Buffer_t *pBuffer )
if ( pBuffer )
Buffer_t::AddRef( pBuffer );
if ( m_pField[ nFragment ] )
Buffer_t::Release( m_pField[ nFragment ] );
m_pField[ nFragment ] = pBuffer;
void CDemoStreamHttp::SyncParams_t::PrintSyncRequest(char *buffer, int nBufferSize) const
if ( m_nStartFragment )
V_snprintf(buffer, nBufferSize, "/sync?fragment=%d", m_nStartFragment );
V_strncpy( buffer, "/sync", nBufferSize );