2013-11-11 21:14:42 +01:00
unit IdHTTPWebsocketClient;
interface
uses
Classes,
2013-11-18 14:27:13 +01:00
IdHTTP,
{$IF CompilerVersion <= 21.0} //D2010
IdHashSHA1,
{$else}
2014-01-03 18:52:46 +01:00
Types,
2013-11-18 14:27:13 +01:00
IdHashSHA, //XE3 etc
{$IFEND}
IdIOHandler,
2014-01-03 18:52:46 +01:00
IdIOHandlerWebsocket,
{$ifdef FMX}
FMX. Types,
{$ELSE}
ExtCtrls,
{$ENDIF}
IdWinsock2, Generics. Collections, SyncObjs,
2013-11-11 21:14:42 +01:00
IdSocketIOHandling;
type
2014-02-10 11:30:56 +01:00
TWebsocketMsgBin = procedure( const aData: TStream) of object ;
TWebsocketMsgText = procedure( const aData: string ) of object ;
2013-11-11 21:14:42 +01:00
TIdHTTPWebsocketClient = class ;
TSocketIOMsg = procedure( const AClient: TIdHTTPWebsocketClient; const aText: string ; aMsgNr: Integer ) of object ;
TIdSocketIOHandling_Ext = class( TIdSocketIOHandling)
end ;
TIdHTTPWebsocketClient = class( TIdHTTP)
private
FWSResourceName: string ;
FHash: TIdHashSHA1;
2014-02-10 11:30:56 +01:00
FOnData: TWebsocketMsgBin;
FOnTextData: TWebsocketMsgText;
2013-11-11 21:14:42 +01:00
function GetIOHandlerWS: TIdIOHandlerWebsocket;
procedure SetIOHandlerWS( const Value: TIdIOHandlerWebsocket) ;
2014-02-10 11:30:56 +01:00
procedure SetOnData( const Value: TWebsocketMsgBin) ;
procedure SetOnTextData( const Value: TWebsocketMsgText) ;
2013-11-11 21:14:42 +01:00
protected
FSocketIOCompatible: Boolean ;
FSocketIOHandshakeResponse: string ;
FSocketIO: TIdSocketIOHandling_Ext;
FSocketIOContext: ISocketIOContext;
2014-01-23 10:17:13 +01:00
FSocketIOConnectBusy: Boolean ;
2014-01-23 16:30:39 +01:00
//FHeartBeat: TTimer;
//procedure HeartBeatTimer(Sender: TObject);
2013-11-11 21:14:42 +01:00
function GetSocketIO: TIdSocketIOHandling;
protected
procedure InternalUpgradeToWebsocket( aRaiseException: Boolean ; out aFailedReason: string ) ; virtual ;
function MakeImplicitClientHandler: TIdIOHandler; override ;
public
procedure AsyncDispatchEvent( const aEvent: TStream) ; overload ; virtual ;
procedure AsyncDispatchEvent( const aEvent: string ) ; overload ; virtual ;
procedure ResetChannel;
public
procedure AfterConstruction; override ;
destructor Destroy; override ;
function TryUpgradeToWebsocket: Boolean ;
procedure UpgradeToWebsocket;
2013-12-10 20:53:20 +01:00
2014-02-10 11:30:56 +01:00
function TryLock: Boolean ;
2014-01-31 20:22:10 +01:00
procedure Lock;
procedure UnLock;
2013-12-10 20:53:20 +01:00
procedure Connect; override ;
function TryConnect: Boolean ;
2013-11-11 21:14:42 +01:00
procedure Disconnect( ANotifyPeer: Boolean ) ; override ;
2014-01-23 16:30:39 +01:00
function CheckConnection: Boolean ;
procedure Ping;
2013-11-11 21:14:42 +01:00
property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS;
2014-02-10 11:30:56 +01:00
//websockets
property OnBinData : TWebsocketMsgBin read FOnData write SetOnData;
property OnTextData: TWebsocketMsgText read FOnTextData write SetOnTextData;
2013-11-11 21:14:42 +01:00
//https://github.com/LearnBoost/socket.io-spec
property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible;
property SocketIO: TIdSocketIOHandling read GetSocketIO;
published
property Host;
property Port;
property WSResourceName: string read FWSResourceName write FWSResourceName;
end ;
// on error
2014-01-31 20:22:10 +01:00
( *
2013-11-11 21:14:42 +01:00
TIdHTTPSocketIOClient_old = class( TIdHTTPWebsocketClient)
private
FOnConnected: TNotifyEvent;
FOnDisConnected: TNotifyEvent;
FOnSocketIOMsg: TSocketIOMsg;
FOnSocketIOEvent: TSocketIOMsg;
FOnSocketIOJson: TSocketIOMsg;
protected
FHeartBeat: TTimer;
procedure HeartBeatTimer( Sender: TObject) ;
procedure InternalUpgradeToWebsocket( aRaiseException: Boolean ; out aFailedReason: string ) ; override ;
public
procedure AsyncDispatchEvent( const aEvent: string ) ; override ;
public
procedure AfterConstruction; override ;
destructor Destroy; override ;
procedure AutoConnect;
property SocketIOHandshakeResponse: string read FSocketIOHandshakeResponse;
property OnConnected: TNotifyEvent read FOnConnected write FOnConnected;
property OnDisConnected: TNotifyEvent read FOnDisConnected write FOnDisConnected;
// procedure ProcessSocketIORequest(const strmRequest: TStream);
property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg;
property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson;
property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent;
end ;
2014-01-31 20:22:10 +01:00
* )
2013-11-11 21:14:42 +01:00
2014-03-07 12:19:32 +01:00
TWSThreadList = class( TThreadList)
public
function Count: Integer ;
end ;
2013-11-11 21:14:42 +01:00
TIdWebsocketMultiReadThread = class( TThread)
private
class var FInstance: TIdWebsocketMultiReadThread;
protected
2014-03-07 12:19:32 +01:00
FReadTimeout: Integer ;
2013-11-11 21:14:42 +01:00
FTempHandle: THandle;
FPendingBreak: Boolean ;
Freadset, Fexceptionset: TFDSet;
Finterval: TTimeVal;
procedure InitSpecialEventSocket;
procedure ResetSpecialEventSocket;
procedure BreakSelectWait;
protected
FChannels: TThreadList;
2014-03-07 12:19:32 +01:00
FReconnectlist: TWSThreadList;
FReconnectThread: TIdWebsocketQueueThread;
2013-11-11 21:14:42 +01:00
procedure ReadFromAllChannels;
2014-01-23 16:30:39 +01:00
procedure PingAllChannels;
2013-11-11 21:14:42 +01:00
procedure Execute; override ;
public
procedure AfterConstruction; override ;
destructor Destroy; override ;
procedure Terminate;
procedure AddClient ( aChannel: TIdHTTPWebsocketClient) ;
procedure RemoveClient( aChannel: TIdHTTPWebsocketClient) ;
2014-03-07 12:19:32 +01:00
property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5 0 0 0 ;
2013-11-11 21:14:42 +01:00
class function Instance: TIdWebsocketMultiReadThread;
class procedure RemoveInstance;
end ;
2014-01-10 15:05:04 +01:00
//async process data
TIdWebsocketDispatchThread = class( TIdWebsocketQueueThread)
2013-11-11 21:14:42 +01:00
private
class var FInstance: TIdWebsocketDispatchThread;
public
2014-01-10 15:05:04 +01:00
class function Instance: TIdWebsocketDispatchThread;
class procedure RemoveInstance;
2013-11-11 21:14:42 +01:00
end ;
implementation
uses
IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack,
2014-01-23 16:30:39 +01:00
IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils;
2013-11-11 21:14:42 +01:00
//type
// TAnonymousThread = class(TThread)
// protected
// FThreadProc: TThreadProcedure;
// procedure Execute; override;
// public
// constructor Create(AThreadProc: TThreadProcedure);
// end;
//procedure CreateAnonymousThread(AThreadProc: TThreadProcedure);
//begin
// TAnonymousThread.Create(AThreadProc);
//end;
{ TAnonymousThread }
//constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure);
//begin
// FThreadProc := AThreadProc;
// FreeOnTerminate := True;
// inherited Create(False {direct start});
//end;
//
//procedure TAnonymousThread.Execute;
//begin
// if Assigned(FThreadProc) then
// FThreadProc();
//end;
{ TIdHTTPWebsocketClient }
procedure TIdHTTPWebsocketClient. AfterConstruction;
begin
inherited ;
FHash : = TIdHashSHA1. Create;
IOHandler : = TIdIOHandlerWebsocket. Create( nil ) ;
2013-11-18 14:27:13 +01:00
IOHandler. UseNagle : = False ;
2013-11-11 21:14:42 +01:00
ManagedIOHandler : = True ;
FSocketIO : = TIdSocketIOHandling_Ext. Create;
2014-01-23 16:30:39 +01:00
// FHeartBeat := TTimer.Create(nil);
// FHeartBeat.Enabled := False;
// FHeartBeat.OnTimer := HeartBeatTimer;
2013-11-11 21:14:42 +01:00
end ;
procedure TIdHTTPWebsocketClient. AsyncDispatchEvent( const aEvent: TStream) ;
var
strmevent: TMemoryStream;
begin
if not Assigned( OnBinData) then Exit;
strmevent : = TMemoryStream. Create;
strmevent. CopyFrom( aEvent, aEvent. Size) ;
//events during dispatch? channel is busy so offload event dispatching to different thread!
TIdWebsocketDispatchThread. Instance. QueueEvent(
procedure
begin
if Assigned( OnBinData) then
OnBinData( strmevent) ;
strmevent. Free;
end ) ;
end ;
procedure TIdHTTPWebsocketClient. AsyncDispatchEvent( const aEvent: string ) ;
begin
2014-01-03 18:52:46 +01:00
OutputDebugString( PChar( 'AsyncDispatchEvent: ' + aEvent) ) ;
2013-11-18 14:27:13 +01:00
//if not Assigned(OnTextData) then Exit;
//events during dispatch? channel is busy so offload event dispatching to different thread!
TIdWebsocketDispatchThread. Instance. QueueEvent(
procedure
begin
if FSocketIOCompatible then
FSocketIO. ProcessSocketIORequest( FSocketIOContext as TSocketIOContext, aEvent)
else if Assigned( OnTextData) then
OnTextData( aEvent) ;
end ) ;
2013-11-11 21:14:42 +01:00
end ;
2014-01-23 16:30:39 +01:00
function TIdHTTPWebsocketClient. CheckConnection: Boolean ;
begin
Result : = False ;
try
if ( IOHandler < > nil ) and
not IOHandler. ClosedGracefully and
IOHandler. Connected then
begin
IOHandler. CheckForDisconnect( True {error} , True {ignore buffer, check real connection} ) ;
Result : = True ; //ok if we reach here
end ;
except
on E: Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
// if (IOHandler <> nil) then
// IOHandler.Clear;
Disconnect( False ) ;
if Assigned( OnDisConnected) then
OnDisConnected( Self) ;
end ;
end ;
end ;
2013-12-10 20:53:20 +01:00
procedure TIdHTTPWebsocketClient. Connect;
begin
2014-01-31 20:22:10 +01:00
Lock;
try
if Connected then
begin
TryUpgradeToWebsocket;
Exit;
end ;
2014-01-10 15:05:04 +01:00
2014-01-31 20:22:10 +01:00
//FHeartBeat.Enabled := True;
if SocketIOCompatible and
not FSocketIOConnectBusy then
begin
2014-03-07 12:19:32 +01:00
//FSocketIOConnectBusy := True;
//try
2014-01-31 20:22:10 +01:00
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
2014-03-07 12:19:32 +01:00
//finally
// FSocketIOConnectBusy := False;
//end;
2014-01-31 20:22:10 +01:00
end
else
begin
//clear inputbuffer, otherwise it can't connect :(
if ( IOHandler < > nil ) then IOHandler. Clear;
inherited Connect;
2014-01-23 10:17:13 +01:00
end ;
2014-01-31 20:22:10 +01:00
finally
UnLock;
end ;
2013-12-10 20:53:20 +01:00
end ;
2013-11-11 21:14:42 +01:00
destructor TIdHTTPWebsocketClient. Destroy;
2014-01-23 16:30:39 +01:00
//var tmr: TObject;
2013-11-11 21:14:42 +01:00
begin
2014-01-23 16:30:39 +01:00
// tmr := FHeartBeat;
// FHeartBeat := nil;
// TThread.Queue(nil, //otherwise free in other thread than created
// procedure
// begin
2013-11-11 21:14:42 +01:00
//FHeartBeat.Free;
2014-01-23 16:30:39 +01:00
// tmr.Free;
// end);
2013-11-11 21:14:42 +01:00
TIdWebsocketMultiReadThread. Instance. RemoveClient( Self) ;
FSocketIO. Free;
FHash. Free;
inherited ;
end ;
procedure TIdHTTPWebsocketClient. DisConnect( ANotifyPeer: Boolean ) ;
begin
2014-01-31 20:22:10 +01:00
if not SocketIOCompatible and
( ( IOHandler < > nil ) and not IOHandler. IsWebsocket)
then
TIdWebsocketMultiReadThread. Instance. RemoveClient( Self) ;
2013-11-11 21:14:42 +01:00
if ANotifyPeer and SocketIOCompatible then
FSocketIO. WriteDisConnect( FSocketIOContext as TSocketIOContext)
else
FSocketIO. FreeConnection( FSocketIOContext as TSocketIOContext) ;
// IInterface(FSocketIOContext)._Release;
FSocketIOContext : = nil ;
if IOHandler < > nil then
begin
IOHandler. Lock;
try
IOHandler. IsWebsocket : = False ;
inherited DisConnect( ANotifyPeer) ;
//clear buffer, other still "connected"
2014-01-10 15:05:04 +01:00
IOHandler. Clear;
2013-11-11 21:14:42 +01:00
//IOHandler.Free;
//IOHandler := TIdIOHandlerWebsocket.Create(nil);
finally
IOHandler. Unlock;
end ;
end ;
end ;
function TIdHTTPWebsocketClient. GetIOHandlerWS: TIdIOHandlerWebsocket;
begin
// if inherited IOHandler is TIdIOHandlerWebsocket then
Result : = inherited IOHandler as TIdIOHandlerWebsocket
// else
// Assert(False);
end ;
function TIdHTTPWebsocketClient. GetSocketIO: TIdSocketIOHandling;
begin
Result : = FSocketIO;
end ;
2014-01-23 16:30:39 +01:00
( *
2013-11-11 21:14:42 +01:00
procedure TIdHTTPWebsocketClient. HeartBeatTimer( Sender: TObject) ;
begin
FHeartBeat. Enabled : = False ;
FSocketIO. Lock;
try
try
if ( IOHandler < > nil ) and
not IOHandler. ClosedGracefully and
2014-01-10 15:05:04 +01:00
IOHandler. Connected and
2013-11-11 21:14:42 +01:00
( FSocketIOContext < > nil ) then
begin
2014-01-10 15:05:04 +01:00
FSocketIO. WritePing( FSocketIOContext as TSocketIOContext) ; //heartbeat socket.io message
2013-11-11 21:14:42 +01:00
end
//retry re-connect
else
try
//clear inputbuffer, otherwise it can't connect :(
2014-01-10 15:05:04 +01:00
if ( IOHandler < > nil ) then
IOHandler. Clear;
2013-11-11 21:14:42 +01:00
2014-01-23 10:17:13 +01:00
Self. ConnectTimeout : = 1 0 0 ; //100ms otherwise GUI hangs too much -> todo: do it in background thread!
if not Connected then
Self. Connect;
2013-11-11 21:14:42 +01:00
TryUpgradeToWebsocket;
except
//skip, just retried
end ;
2014-01-03 18:52:46 +01:00
except on E: Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
2014-01-10 15:05:04 +01:00
if ( IOHandler < > nil ) then
IOHandler. Clear;
2014-01-03 18:52:46 +01:00
Disconnect( False ) ;
2013-11-11 21:14:42 +01:00
2014-01-03 18:52:46 +01:00
if Assigned( OnDisConnected) then
OnDisConnected( Self) ;
try
raise EIdException. Create( 'Connection lost from ' +
Format( 'ws://%s:%d/%s' , [ Host, Port, WSResourceName] ) +
' - Error: ' + e. Message ) ;
except
//eat, no error popup!
end ;
2013-11-11 21:14:42 +01:00
end ;
end ;
finally
FSocketIO. UnLock;
FHeartBeat. Enabled : = True ; //always enable: in case of disconnect it will re-connect
end ;
end ;
2014-01-23 16:30:39 +01:00
* )
2013-11-11 21:14:42 +01:00
2013-12-10 20:53:20 +01:00
function TIdHTTPWebsocketClient. TryConnect: Boolean ;
begin
2014-01-31 20:22:10 +01:00
Lock;
2013-12-10 20:53:20 +01:00
try
2014-01-31 20:22:10 +01:00
try
if Connected then Exit( True ) ;
2013-12-10 20:53:20 +01:00
2014-01-31 20:22:10 +01:00
Connect;
Result : = Connected;
2014-02-10 11:30:56 +01:00
//if Result then
// Result := TryUpgradeToWebsocket already done in connect
2014-01-31 20:22:10 +01:00
except
Result : = False ;
end
finally
UnLock;
end ;
2013-12-10 20:53:20 +01:00
end ;
2014-02-10 11:30:56 +01:00
function TIdHTTPWebsocketClient. TryLock: Boolean ;
begin
Result : = System. TMonitor. TryEnter( Self) ;
end ;
2013-11-11 21:14:42 +01:00
function TIdHTTPWebsocketClient. TryUpgradeToWebsocket: Boolean ;
var
sError: string ;
begin
2014-03-07 12:19:32 +01:00
FSocketIOConnectBusy : = True ;
2014-01-31 20:22:10 +01:00
Lock;
try
if ( IOHandler < > nil ) and IOHandler. IsWebsocket then Exit( True ) ;
2014-01-23 10:17:13 +01:00
2014-01-31 20:22:10 +01:00
InternalUpgradeToWebsocket( False {no raise} , sError) ;
Result : = ( sError = '' ) ;
finally
2014-03-07 12:19:32 +01:00
FSocketIOConnectBusy : = False ;
2014-01-31 20:22:10 +01:00
UnLock;
2014-02-10 11:30:56 +01:00
end ;
2014-01-31 20:22:10 +01:00
end ;
procedure TIdHTTPWebsocketClient. UnLock;
begin
System. TMonitor. Exit( Self) ;
2013-11-11 21:14:42 +01:00
end ;
procedure TIdHTTPWebsocketClient. UpgradeToWebsocket;
var
sError: string ;
begin
2014-01-31 20:22:10 +01:00
Lock;
try
if not IOHandler. IsWebsocket then
InternalUpgradeToWebsocket( True {raise} , sError) ;
finally
UnLock;
end ;
2013-11-11 21:14:42 +01:00
end ;
procedure TIdHTTPWebsocketClient. InternalUpgradeToWebsocket( aRaiseException: Boolean ; out aFailedReason: string ) ;
var
sURL: string ;
strmResponse: TMemoryStream;
i: Integer ;
sKey, sResponseKey: string ;
sSocketioextended: string ;
begin
2014-01-23 10:17:13 +01:00
Assert( ( IOHandler = nil ) or not IOHandler. IsWebsocket) ;
2014-02-10 11:30:56 +01:00
//remove from thread during connection handling
TIdWebsocketMultiReadThread. Instance. RemoveClient( Self) ;
2013-11-11 21:14:42 +01:00
2014-03-07 12:22:18 +01:00
//reset pending data
if IOHandler < > nil then
IOHandler. Clear;
2013-11-11 21:14:42 +01:00
strmResponse : = TMemoryStream. Create;
try
//special socket.io handling, see https://github.com/LearnBoost/socket.io-spec
if SocketIOCompatible then
begin
Request. Clear;
Request. Connection : = 'keep-alive' ;
sURL : = Format( 'http://%s:%d/socket.io/1/' , [ Host, Port] ) ;
strmResponse. Clear;
//get initial handshake
Post( sURL, strmResponse, strmResponse) ;
if ResponseCode = 2 0 0 {OK} then
begin
//if not Connected then //reconnect
// Self.Connect;
strmResponse. Position : = 0 ;
//The body of the response should contain the session id (sid) given to the client,
//followed by the heartbeat timeout, the connection closing timeout, and the list of supported transports separated by :
//4d4f185e96a7b:15:10:websocket,xhr-polling
with TStreamReader. Create( strmResponse) do
try
FSocketIOHandshakeResponse : = ReadToEnd;
finally
Free;
end ;
sKey : = Copy( FSocketIOHandshakeResponse, 1 , Pos( ':' , FSocketIOHandshakeResponse) - 1 ) ;
sSocketioextended : = 'socket.io/1/websocket/' + sKey;
WSResourceName : = sSocketioextended;
end
else
begin
aFailedReason : = Format( 'Initial socket.io handshake failed: "%d: %s"' , [ ResponseCode, ResponseText] ) ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason) ;
end ;
end ;
Request. Clear;
strmResponse. Clear;
//http://www.websocket.org/aboutwebsocket.html
( * GET ws: //echo.websocket.org/?encoding=text HTTP/1.1
Origin: http: //websocket.org
Cookie: __utma= 9 9 as
Connection: Upgrade
Host: echo. websocket. org
Sec- WebSocket- Key: uRovscZjNol/ umbTt5uKmw= =
Upgrade: websocket
Sec- WebSocket- Version: 1 3 * )
//Connection: Upgrade
Request. Connection : = 'Upgrade' ;
//Upgrade: websocket
Request. CustomHeaders. Add( 'Upgrade: websocket' ) ;
//Sec-WebSocket-Key
sKey : = '' ;
for i : = 1 to 1 6 do
sKey : = sKey + Char( Random( 1 2 7 - 3 2 ) + 3 2 ) ;
//base64 encoded
sKey : = TIdEncoderMIME. EncodeString( sKey) ;
Request. CustomHeaders. AddValue( 'Sec-WebSocket-Key' , sKey) ;
//Sec-WebSocket-Version: 13
Request. CustomHeaders. AddValue( 'Sec-WebSocket-Version' , '13' ) ;
Request. Host : = Format( 'Host: %s:%d' , [ Host, Port] ) ;
//ws://host:port/<resourcename>
//about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs"
//sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
sURL : = Format( 'http://%s:%d/%s' , [ Host, Port, WSResourceName] ) ;
ReadTimeout : = Max( 2 * 1 0 0 0 , ReadTimeout) ;
{ voorbeeld:
GET http: //localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: localhost: 9 2 2 2
Origin: http: //localhost:9222
Pragma: no- cache
Cache- Control: no- cache
Sec- WebSocket- Key: HIqoAdZkxnWWH9dnVPyW7w= =
Sec- WebSocket- Version: 1 3
Sec- WebSocket- Extensions: x- webkit- deflate- frame
User- Agent: Mozilla/ 5.0 ( Windows NT 6.1 ; WOW64) AppleWebKit/ 537.36 ( KHTML, like Gecko) Chrome/ 27.0 . 1453.116 Safari/ 537.36
Cookie: __utma= 1.2040118404 . 1366961318.1366961318 . 1366961318.1 ; __utmc= 1 ; __utmz= 1.1366961318 . 1.1 . utmcsr= ( direct) | utmccn= ( direct) | utmcmd= ( none) ; deviceorder= 0 1 2 3 4 5 6 7 8 9 1 0 1 1 1 2 ; MultiTouchEnabled= false ; device= 3 ; network_type= 0
}
if SocketIOCompatible then
begin
Response. Clear;
Response. ResponseCode : = 0 ;
Request. URL : = sURL;
Request. Method : = Id_HTTPMethodGet;
Request. Source : = nil ;
Response. ContentStream : = strmResponse;
PrepareRequest( Request) ;
//connect and upgrade
ConnectToHost( Request, Response) ;
//check upgrade succesfull
CheckForGracefulDisconnect( True ) ;
CheckConnected;
Assert( Self. Connected) ;
2013-11-18 14:27:13 +01:00
if Response. ResponseCode = 0 then
Response. ResponseText : = Response. ResponseText;
2013-11-11 21:14:42 +01:00
if Response. ResponseCode < > 2 0 0 {ok} then
begin
aFailedReason : = Format( 'Error while upgrading: "%d: %s"' , [ ResponseCode, ResponseText] ) ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason)
else
Exit;
end ;
Response. Clear;
if IOHandler. CheckForDataOnSource( ReadTimeout) then
Response. ResponseText : = IOHandler. InputBufferAsString( ) ;
end
else
begin
Get( sURL, strmResponse, [ 1 0 1 ] ) ;
//http://www.websocket.org/aboutwebsocket.html
( * HTTP/ 1.1 1 0 1 WebSocket Protocol Handshake
Date: Fri, 1 0 Feb 2 0 1 2 1 7 : 3 8 : 1 8 GMT
Connection: Upgrade
Server: Kaazing Gateway
Upgrade: WebSocket
Access- Control- Allow- Origin: http: //websocket.org
Access- Control- Allow- Credentials: true
Sec- WebSocket- Accept: rLHCkw/ SKsO9GAH/ ZSFhBATDKrU=
Access- Control- Allow- Headers: content- type * )
//'HTTP/1.1 101 Switching Protocols'
if ResponseCode < > 1 0 1 then
begin
aFailedReason : = Format( 'Error while upgrading: "%d: %s"' , [ ResponseCode, ResponseText] ) ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason) ;
end ;
//connection: upgrade
if not SameText( Response. Connection, 'upgrade' ) then
begin
aFailedReason : = Format( 'Connection not upgraded: "%s"' , [ Response. Connection] ) ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason) ;
end ;
//upgrade: websocket
if not SameText( Response. RawHeaders. Values[ 'upgrade' ] , 'websocket' ) then
begin
aFailedReason : = Format( 'Not upgraded to websocket: "%s"' , [ Response. RawHeaders. Values[ 'upgrade' ] ] ) ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason) ;
end ;
//check handshake key
sResponseKey : = Trim( sKey) + //... "minus any leading and trailing whitespace"
'258EAFA5-E914-47DA-95CA-C5AB0DC85B11' ; //special GUID
sResponseKey : = TIdEncoderMIME. EncodeBytes( //Base64
FHash. HashString( sResponseKey) ) ; //SHA1
if not SameText( Response. RawHeaders. Values[ 'sec-websocket-accept' ] , sResponseKey) then
begin
aFailedReason : = 'Invalid key handshake' ;
if aRaiseException then
raise EIdWebSocketHandleError. Create( aFailedReason) ;
end ;
end ;
//upgrade succesful
IOHandler. IsWebsocket : = True ;
aFailedReason : = '' ;
2014-03-07 12:19:32 +01:00
Assert( Connected) ;
2013-11-11 21:14:42 +01:00
if SocketIOCompatible then
begin
2014-01-31 20:22:10 +01:00
FSocketIOContext : = TSocketIOContext. Create( Self) ;
2014-01-02 14:59:27 +01:00
( FSocketIOContext as TSocketIOContext) . ConnectSend : = True ; //connect already send via url? GET /socket.io/1/websocket/9elrbEFqiimV29QAM6T-
2013-11-11 21:14:42 +01:00
FSocketIO. WriteConnect( FSocketIOContext as TSocketIOContext) ;
end ;
2014-01-03 18:52:46 +01:00
//always read the data! (e.g. RO use override of AsyncDispatchEvent to process data)
//if Assigned(OnBinData) or Assigned(OnTextData) then
2013-11-11 21:14:42 +01:00
finally
Request. Clear;
strmResponse. Free;
2014-01-31 20:22:10 +01:00
//add to thread for auto retry/reconnect
TIdWebsocketMultiReadThread. Instance. AddClient( Self) ;
2013-11-11 21:14:42 +01:00
end ;
end ;
2014-01-31 20:22:10 +01:00
procedure TIdHTTPWebsocketClient. Lock;
begin
System. TMonitor. Enter( Self) ;
end ;
2013-11-11 21:14:42 +01:00
function TIdHTTPWebsocketClient. MakeImplicitClientHandler: TIdIOHandler;
begin
Result : = TIdIOHandlerWebsocket. Create( nil ) ;
end ;
2014-01-23 16:30:39 +01:00
procedure TIdHTTPWebsocketClient. Ping;
var
ws: TIdIOHandlerWebsocket;
begin
ws : = IOHandler as TIdIOHandlerWebsocket;
2014-02-04 21:23:49 +01:00
ws. LastPingTime : = Now;
2014-01-23 16:30:39 +01:00
//socket.io?
if SocketIOCompatible and ws. IsWebsocket then
begin
FSocketIO. Lock;
try
if ( FSocketIOContext < > nil ) then
FSocketIO. WritePing( FSocketIOContext as TSocketIOContext) ; //heartbeat socket.io message
finally
FSocketIO. UnLock;
end
end
//only websocket?
else if not SocketIOCompatible and ws. IsWebsocket then
begin
if ws. TryLock then
try
ws. WriteData( nil , wdcPing) ;
finally
ws. Unlock;
end ;
end ;
end ;
2013-11-11 21:14:42 +01:00
procedure TIdHTTPWebsocketClient. ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
begin
2014-01-31 20:22:10 +01:00
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); keep for reconnect
2013-11-11 21:14:42 +01:00
if IOHandler < > nil then
begin
IOHandler. InputBuffer. Clear;
2014-02-10 11:30:56 +01:00
IOHandler. BusyUpgrading : = False ;
IOHandler. IsWebsocket : = False ;
2013-11-11 21:14:42 +01:00
//close/disconnect internal socket
//ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
//ws.Close; done in disconnect below
end ;
Disconnect( False ) ;
end ;
procedure TIdHTTPWebsocketClient. SetIOHandlerWS(
const Value: TIdIOHandlerWebsocket) ;
begin
SetIOHandler( Value) ;
end ;
2014-02-10 11:30:56 +01:00
procedure TIdHTTPWebsocketClient. SetOnData( const Value: TWebsocketMsgBin) ;
2013-11-11 21:14:42 +01:00
begin
// if not Assigned(Value) and not Assigned(FOnTextData) then
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FOnData : = Value;
2014-02-10 11:30:56 +01:00
// if Assigned(Value) and
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
// then
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
2013-11-11 21:14:42 +01:00
end ;
2014-02-10 11:30:56 +01:00
procedure TIdHTTPWebsocketClient. SetOnTextData( const Value: TWebsocketMsgText) ;
2013-11-11 21:14:42 +01:00
begin
// if not Assigned(Value) and not Assigned(FOnData) then
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FOnTextData : = Value;
2014-02-10 11:30:56 +01:00
// if Assigned(Value) and
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
// then
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
2013-11-11 21:14:42 +01:00
end ;
{ TIdHTTPSocketIOClient }
2014-01-31 20:22:10 +01:00
( *
2013-11-11 21:14:42 +01:00
procedure TIdHTTPSocketIOClient_old. AfterConstruction;
begin
inherited ;
SocketIOCompatible : = True ;
FHeartBeat : = TTimer. Create( nil ) ;
FHeartBeat. Enabled : = False ;
FHeartBeat. OnTimer : = HeartBeatTimer;
end ;
procedure TIdHTTPSocketIOClient_old. AsyncDispatchEvent( const aEvent: string ) ;
begin
//https://github.com/LearnBoost/socket.io-spec
if StartsStr( '1:' , aEvent) then //connect
Exit;
if aEvent = '2::' then //ping, heartbeat
Exit;
inherited AsyncDispatchEvent( aEvent) ;
end ;
procedure TIdHTTPSocketIOClient_old. AutoConnect;
begin
//for now: timer in mainthread?
TThread. Queue( nil ,
procedure
begin
FHeartBeat. Interval : = 5 * 1 0 0 0 ;
FHeartBeat. Enabled : = True ;
end ) ;
end ;
destructor TIdHTTPSocketIOClient_old. Destroy;
var tmr: TObject;
begin
tmr : = FHeartBeat;
TThread. Queue( nil , //otherwise free in other thread than created
procedure
begin
//FHeartBeat.Free;
tmr. Free;
end ) ;
inherited ;
end ;
procedure TIdHTTPSocketIOClient_old. HeartBeatTimer( Sender: TObject) ;
begin
FHeartBeat. Enabled : = False ;
try
try
if ( IOHandler < > nil ) and
not IOHandler. ClosedGracefully and
IOHandler. Connected then
begin
IOHandler. Write( '2:::' ) ; //heartbeat socket.io message
end
//retry connect
else
try
//clear inputbuffer, otherwise it can't connect :(
if ( IOHandler < > nil ) and
not IOHandler. InputBufferIsEmpty
then
IOHandler. DiscardAll;
Self. Connect;
TryUpgradeToWebsocket;
except
//skip, just retried
end ;
except
//clear inputbuffer, otherwise it stays connected :(
if ( IOHandler < > nil ) and
not IOHandler. InputBufferIsEmpty
then
IOHandler. DiscardAll;
if Assigned( OnDisConnected) then
OnDisConnected( Self) ;
try
raise EIdException. Create( 'Connection lost from ' + Format( 'ws://%s:%d/%s' , [ Host, Port, WSResourceName] ) ) ;
except
//eat, no error popup!
end ;
end ;
finally
FHeartBeat. Enabled : = True ; //always enable: in case of disconnect it will re-connect
end ;
end ;
procedure TIdHTTPSocketIOClient_old. InternalUpgradeToWebsocket(
aRaiseException: Boolean ; out aFailedReason: string ) ;
var
stimeout: string ;
begin
inherited InternalUpgradeToWebsocket( aRaiseException, aFailedReason) ;
if ( aFailedReason = '' ) and
( IOHandler as TIdIOHandlerWebsocket) . IsWebsocket then
begin
stimeout : = Copy( SocketIOHandshakeResponse, Pos( ':' , SocketIOHandshakeResponse) + 1 , Length( SocketIOHandshakeResponse) ) ;
stimeout : = Copy( stimeout, 1 , Pos( ':' , stimeout) - 1 ) ;
if stimeout < > '' then
begin
//if (FHeartBeat.Interval > 0) then
//for now: timer in mainthread?
TThread. Queue( nil ,
procedure
begin
FHeartBeat. Interval : = StrToIntDef( stimeout, 1 5 ) * 1 0 0 0 ;
if FHeartBeat. Interval > = 1 5 0 0 0 then
//FHeartBeat.Interval := FHeartBeat.Interval - 5000
FHeartBeat. Interval : = 5 0 0 0
else if FHeartBeat. Interval > = 5 0 0 0 then
FHeartBeat. Interval : = FHeartBeat. Interval - 2 0 0 0 ;
FHeartBeat. Enabled : = ( FHeartBeat. Interval > 0 ) ;
end ) ;
end ;
if Assigned( OnConnected) then
OnConnected( Self) ;
end ;
end ;
2014-01-31 20:22:10 +01:00
)
2013-11-11 21:14:42 +01:00
procedure TIdHTTPSocketIOClient_old. ProcessSocketIORequest(
const strmRequest: TStream) ;
function __ReadToEnd: string ;
var
utf8: TBytes;
ilength: Integer ;
begin
Result : = '' ;
ilength : = strmRequest. Size - strmRequest. Position;
SetLength( utf8, ilength) ;
strmRequest. Read( utf8[ 0 ] , ilength) ;
Result : = TEncoding. UTF8. GetString( utf8) ;
end ;
function __GetSocketIOPart( const aData: string ; aIndex: Integer ) : string ;
var ipos: Integer ;
i: Integer ;
begin
//'5::/chat:{"name":"hi!"}'
//0 = 5
//1 =
//2 = /chat
//3 = {"name":"hi!"}
ipos : = 0 ;
for i : = 0 to aIndex- 1 do
ipos : = PosEx( ':' , aData, ipos+ 1 ) ;
if ipos > = 0 then
begin
Result : = Copy( aData, ipos+ 1 , Length( aData) ) ;
if aIndex < 3 then // /chat:{"name":"hi!"}'
begin
ipos : = PosEx( ':' , Result , 1 ) ; // :{"name":"hi!"}'
if ipos > 0 then
Result : = Copy( Result , 1 , ipos- 1 ) ; // /chat
end ;
end ;
end ;
var
str, smsg, schannel, sdata: string ;
imsg: Integer ;
// bCallback: Boolean;
begin
str : = __ReadToEnd;
if str = '' then Exit;
//5:1+:/chat:test
smsg : = __GetSocketIOPart( str, 1 ) ;
imsg : = 0 ;
// bCallback := False;
if smsg < > '' then // 1+
begin
imsg : = StrToIntDef( ReplaceStr( smsg, '+' , '' ) , 0 ) ; // 1
// bCallback := (Pos('+', smsg) > 1); //trailing +, e.g. 1+
end ;
schannel : = __GetSocketIOPart( str, 2 ) ; // /chat
sdata : = __GetSocketIOPart( str, 3 ) ; // test
//(0) Disconnect
if StartsStr( '0:' , str) then
begin
schannel : = __GetSocketIOPart( str, 2 ) ;
if schannel < > '' then
//todo: close channel
else
Self. Disconnect;
end
//(1) Connect
//'1::' [path] [query]
else if StartsStr( '1:' , str) then
begin
//todo: add channel/room to authorized channel/room list
Self. IOHandler. Write( str) ; //write same connect back, e.g. 1::/chat
end
//(2) Heartbeat
else if StartsStr( '2:' , str) then
begin
Self. IOHandler. Write( str) ; //write same connect back, e.g. 2::
end
//(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message)
//'3:' [message id ('+')] ':' [message endpoint] ':' [data]
//3::/chat:hi
else if StartsStr( '3:' , str) then
begin
if Assigned( OnSocketIOMsg) then
OnSocketIOMsg( Self, sdata, imsg) ;
end
//(4) JSON Message
//'4:' [message id ('+')] ':' [message endpoint] ':' [json]
//4:1::{"a":"b"}
else if StartsStr( '4:' , str) then
begin
if Assigned( OnSocketIOJson) then
OnSocketIOJson( Self, sdata, imsg) ;
end
//(5) Event
//'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event]
//5::/chat:{"name":"my other event","args":[{"my":"data"}]}
//5:1+:/chat:{"name":"GetLocations","args":[""]}
else if StartsStr( '5:' , str) then
begin
if Assigned( OnSocketIOEvent) then
OnSocketIOEvent( Self, sdata, imsg) ;
end
//(6) ACK
//6::/news:1+["callback"]
//6:::1+["Response"]
//(7) Error
//(8) Noop
else if StartsStr( '8:' , str) then
begin
//nothing
end
else
raise Exception. CreateFmt( 'Unsupported data: "%s"' , [ str] ) ;
end ;
* )
{ TIdWebsocketMultiReadThread }
procedure TIdWebsocketMultiReadThread. AddClient(
aChannel: TIdHTTPWebsocketClient) ;
var l: TList;
begin
2014-01-31 20:22:10 +01:00
//Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
2013-11-11 21:14:42 +01:00
l : = FChannels. LockList;
try
//already exists?
if l. IndexOf( aChannel) > = 0 then Exit;
2013-11-18 14:27:13 +01:00
2013-11-11 21:14:42 +01:00
Assert( l. Count < 6 4 , 'Max 64 connections can be handled by one read thread!' ) ; //due to restrictions of the "select" API
l. Add( aChannel) ;
//trigger the "select" wait
BreakSelectWait;
finally
FChannels. UnlockList;
end ;
end ;
procedure TIdWebsocketMultiReadThread. AfterConstruction;
begin
inherited ;
2014-03-07 12:19:32 +01:00
ReadTimeout : = 5 0 0 0 ;
2013-11-11 21:14:42 +01:00
FChannels : = TThreadList. Create;
FillChar( Freadset, SizeOf( Freadset) , 0 ) ;
FillChar( Fexceptionset, SizeOf( Fexceptionset) , 0 ) ;
InitSpecialEventSocket;
end ;
procedure TIdWebsocketMultiReadThread. BreakSelectWait;
var
2013-11-18 14:27:13 +01:00
//iResult: Integer;
2013-11-11 21:14:42 +01:00
LAddr: TSockAddrIn6;
begin
2013-11-18 14:27:13 +01:00
if FTempHandle = 0 then Exit;
2013-11-11 21:14:42 +01:00
FillChar( LAddr, SizeOf( LAddr) , 0 ) ;
//Id_IPv4
with PSOCKADDR( @ LAddr) ^ do
begin
sin_family : = Id_PF_INET4;
//dummy address and port
( GStack as TIdStackBSDBase) . TranslateStringToTInAddr( '0.0.0.0' , sin_addr, Id_IPv4) ;
sin_port : = htons( 1 ) ;
end ;
FPendingBreak : = True ;
//connect to non-existing address to stop "select" from waiting
//Note: this is some kind of "hack" because there is no nice way to stop it
//The only(?) other possibility is to make a "socket pair" and send a byte to it,
//but this requires a dynamic server socket (which can trigger a firewall
//exception/question popup in WindowsXP+)
2013-11-18 14:27:13 +01:00
//iResult :=
IdWinsock2. connect( FTempHandle, PSOCKADDR( @ LAddr) , SIZE_TSOCKADDRIN) ;
2013-11-11 21:14:42 +01:00
//non blocking socket, so will always result in "would block"!
2013-11-18 14:27:13 +01:00
// if (iResult <> Id_SOCKET_ERROR) or
// ( (GStack <> nil) and (GStack.WSGetLastError <> WSAEWOULDBLOCK) )
// then
// GStack.CheckForSocketError(iResult);
2013-11-11 21:14:42 +01:00
end ;
destructor TIdWebsocketMultiReadThread. Destroy;
begin
IdWinsock2. closesocket( FTempHandle) ;
2013-11-18 14:27:13 +01:00
FTempHandle : = 0 ;
2013-11-11 21:14:42 +01:00
FChannels. Free;
inherited ;
end ;
procedure TIdWebsocketMultiReadThread. Execute;
begin
Self. NameThreadForDebugging( AnsiString( Self. ClassName) ) ;
while not Terminated do
begin
try
while not Terminated do
2014-01-23 16:30:39 +01:00
begin
2013-11-11 21:14:42 +01:00
ReadFromAllChannels;
2014-01-23 16:30:39 +01:00
PingAllChannels;
end ;
2013-11-11 21:14:42 +01:00
except
//continue
end ;
end ;
end ;
procedure TIdWebsocketMultiReadThread. InitSpecialEventSocket;
var
param: Cardinal ;
iResult: Integer ;
begin
if GStack = nil then Exit; //finalized?
2013-11-18 14:27:13 +01:00
2013-11-11 21:14:42 +01:00
//alloc socket
FTempHandle : = GStack. NewSocketHandle( Id_SOCK_STREAM, Id_IPPROTO_IP, Id_IPv4, False ) ;
Assert( FTempHandle < > Id_INVALID_SOCKET) ;
//non block mode
param : = 1 ; // enable NON blocking mode
iResult : = ioctlsocket( FTempHandle, FIONBIO, param) ;
GStack. CheckForSocketError( iResult) ;
end ;
class function TIdWebsocketMultiReadThread. Instance: TIdWebsocketMultiReadThread;
begin
2013-11-18 14:27:13 +01:00
if ( FInstance = nil ) then
2013-11-11 21:14:42 +01:00
begin
FInstance : = TIdWebsocketMultiReadThread. Create( True ) ;
FInstance. Start;
end ;
Result : = FInstance;
end ;
2014-01-23 16:30:39 +01:00
procedure TIdWebsocketMultiReadThread. PingAllChannels;
var
l: TList;
chn: TIdHTTPWebsocketClient;
ws: TIdIOHandlerWebsocket;
i: Integer ;
begin
l : = FChannels. LockList;
try
for i : = 0 to l. Count - 1 do
begin
chn : = TIdHTTPWebsocketClient( l. Items[ i] ) ;
ws : = chn. IOHandler as TIdIOHandlerWebsocket;
//valid?
2014-02-10 11:30:56 +01:00
if ( chn. IOHandler < > nil ) and
( chn. IOHandler. IsWebsocket) and
( chn. Socket < > nil ) and
2014-01-31 20:22:10 +01:00
( chn. Socket. Binding < > nil ) and
( chn. Socket. Binding. Handle > 0 ) and
2014-01-23 16:30:39 +01:00
( chn. Socket. Binding. Handle < > INVALID_SOCKET) then
begin
//more than 10s nothing done? then send ping
2014-02-04 21:23:49 +01:00
if SecondsBetween( Now, ws. LastPingTime) > 1 0 then
2014-01-23 16:30:39 +01:00
if chn. CheckConnection then
try
chn. Ping;
except
//retry connect the next time?
end ;
end
else if not chn. Connected then
begin
2014-03-07 12:19:32 +01:00
if ( ws < > nil ) and
( SecondsBetween( Now, ws. LastActivityTime) < 5 )
then
Continue;
if FReconnectlist = nil then
FReconnectlist : = TWSThreadList. Create;
//if chn.TryLock then
FReconnectlist. Add( chn) ;
end ;
end ;
finally
FChannels. UnlockList;
end ;
//reconnect needed? (in background)
if FReconnectlist. Count > 0 then
begin
if FReconnectThread = nil then
FReconnectThread : = TIdWebsocketQueueThread. Create( False {direct start} ) ;
FReconnectThread. QueueEvent(
procedure
var
l: TList;
chn: TIdHTTPWebsocketClient;
begin
while FReconnectlist. Count > 0 do
begin
chn : = nil ;
2014-01-23 16:30:39 +01:00
try
2014-03-07 12:19:32 +01:00
//get first one
l : = FReconnectlist. LockList;
try
if l. Count < = 0 then Exit;
chn : = TObject( l. Items[ 0 ] ) as TIdHTTPWebsocketClient;
if not chn. TryLock then
begin
l. Delete( 0 ) ;
chn : = nil ;
Continue;
end ;
finally
FReconnectlist. UnlockList;
end ;
//try reconnect
ws : = chn. IOHandler as TIdIOHandlerWebsocket;
if ( ( ws = nil ) or
( SecondsBetween( Now, ws. LastActivityTime) > = 5 ) ) then
begin
2014-02-10 11:30:56 +01:00
try
if ws < > nil then
ws. LastActivityTime : = Now;
2014-03-07 12:19:32 +01:00
chn. ConnectTimeout : = 1 0 0 0 ;
2014-02-14 15:46:42 +01:00
if ( chn. Host < > '' ) and ( chn. Port > 0 ) then
chn. TryUpgradeToWebsocket;
2014-02-10 11:30:56 +01:00
except
//just try
end ;
2014-03-07 12:19:32 +01:00
end ;
//remove from todo list
l : = FReconnectlist. LockList;
try
if l. Count > 0 then
l. Delete( 0 ) ;
finally
FReconnectlist. UnlockList;
end ;
2014-02-10 11:30:56 +01:00
finally
2014-03-07 12:19:32 +01:00
if chn < > nil then
2014-02-10 11:30:56 +01:00
chn. Unlock;
2014-01-23 16:30:39 +01:00
end ;
end ;
2014-03-07 12:19:32 +01:00
end ) ;
2014-01-23 16:30:39 +01:00
end ;
end ;
2013-11-11 21:14:42 +01:00
procedure TIdWebsocketMultiReadThread. ReadFromAllChannels;
var
l: TList;
chn: TIdHTTPWebsocketClient;
iCount,
i: Integer ;
iResult: NativeInt ;
strmEvent: TMemoryStream;
swstext: utf8string ;
ws: TIdIOHandlerWebsocket;
wscode: TWSDataCode;
begin
l : = FChannels. LockList;
try
2014-01-03 18:52:46 +01:00
iCount : = 0 ;
iResult : = 0 ;
2013-11-11 21:14:42 +01:00
Freadset. fd_count : = iCount;
for i : = 0 to l. Count - 1 do
begin
chn : = TIdHTTPWebsocketClient( l. Items[ i] ) ;
//valid?
if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution
2014-02-10 11:30:56 +01:00
( chn. IOHandler < > nil ) and
( chn. IOHandler. IsWebsocket) and
2014-01-31 20:22:10 +01:00
( chn. Socket < > nil ) and
( chn. Socket. Binding < > nil ) and
2013-11-11 21:14:42 +01:00
( chn. Socket. Binding. Handle > 0 ) and
( chn. Socket. Binding. Handle < > INVALID_SOCKET) then
begin
2014-01-10 15:05:04 +01:00
if chn. IOHandler. HasData then
begin
Inc( iResult) ;
Break;
end ;
2014-01-03 18:52:46 +01:00
2014-01-10 15:05:04 +01:00
Freadset. fd_count : = iCount+ 1 ;
Freadset. fd_array[ iCount] : = chn. Socket. Binding. Handle;
Inc( iCount) ;
2013-11-11 21:14:42 +01:00
end ;
end ;
if FPendingBreak then
ResetSpecialEventSocket;
finally
FChannels. UnlockList;
end ;
//special helper socket to be able to stop "select" from waiting
Fexceptionset. fd_count : = 1 ;
Fexceptionset. fd_array[ 0 ] : = FTempHandle;
//wait 15s till some data
2014-03-07 12:19:32 +01:00
Finterval. tv_sec : = Self. ReadTimeout div 1 0 0 0 ; //5s
Finterval. tv_usec : = Self. ReadTimeout mod 1 0 0 0 ;
2013-11-11 21:14:42 +01:00
//nothing to wait for? then sleep some time to prevent 100% CPU
2014-01-10 15:05:04 +01:00
if iResult = 0 then
2013-11-11 21:14:42 +01:00
begin
2014-01-10 15:05:04 +01:00
if iCount = 0 then
begin
iResult : = IdWinsock2. select( 0 , nil , nil , @ Fexceptionset, @ Finterval) ;
if iResult = SOCKET_ERROR then
iResult : = 1 ; //ignore errors
end
//wait till a socket has some data (or a signal via exceptionset is fired)
else
iResult : = IdWinsock2. select( 0 , @ Freadset, nil , @ Fexceptionset, @ Finterval) ;
2013-11-11 21:14:42 +01:00
if iResult = SOCKET_ERROR then
2014-01-10 15:05:04 +01:00
//raise EIdWinsockStubError.Build(WSAGetLastError, '', []);
//ignore error during wait: socket disconnected etc
Exit;
end ;
2013-11-11 21:14:42 +01:00
2013-11-18 14:27:13 +01:00
if Terminated then Exit;
2013-11-11 21:14:42 +01:00
//some data?
if ( iResult > 0 ) then
begin
strmEvent : = nil ;
l : = FChannels. LockList;
try
//check for data for all channels
for i : = 0 to l. Count - 1 do
begin
chn : = TIdHTTPWebsocketClient( l. Items[ i] ) ;
2014-01-23 10:17:13 +01:00
ws : = chn. IOHandler as TIdIOHandlerWebsocket;
2014-01-31 20:22:10 +01:00
if ( ws = nil ) then Continue;
2014-01-23 10:17:13 +01:00
if ws. TryLock then //IOHandler.Readable cannot be done during pending action!
try
2014-01-31 20:22:10 +01:00
try
//try to process all events
while chn. IOHandler. HasData or
chn. IOHandler. Readable( 0 ) do //has some data
begin
if strmEvent = nil then
strmEvent : = TMemoryStream. Create;
strmEvent. Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode : = TWSDataCode( chn. IOHandler. ReadLongWord) ;
if not ( wscode in [ wdcText, wdcBinary, wdcPing, wdcPong] ) then
Continue;
2013-11-11 21:14:42 +01:00
2014-01-31 20:22:10 +01:00
//next the size + data = stream
chn. IOHandler. ReadStream( strmEvent) ;
2013-11-11 21:14:42 +01:00
2014-01-31 20:22:10 +01:00
//ignore ping/pong messages
if wscode in [ wdcPing, wdcPong] then Continue;
2013-11-11 21:14:42 +01:00
//fire event
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent. Position : = 0 ;
if wscode = wdcBinary then
begin
chn. AsyncDispatchEvent( strmEvent) ;
end
else if wscode = wdcText then
begin
SetLength( swstext, strmEvent. Size) ;
strmEvent. Read( swstext[ 1 ] , strmEvent. Size) ;
if swstext < > '' then
begin
chn. AsyncDispatchEvent( string( swstext) ) ;
end ;
end ;
2014-01-31 20:22:10 +01:00
end ;
except
l : = nil ;
FChannels. UnlockList;
chn. ResetChannel;
raise ;
2014-01-23 10:17:13 +01:00
end ;
finally
ws. Unlock;
2013-11-11 21:14:42 +01:00
end ;
end ;
if FPendingBreak then
ResetSpecialEventSocket;
finally
if l < > nil then
FChannels. UnlockList;
strmEvent. Free;
end ;
2014-01-13 10:00:09 +01:00
end ;
2013-11-11 21:14:42 +01:00
end ;
procedure TIdWebsocketMultiReadThread. RemoveClient(
aChannel: TIdHTTPWebsocketClient) ;
begin
if Self = nil then Exit;
2014-03-07 12:19:32 +01:00
aChannel. Lock;
try
2013-11-11 21:14:42 +01:00
FChannels. Remove( aChannel) ;
2014-03-07 12:19:32 +01:00
if FReconnectlist < > nil then
FReconnectlist. Remove( aChannel) ;
finally
aChannel. UnLock;
end ;
2013-11-11 21:14:42 +01:00
BreakSelectWait;
end ;
class procedure TIdWebsocketMultiReadThread. RemoveInstance;
begin
if FInstance < > nil then
2014-01-10 15:05:04 +01:00
begin
FInstance. Terminate;
FInstance. WaitFor;
2013-11-11 21:14:42 +01:00
FreeAndNil( FInstance) ;
2014-01-10 15:05:04 +01:00
end ;
2013-11-11 21:14:42 +01:00
end ;
procedure TIdWebsocketMultiReadThread. ResetSpecialEventSocket;
begin
Assert( FPendingBreak) ;
FPendingBreak : = False ;
IdWinsock2. closesocket( FTempHandle) ;
2013-11-18 14:27:13 +01:00
FTempHandle : = 0 ;
2013-11-11 21:14:42 +01:00
InitSpecialEventSocket;
end ;
procedure TIdWebsocketMultiReadThread. Terminate;
begin
inherited Terminate;
FChannels. LockList;
try
//fire a signal, so the "select" wait will quit and thread can stop
BreakSelectWait;
finally
FChannels. UnlockList;
end ;
end ;
{ TIdWebsocketDispatchThread }
class function TIdWebsocketDispatchThread. Instance: TIdWebsocketDispatchThread;
begin
if FInstance = nil then
begin
2013-11-18 14:27:13 +01:00
GlobalNameSpace. BeginWrite;
try
if FInstance = nil then
begin
2014-01-10 15:05:04 +01:00
FInstance : = Self. Create( True ) ;
2013-11-18 14:27:13 +01:00
FInstance. Start;
end ;
finally
GlobalNameSpace. EndWrite;
end ;
2013-11-11 21:14:42 +01:00
end ;
Result : = FInstance;
end ;
2014-01-10 15:05:04 +01:00
class procedure TIdWebsocketDispatchThread. RemoveInstance;
2013-11-11 21:14:42 +01:00
begin
2014-01-10 15:05:04 +01:00
if FInstance < > nil then
begin
FInstance. Terminate;
FInstance. WaitFor;
FreeAndNil( FInstance) ;
2013-11-11 21:14:42 +01:00
end ;
end ;
2014-03-07 12:19:32 +01:00
{ TWSThreadList }
function TWSThreadList. Count: Integer ;
var l: TList;
begin
l : = LockList;
try
Result : = l. Count;
finally
UnlockList;
end ;
end ;
2013-11-11 21:14:42 +01:00
initialization
finalization
2014-01-10 15:05:04 +01:00
TIdWebsocketMultiReadThread. RemoveInstance;
TIdWebsocketDispatchThread. RemoveInstance
2013-11-11 21:14:42 +01:00
end .