DelphiWebsockets/IdHTTPWebsocketClient.pas

1539 lines
42 KiB
ObjectPascal
Raw Normal View History

2013-11-11 21:14:42 +01:00
unit IdHTTPWebsocketClient;
interface
uses
Classes,
2013-11-18 14:27:13 +01:00
IdHTTP,
{$IF CompilerVersion <= 21.0} //D2010
IdHashSHA1,
{$else}
Types,
2013-11-18 14:27:13 +01:00
IdHashSHA, //XE3 etc
{$IFEND}
IdIOHandler,
IdIOHandlerWebsocket,
{$ifdef FMX}
FMX.Types,
{$ELSE}
ExtCtrls,
{$ENDIF}
IdWinsock2, Generics.Collections, SyncObjs,
2013-11-11 21:14:42 +01:00
IdSocketIOHandling;
type
TWebsocketMsgBin = procedure(const aData: TStream) of object;
TWebsocketMsgText = procedure(const aData: string) of object;
2013-11-11 21:14:42 +01:00
TIdHTTPWebsocketClient = class;
TSocketIOMsg = procedure(const AClient: TIdHTTPWebsocketClient; const aText: string; aMsgNr: Integer) of object;
TIdSocketIOHandling_Ext = class(TIdSocketIOHandling)
end;
TIdHTTPWebsocketClient = class(TIdHTTP)
private
FWSResourceName: string;
FHash: TIdHashSHA1;
FOnData: TWebsocketMsgBin;
FOnTextData: TWebsocketMsgText;
2013-11-11 21:14:42 +01:00
function GetIOHandlerWS: TIdIOHandlerWebsocket;
procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket);
procedure SetOnData(const Value: TWebsocketMsgBin);
procedure SetOnTextData(const Value: TWebsocketMsgText);
2013-11-11 21:14:42 +01:00
protected
FSocketIOCompatible: Boolean;
FSocketIOHandshakeResponse: string;
FSocketIO: TIdSocketIOHandling_Ext;
FSocketIOContext: ISocketIOContext;
2014-01-23 10:17:13 +01:00
FSocketIOConnectBusy: Boolean;
2014-01-23 16:30:39 +01:00
//FHeartBeat: TTimer;
//procedure HeartBeatTimer(Sender: TObject);
2013-11-11 21:14:42 +01:00
function GetSocketIO: TIdSocketIOHandling;
protected
procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);virtual;
function MakeImplicitClientHandler: TIdIOHandler; override;
public
procedure AsyncDispatchEvent(const aEvent: TStream); overload; virtual;
procedure AsyncDispatchEvent(const aEvent: string); overload; virtual;
procedure ResetChannel;
public
procedure AfterConstruction; override;
destructor Destroy; override;
function TryUpgradeToWebsocket: Boolean;
procedure UpgradeToWebsocket;
function TryLock: Boolean;
2014-01-31 20:22:10 +01:00
procedure Lock;
procedure UnLock;
procedure Connect; override;
function TryConnect: Boolean;
2013-11-11 21:14:42 +01:00
procedure Disconnect(ANotifyPeer: Boolean); override;
2014-01-23 16:30:39 +01:00
function CheckConnection: Boolean;
procedure Ping;
2013-11-11 21:14:42 +01:00
property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS;
//websockets
property OnBinData : TWebsocketMsgBin read FOnData write SetOnData;
property OnTextData: TWebsocketMsgText read FOnTextData write SetOnTextData;
2013-11-11 21:14:42 +01:00
//https://github.com/LearnBoost/socket.io-spec
property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible;
property SocketIO: TIdSocketIOHandling read GetSocketIO;
published
property Host;
property Port;
property WSResourceName: string read FWSResourceName write FWSResourceName;
end;
// on error
2014-01-31 20:22:10 +01:00
(*
2013-11-11 21:14:42 +01:00
TIdHTTPSocketIOClient_old = class(TIdHTTPWebsocketClient)
private
FOnConnected: TNotifyEvent;
FOnDisConnected: TNotifyEvent;
FOnSocketIOMsg: TSocketIOMsg;
FOnSocketIOEvent: TSocketIOMsg;
FOnSocketIOJson: TSocketIOMsg;
protected
FHeartBeat: TTimer;
procedure HeartBeatTimer(Sender: TObject);
procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);override;
public
procedure AsyncDispatchEvent(const aEvent: string); override;
public
procedure AfterConstruction; override;
destructor Destroy; override;
procedure AutoConnect;
property SocketIOHandshakeResponse: string read FSocketIOHandshakeResponse;
property OnConnected: TNotifyEvent read FOnConnected write FOnConnected;
property OnDisConnected: TNotifyEvent read FOnDisConnected write FOnDisConnected;
// procedure ProcessSocketIORequest(const strmRequest: TStream);
property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg;
property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson;
property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent;
end;
2014-01-31 20:22:10 +01:00
*)
2013-11-11 21:14:42 +01:00
TWSThreadList = class(TThreadList)
public
function Count: Integer;
end;
2013-11-11 21:14:42 +01:00
TIdWebsocketMultiReadThread = class(TThread)
private
class var FInstance: TIdWebsocketMultiReadThread;
protected
FReadTimeout: Integer;
2013-11-11 21:14:42 +01:00
FTempHandle: THandle;
FPendingBreak: Boolean;
Freadset, Fexceptionset: TFDSet;
Finterval: TTimeVal;
procedure InitSpecialEventSocket;
procedure ResetSpecialEventSocket;
procedure BreakSelectWait;
protected
FChannels: TThreadList;
FReconnectlist: TWSThreadList;
FReconnectThread: TIdWebsocketQueueThread;
2013-11-11 21:14:42 +01:00
procedure ReadFromAllChannels;
2014-01-23 16:30:39 +01:00
procedure PingAllChannels;
2013-11-11 21:14:42 +01:00
procedure Execute; override;
public
procedure AfterConstruction;override;
destructor Destroy; override;
procedure Terminate;
procedure AddClient (aChannel: TIdHTTPWebsocketClient);
procedure RemoveClient(aChannel: TIdHTTPWebsocketClient);
property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000;
2013-11-11 21:14:42 +01:00
class function Instance: TIdWebsocketMultiReadThread;
class procedure RemoveInstance;
end;
//async process data
TIdWebsocketDispatchThread = class(TIdWebsocketQueueThread)
2013-11-11 21:14:42 +01:00
private
class var FInstance: TIdWebsocketDispatchThread;
public
class function Instance: TIdWebsocketDispatchThread;
class procedure RemoveInstance;
2013-11-11 21:14:42 +01:00
end;
implementation
uses
IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack,
2014-01-23 16:30:39 +01:00
IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils;
2013-11-11 21:14:42 +01:00
//type
// TAnonymousThread = class(TThread)
// protected
// FThreadProc: TThreadProcedure;
// procedure Execute; override;
// public
// constructor Create(AThreadProc: TThreadProcedure);
// end;
//procedure CreateAnonymousThread(AThreadProc: TThreadProcedure);
//begin
// TAnonymousThread.Create(AThreadProc);
//end;
{ TAnonymousThread }
//constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure);
//begin
// FThreadProc := AThreadProc;
// FreeOnTerminate := True;
// inherited Create(False {direct start});
//end;
//
//procedure TAnonymousThread.Execute;
//begin
// if Assigned(FThreadProc) then
// FThreadProc();
//end;
{ TIdHTTPWebsocketClient }
procedure TIdHTTPWebsocketClient.AfterConstruction;
begin
inherited;
FHash := TIdHashSHA1.Create;
IOHandler := TIdIOHandlerWebsocket.Create(nil);
2013-11-18 14:27:13 +01:00
IOHandler.UseNagle := False;
2013-11-11 21:14:42 +01:00
ManagedIOHandler := True;
FSocketIO := TIdSocketIOHandling_Ext.Create;
2014-01-23 16:30:39 +01:00
// FHeartBeat := TTimer.Create(nil);
// FHeartBeat.Enabled := False;
// FHeartBeat.OnTimer := HeartBeatTimer;
2013-11-11 21:14:42 +01:00
end;
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream);
var
strmevent: TMemoryStream;
begin
if not Assigned(OnBinData) then Exit;
strmevent := TMemoryStream.Create;
strmevent.CopyFrom(aEvent, aEvent.Size);
//events during dispatch? channel is busy so offload event dispatching to different thread!
TIdWebsocketDispatchThread.Instance.QueueEvent(
procedure
begin
if Assigned(OnBinData) then
OnBinData(strmevent);
strmevent.Free;
end);
end;
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: string);
begin
2014-03-14 09:37:30 +01:00
{$IFDEF DEBUG_WS}
if DebugHook <> 0 then
OutputDebugString(PChar('AsyncDispatchEvent: ' + aEvent) );
{$ENDIF}
2013-11-18 14:27:13 +01:00
//if not Assigned(OnTextData) then Exit;
//events during dispatch? channel is busy so offload event dispatching to different thread!
TIdWebsocketDispatchThread.Instance.QueueEvent(
procedure
begin
if FSocketIOCompatible then
FSocketIO.ProcessSocketIORequest(FSocketIOContext as TSocketIOContext, aEvent)
else if Assigned(OnTextData) then
OnTextData(aEvent);
end);
2013-11-11 21:14:42 +01:00
end;
2014-01-23 16:30:39 +01:00
function TIdHTTPWebsocketClient.CheckConnection: Boolean;
begin
Result := False;
try
if (IOHandler <> nil) and
not IOHandler.ClosedGracefully and
IOHandler.Connected then
begin
IOHandler.CheckForDisconnect(True{error}, True{ignore buffer, check real connection});
Result := True; //ok if we reach here
end;
except
on E:Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
// if (IOHandler <> nil) then
// IOHandler.Clear;
Disconnect(False);
if Assigned(OnDisConnected) then
OnDisConnected(Self);
end;
end;
end;
procedure TIdHTTPWebsocketClient.Connect;
begin
2014-01-31 20:22:10 +01:00
Lock;
try
if Connected then
begin
TryUpgradeToWebsocket;
Exit;
end;
2014-01-31 20:22:10 +01:00
//FHeartBeat.Enabled := True;
if SocketIOCompatible and
not FSocketIOConnectBusy then
begin
//FSocketIOConnectBusy := True;
//try
2014-01-31 20:22:10 +01:00
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
//finally
// FSocketIOConnectBusy := False;
//end;
2014-01-31 20:22:10 +01:00
end
else
begin
//clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) then IOHandler.Clear;
inherited Connect;
2014-01-23 10:17:13 +01:00
end;
2014-01-31 20:22:10 +01:00
finally
UnLock;
end;
end;
2013-11-11 21:14:42 +01:00
destructor TIdHTTPWebsocketClient.Destroy;
2014-01-23 16:30:39 +01:00
//var tmr: TObject;
2013-11-11 21:14:42 +01:00
begin
2014-01-23 16:30:39 +01:00
// tmr := FHeartBeat;
// FHeartBeat := nil;
// TThread.Queue(nil, //otherwise free in other thread than created
// procedure
// begin
2013-11-11 21:14:42 +01:00
//FHeartBeat.Free;
2014-01-23 16:30:39 +01:00
// tmr.Free;
// end);
2013-11-11 21:14:42 +01:00
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FSocketIO.Free;
FHash.Free;
inherited;
end;
procedure TIdHTTPWebsocketClient.DisConnect(ANotifyPeer: Boolean);
begin
2014-01-31 20:22:10 +01:00
if not SocketIOCompatible and
( (IOHandler <> nil) and not IOHandler.IsWebsocket)
then
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
2013-11-11 21:14:42 +01:00
if ANotifyPeer and SocketIOCompatible then
FSocketIO.WriteDisConnect(FSocketIOContext as TSocketIOContext)
else
FSocketIO.FreeConnection(FSocketIOContext as TSocketIOContext);
// IInterface(FSocketIOContext)._Release;
FSocketIOContext := nil;
if IOHandler <> nil then
begin
IOHandler.Lock;
try
IOHandler.IsWebsocket := False;
inherited DisConnect(ANotifyPeer);
//clear buffer, other still "connected"
IOHandler.Clear;
2013-11-11 21:14:42 +01:00
//IOHandler.Free;
//IOHandler := TIdIOHandlerWebsocket.Create(nil);
finally
IOHandler.Unlock;
end;
end;
end;
function TIdHTTPWebsocketClient.GetIOHandlerWS: TIdIOHandlerWebsocket;
begin
// if inherited IOHandler is TIdIOHandlerWebsocket then
Result := inherited IOHandler as TIdIOHandlerWebsocket
// else
// Assert(False);
end;
function TIdHTTPWebsocketClient.GetSocketIO: TIdSocketIOHandling;
begin
Result := FSocketIO;
end;
2014-01-23 16:30:39 +01:00
(*
2013-11-11 21:14:42 +01:00
procedure TIdHTTPWebsocketClient.HeartBeatTimer(Sender: TObject);
begin
FHeartBeat.Enabled := False;
FSocketIO.Lock;
try
try
if (IOHandler <> nil) and
not IOHandler.ClosedGracefully and
IOHandler.Connected and
2013-11-11 21:14:42 +01:00
(FSocketIOContext <> nil) then
begin
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
2013-11-11 21:14:42 +01:00
end
//retry re-connect
else
try
//clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) then
IOHandler.Clear;
2013-11-11 21:14:42 +01:00
2014-01-23 10:17:13 +01:00
Self.ConnectTimeout := 100; //100ms otherwise GUI hangs too much -> todo: do it in background thread!
if not Connected then
Self.Connect;
2013-11-11 21:14:42 +01:00
TryUpgradeToWebsocket;
except
//skip, just retried
end;
except on E:Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
if (IOHandler <> nil) then
IOHandler.Clear;
Disconnect(False);
2013-11-11 21:14:42 +01:00
if Assigned(OnDisConnected) then
OnDisConnected(Self);
try
raise EIdException.Create('Connection lost from ' +
Format('ws://%s:%d/%s', [Host, Port, WSResourceName]) +
' - Error: ' + e.Message);
except
//eat, no error popup!
end;
2013-11-11 21:14:42 +01:00
end;
end;
finally
FSocketIO.UnLock;
FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect
end;
end;
2014-01-23 16:30:39 +01:00
*)
2013-11-11 21:14:42 +01:00
function TIdHTTPWebsocketClient.TryConnect: Boolean;
begin
2014-01-31 20:22:10 +01:00
Lock;
try
2014-01-31 20:22:10 +01:00
try
if Connected then Exit(True);
2014-01-31 20:22:10 +01:00
Connect;
Result := Connected;
//if Result then
// Result := TryUpgradeToWebsocket already done in connect
2014-01-31 20:22:10 +01:00
except
Result := False;
end
finally
UnLock;
end;
end;
function TIdHTTPWebsocketClient.TryLock: Boolean;
begin
Result := System.TMonitor.TryEnter(Self);
end;
2013-11-11 21:14:42 +01:00
function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean;
var
sError: string;
begin
FSocketIOConnectBusy := True;
2014-01-31 20:22:10 +01:00
Lock;
try
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
2014-01-23 10:17:13 +01:00
2014-01-31 20:22:10 +01:00
InternalUpgradeToWebsocket(False{no raise}, sError);
Result := (sError = '');
finally
FSocketIOConnectBusy := False;
2014-01-31 20:22:10 +01:00
UnLock;
end;
2014-01-31 20:22:10 +01:00
end;
procedure TIdHTTPWebsocketClient.UnLock;
begin
System.TMonitor.Exit(Self);
2013-11-11 21:14:42 +01:00
end;
procedure TIdHTTPWebsocketClient.UpgradeToWebsocket;
var
sError: string;
begin
2014-01-31 20:22:10 +01:00
Lock;
try
if not IOHandler.IsWebsocket then
InternalUpgradeToWebsocket(True{raise}, sError);
finally
UnLock;
end;
2013-11-11 21:14:42 +01:00
end;
procedure TIdHTTPWebsocketClient.InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);
var
sURL: string;
strmResponse: TMemoryStream;
i: Integer;
sKey, sResponseKey: string;
sSocketioextended: string;
2014-03-07 13:40:04 +01:00
bLocked: boolean;
2013-11-11 21:14:42 +01:00
begin
2014-01-23 10:17:13 +01:00
Assert((IOHandler = nil) or not IOHandler.IsWebsocket);
//remove from thread during connection handling
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
2013-11-11 21:14:42 +01:00
2014-03-07 13:40:04 +01:00
bLocked := False;
2013-11-11 21:14:42 +01:00
strmResponse := TMemoryStream.Create;
2014-03-07 13:40:04 +01:00
Self.Lock;
2013-11-11 21:14:42 +01:00
try
2014-03-07 13:40:04 +01:00
//reset pending data
if IOHandler <> nil then
begin
IOHandler.Lock;
bLocked := True;
if IOHandler.IsWebsocket then Exit;
IOHandler.Clear;
end;
2013-11-11 21:14:42 +01:00
//special socket.io handling, see https://github.com/LearnBoost/socket.io-spec
if SocketIOCompatible then
begin
Request.Clear;
Request.Connection := 'keep-alive';
sURL := Format('http://%s:%d/socket.io/1/', [Host, Port]);
strmResponse.Clear;
2014-03-28 12:47:25 +01:00
ReadTimeout := 5 * 1000;
2013-11-11 21:14:42 +01:00
//get initial handshake
Post(sURL, strmResponse, strmResponse);
if ResponseCode = 200 {OK} then
begin
//if not Connected then //reconnect
// Self.Connect;
strmResponse.Position := 0;
//The body of the response should contain the session id (sid) given to the client,
//followed by the heartbeat timeout, the connection closing timeout, and the list of supported transports separated by :
//4d4f185e96a7b:15:10:websocket,xhr-polling
with TStreamReader.Create(strmResponse) do
try
FSocketIOHandshakeResponse := ReadToEnd;
finally
Free;
end;
sKey := Copy(FSocketIOHandshakeResponse, 1, Pos(':', FSocketIOHandshakeResponse)-1);
sSocketioextended := 'socket.io/1/websocket/' + sKey;
WSResourceName := sSocketioextended;
end
else
begin
aFailedReason := Format('Initial socket.io handshake failed: "%d: %s"',[ResponseCode, ResponseText]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
end;
Request.Clear;
strmResponse.Clear;
//http://www.websocket.org/aboutwebsocket.html
(* GET ws://echo.websocket.org/?encoding=text HTTP/1.1
Origin: http://websocket.org
Cookie: __utma=99as
Connection: Upgrade
Host: echo.websocket.org
Sec-WebSocket-Key: uRovscZjNol/umbTt5uKmw==
Upgrade: websocket
Sec-WebSocket-Version: 13 *)
//Connection: Upgrade
Request.Connection := 'Upgrade';
//Upgrade: websocket
Request.CustomHeaders.Add('Upgrade: websocket');
//Sec-WebSocket-Key
sKey := '';
for i := 1 to 16 do
sKey := sKey + Char(Random(127-32) + 32);
//base64 encoded
sKey := TIdEncoderMIME.EncodeString(sKey);
Request.CustomHeaders.AddValue('Sec-WebSocket-Key', sKey);
//Sec-WebSocket-Version: 13
Request.CustomHeaders.AddValue('Sec-WebSocket-Version', '13');
Request.Host := Format('Host: %s:%d',[Host,Port]);
//ws://host:port/<resourcename>
//about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs"
//sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
sURL := Format('http://%s:%d/%s', [Host, Port, WSResourceName]);
ReadTimeout := Max(2 * 1000, ReadTimeout);
{ voorbeeld:
GET http://localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: localhost:9222
Origin: http://localhost:9222
Pragma: no-cache
Cache-Control: no-cache
Sec-WebSocket-Key: HIqoAdZkxnWWH9dnVPyW7w==
Sec-WebSocket-Version: 13
Sec-WebSocket-Extensions: x-webkit-deflate-frame
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36
Cookie: __utma=1.2040118404.1366961318.1366961318.1366961318.1; __utmc=1; __utmz=1.1366961318.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); deviceorder=0123456789101112; MultiTouchEnabled=false; device=3; network_type=0
}
2014-03-12 22:04:30 +01:00
(*
2013-11-11 21:14:42 +01:00
if SocketIOCompatible then
begin
Response.Clear;
Response.ResponseCode := 0;
Request.URL := sURL;
Request.Method := Id_HTTPMethodGet;
Request.Source := nil;
Response.ContentStream := strmResponse;
PrepareRequest(Request);
//connect and upgrade
ConnectToHost(Request, Response);
//check upgrade succesfull
CheckForGracefulDisconnect(True);
CheckConnected;
Assert(Self.Connected);
2013-11-18 14:27:13 +01:00
if Response.ResponseCode = 0 then
Response.ResponseText := Response.ResponseText;
2013-11-11 21:14:42 +01:00
if Response.ResponseCode <> 200{ok} then
begin
aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason)
else
Exit;
end;
Response.Clear;
if IOHandler.CheckForDataOnSource(ReadTimeout) then
Response.ResponseText := IOHandler.InputBufferAsString();
end
else
2014-03-12 22:04:30 +01:00
*)
2013-11-11 21:14:42 +01:00
begin
Get(sURL, strmResponse, [101]);
//http://www.websocket.org/aboutwebsocket.html
(* HTTP/1.1 101 WebSocket Protocol Handshake
Date: Fri, 10 Feb 2012 17:38:18 GMT
Connection: Upgrade
Server: Kaazing Gateway
Upgrade: WebSocket
Access-Control-Allow-Origin: http://websocket.org
Access-Control-Allow-Credentials: true
Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU=
Access-Control-Allow-Headers: content-type *)
//'HTTP/1.1 101 Switching Protocols'
if ResponseCode <> 101 then
begin
aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//connection: upgrade
if not SameText(Response.Connection, 'upgrade') then
begin
aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//upgrade: websocket
if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then
begin
aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//check handshake key
sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace"
'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID
sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64
FHash.HashString(sResponseKey) ); //SHA1
if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then
begin
aFailedReason := 'Invalid key handshake';
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
end;
//upgrade succesful
IOHandler.IsWebsocket := True;
aFailedReason := '';
Assert(Connected);
2013-11-11 21:14:42 +01:00
if SocketIOCompatible then
begin
2014-01-31 20:22:10 +01:00
FSocketIOContext := TSocketIOContext.Create(Self);
(FSocketIOContext as TSocketIOContext).ConnectSend := True; //connect already send via url? GET /socket.io/1/websocket/9elrbEFqiimV29QAM6T-
2013-11-11 21:14:42 +01:00
FSocketIO.WriteConnect(FSocketIOContext as TSocketIOContext);
end;
//always read the data! (e.g. RO use override of AsyncDispatchEvent to process data)
//if Assigned(OnBinData) or Assigned(OnTextData) then
2013-11-11 21:14:42 +01:00
finally
Request.Clear;
strmResponse.Free;
2014-01-31 20:22:10 +01:00
2014-03-07 13:40:04 +01:00
if bLocked and (IOHandler <> nil) then
IOHandler.Unlock;
Unlock;
2014-01-31 20:22:10 +01:00
//add to thread for auto retry/reconnect
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
2013-11-11 21:14:42 +01:00
end;
end;
2014-01-31 20:22:10 +01:00
procedure TIdHTTPWebsocketClient.Lock;
begin
System.TMonitor.Enter(Self);
end;
2013-11-11 21:14:42 +01:00
function TIdHTTPWebsocketClient.MakeImplicitClientHandler: TIdIOHandler;
begin
Result := TIdIOHandlerWebsocket.Create(nil);
end;
2014-01-23 16:30:39 +01:00
procedure TIdHTTPWebsocketClient.Ping;
var
ws: TIdIOHandlerWebsocket;
begin
ws := IOHandler as TIdIOHandlerWebsocket;
2014-02-04 21:23:49 +01:00
ws.LastPingTime := Now;
2014-01-23 16:30:39 +01:00
//socket.io?
if SocketIOCompatible and ws.IsWebsocket then
begin
FSocketIO.Lock;
try
if (FSocketIOContext <> nil) then
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
finally
FSocketIO.UnLock;
end
end
//only websocket?
else if not SocketIOCompatible and ws.IsWebsocket then
begin
if ws.TryLock then
try
ws.WriteData(nil, wdcPing);
finally
ws.Unlock;
end;
end;
end;
2013-11-11 21:14:42 +01:00
procedure TIdHTTPWebsocketClient.ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
begin
2014-01-31 20:22:10 +01:00
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); keep for reconnect
2013-11-11 21:14:42 +01:00
if IOHandler <> nil then
begin
IOHandler.InputBuffer.Clear;
IOHandler.BusyUpgrading := False;
IOHandler.IsWebsocket := False;
2013-11-11 21:14:42 +01:00
//close/disconnect internal socket
//ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
//ws.Close; done in disconnect below
end;
Disconnect(False);
end;
procedure TIdHTTPWebsocketClient.SetIOHandlerWS(
const Value: TIdIOHandlerWebsocket);
begin
SetIOHandler(Value);
end;
procedure TIdHTTPWebsocketClient.SetOnData(const Value: TWebsocketMsgBin);
2013-11-11 21:14:42 +01:00
begin
// if not Assigned(Value) and not Assigned(FOnTextData) then
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FOnData := Value;
// if Assigned(Value) and
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
// then
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
2013-11-11 21:14:42 +01:00
end;
procedure TIdHTTPWebsocketClient.SetOnTextData(const Value: TWebsocketMsgText);
2013-11-11 21:14:42 +01:00
begin
// if not Assigned(Value) and not Assigned(FOnData) then
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FOnTextData := Value;
// if Assigned(Value) and
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
// then
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
2013-11-11 21:14:42 +01:00
end;
{ TIdHTTPSocketIOClient }
2014-01-31 20:22:10 +01:00
(*
2013-11-11 21:14:42 +01:00
procedure TIdHTTPSocketIOClient_old.AfterConstruction;
begin
inherited;
SocketIOCompatible := True;
FHeartBeat := TTimer.Create(nil);
FHeartBeat.Enabled := False;
FHeartBeat.OnTimer := HeartBeatTimer;
end;
procedure TIdHTTPSocketIOClient_old.AsyncDispatchEvent(const aEvent: string);
begin
//https://github.com/LearnBoost/socket.io-spec
if StartsStr('1:', aEvent) then //connect
Exit;
if aEvent = '2::' then //ping, heartbeat
Exit;
inherited AsyncDispatchEvent(aEvent);
end;
procedure TIdHTTPSocketIOClient_old.AutoConnect;
begin
//for now: timer in mainthread?
TThread.Queue(nil,
procedure
begin
FHeartBeat.Interval := 5 * 1000;
FHeartBeat.Enabled := True;
end);
end;
destructor TIdHTTPSocketIOClient_old.Destroy;
var tmr: TObject;
begin
tmr := FHeartBeat;
TThread.Queue(nil, //otherwise free in other thread than created
procedure
begin
//FHeartBeat.Free;
tmr.Free;
end);
inherited;
end;
procedure TIdHTTPSocketIOClient_old.HeartBeatTimer(Sender: TObject);
begin
FHeartBeat.Enabled := False;
try
try
if (IOHandler <> nil) and
not IOHandler.ClosedGracefully and
IOHandler.Connected then
begin
IOHandler.Write('2:::'); //heartbeat socket.io message
end
//retry connect
else
try
//clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) and
not IOHandler.InputBufferIsEmpty
then
IOHandler.DiscardAll;
Self.Connect;
TryUpgradeToWebsocket;
except
//skip, just retried
end;
except
//clear inputbuffer, otherwise it stays connected :(
if (IOHandler <> nil) and
not IOHandler.InputBufferIsEmpty
then
IOHandler.DiscardAll;
if Assigned(OnDisConnected) then
OnDisConnected(Self);
try
raise EIdException.Create('Connection lost from ' + Format('ws://%s:%d/%s', [Host, Port, WSResourceName]));
except
//eat, no error popup!
end;
end;
finally
FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect
end;
end;
procedure TIdHTTPSocketIOClient_old.InternalUpgradeToWebsocket(
aRaiseException: Boolean; out aFailedReason: string);
var
stimeout: string;
begin
inherited InternalUpgradeToWebsocket(aRaiseException, aFailedReason);
if (aFailedReason = '') and
(IOHandler as TIdIOHandlerWebsocket).IsWebsocket then
begin
stimeout := Copy(SocketIOHandshakeResponse, Pos(':', SocketIOHandshakeResponse)+1, Length(SocketIOHandshakeResponse));
stimeout := Copy(stimeout, 1, Pos(':', stimeout)-1);
if stimeout <> '' then
begin
//if (FHeartBeat.Interval > 0) then
//for now: timer in mainthread?
TThread.Queue(nil,
procedure
begin
FHeartBeat.Interval := StrToIntDef(stimeout, 15) * 1000;
if FHeartBeat.Interval >= 15000 then
//FHeartBeat.Interval := FHeartBeat.Interval - 5000
FHeartBeat.Interval := 5000
else if FHeartBeat.Interval >= 5000 then
FHeartBeat.Interval := FHeartBeat.Interval - 2000;
FHeartBeat.Enabled := (FHeartBeat.Interval > 0);
end);
end;
if Assigned(OnConnected) then
OnConnected(Self);
end;
end;
2014-01-31 20:22:10 +01:00
)
2013-11-11 21:14:42 +01:00
procedure TIdHTTPSocketIOClient_old.ProcessSocketIORequest(
const strmRequest: TStream);
function __ReadToEnd: string;
var
utf8: TBytes;
ilength: Integer;
begin
Result := '';
ilength := strmRequest.Size - strmRequest.Position;
SetLength(utf8, ilength);
strmRequest.Read(utf8[0], ilength);
Result := TEncoding.UTF8.GetString(utf8);
end;
function __GetSocketIOPart(const aData: string; aIndex: Integer): string;
var ipos: Integer;
i: Integer;
begin
//'5::/chat:{"name":"hi!"}'
//0 = 5
//1 =
//2 = /chat
//3 = {"name":"hi!"}
ipos := 0;
for i := 0 to aIndex-1 do
ipos := PosEx(':', aData, ipos+1);
if ipos >= 0 then
begin
Result := Copy(aData, ipos+1, Length(aData));
if aIndex < 3 then // /chat:{"name":"hi!"}'
begin
ipos := PosEx(':', Result, 1); // :{"name":"hi!"}'
if ipos > 0 then
Result := Copy(Result, 1, ipos-1); // /chat
end;
end;
end;
var
str, smsg, schannel, sdata: string;
imsg: Integer;
// bCallback: Boolean;
begin
str := __ReadToEnd;
if str = '' then Exit;
//5:1+:/chat:test
smsg := __GetSocketIOPart(str, 1);
imsg := 0;
// bCallback := False;
if smsg <> '' then // 1+
begin
imsg := StrToIntDef(ReplaceStr(smsg,'+',''), 0); // 1
// bCallback := (Pos('+', smsg) > 1); //trailing +, e.g. 1+
end;
schannel := __GetSocketIOPart(str, 2); // /chat
sdata := __GetSocketIOPart(str, 3); // test
//(0) Disconnect
if StartsStr('0:', str) then
begin
schannel := __GetSocketIOPart(str, 2);
if schannel <> '' then
//todo: close channel
else
Self.Disconnect;
end
//(1) Connect
//'1::' [path] [query]
else if StartsStr('1:', str) then
begin
//todo: add channel/room to authorized channel/room list
Self.IOHandler.Write(str); //write same connect back, e.g. 1::/chat
end
//(2) Heartbeat
else if StartsStr('2:', str) then
begin
Self.IOHandler.Write(str); //write same connect back, e.g. 2::
end
//(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message)
//'3:' [message id ('+')] ':' [message endpoint] ':' [data]
//3::/chat:hi
else if StartsStr('3:', str) then
begin
if Assigned(OnSocketIOMsg) then
OnSocketIOMsg(Self, sdata, imsg);
end
//(4) JSON Message
//'4:' [message id ('+')] ':' [message endpoint] ':' [json]
//4:1::{"a":"b"}
else if StartsStr('4:', str) then
begin
if Assigned(OnSocketIOJson) then
OnSocketIOJson(Self, sdata, imsg);
end
//(5) Event
//'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event]
//5::/chat:{"name":"my other event","args":[{"my":"data"}]}
//5:1+:/chat:{"name":"GetLocations","args":[""]}
else if StartsStr('5:', str) then
begin
if Assigned(OnSocketIOEvent) then
OnSocketIOEvent(Self, sdata, imsg);
end
//(6) ACK
//6::/news:1+["callback"]
//6:::1+["Response"]
//(7) Error
//(8) Noop
else if StartsStr('8:', str) then
begin
//nothing
end
else
raise Exception.CreateFmt('Unsupported data: "%s"', [str]);
end;
*)
{ TIdWebsocketMultiReadThread }
procedure TIdWebsocketMultiReadThread.AddClient(
aChannel: TIdHTTPWebsocketClient);
var l: TList;
begin
2014-01-31 20:22:10 +01:00
//Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
2013-11-11 21:14:42 +01:00
l := FChannels.LockList;
try
//already exists?
if l.IndexOf(aChannel) >= 0 then Exit;
2013-11-18 14:27:13 +01:00
2013-11-11 21:14:42 +01:00
Assert(l.Count < 64, 'Max 64 connections can be handled by one read thread!'); //due to restrictions of the "select" API
l.Add(aChannel);
//trigger the "select" wait
BreakSelectWait;
finally
FChannels.UnlockList;
end;
end;
procedure TIdWebsocketMultiReadThread.AfterConstruction;
begin
inherited;
ReadTimeout := 5000;
2013-11-11 21:14:42 +01:00
FChannels := TThreadList.Create;
FillChar(Freadset, SizeOf(Freadset), 0);
FillChar(Fexceptionset, SizeOf(Fexceptionset), 0);
InitSpecialEventSocket;
end;
procedure TIdWebsocketMultiReadThread.BreakSelectWait;
var
2013-11-18 14:27:13 +01:00
//iResult: Integer;
2013-11-11 21:14:42 +01:00
LAddr: TSockAddrIn6;
begin
2013-11-18 14:27:13 +01:00
if FTempHandle = 0 then Exit;
2013-11-11 21:14:42 +01:00
FillChar(LAddr, SizeOf(LAddr), 0);
//Id_IPv4
with PSOCKADDR(@LAddr)^ do
begin
sin_family := Id_PF_INET4;
//dummy address and port
(GStack as TIdStackBSDBase).TranslateStringToTInAddr('0.0.0.0', sin_addr, Id_IPv4);
sin_port := htons(1);
end;
FPendingBreak := True;
//connect to non-existing address to stop "select" from waiting
//Note: this is some kind of "hack" because there is no nice way to stop it
//The only(?) other possibility is to make a "socket pair" and send a byte to it,
//but this requires a dynamic server socket (which can trigger a firewall
//exception/question popup in WindowsXP+)
2013-11-18 14:27:13 +01:00
//iResult :=
IdWinsock2.connect(FTempHandle, PSOCKADDR(@LAddr), SIZE_TSOCKADDRIN);
2013-11-11 21:14:42 +01:00
//non blocking socket, so will always result in "would block"!
2013-11-18 14:27:13 +01:00
// if (iResult <> Id_SOCKET_ERROR) or
// ( (GStack <> nil) and (GStack.WSGetLastError <> WSAEWOULDBLOCK) )
// then
// GStack.CheckForSocketError(iResult);
2013-11-11 21:14:42 +01:00
end;
destructor TIdWebsocketMultiReadThread.Destroy;
begin
IdWinsock2.closesocket(FTempHandle);
2013-11-18 14:27:13 +01:00
FTempHandle := 0;
2013-11-11 21:14:42 +01:00
FChannels.Free;
inherited;
end;
procedure TIdWebsocketMultiReadThread.Execute;
begin
Self.NameThreadForDebugging(AnsiString(Self.ClassName));
while not Terminated do
begin
try
while not Terminated do
2014-01-23 16:30:39 +01:00
begin
2013-11-11 21:14:42 +01:00
ReadFromAllChannels;
2014-01-23 16:30:39 +01:00
PingAllChannels;
end;
2013-11-11 21:14:42 +01:00
except
//continue
end;
end;
end;
procedure TIdWebsocketMultiReadThread.InitSpecialEventSocket;
var
param: Cardinal;
iResult: Integer;
begin
if GStack = nil then Exit; //finalized?
2013-11-18 14:27:13 +01:00
2013-11-11 21:14:42 +01:00
//alloc socket
FTempHandle := GStack.NewSocketHandle(Id_SOCK_STREAM, Id_IPPROTO_IP, Id_IPv4, False);
Assert(FTempHandle <> Id_INVALID_SOCKET);
//non block mode
param := 1; // enable NON blocking mode
iResult := ioctlsocket(FTempHandle, FIONBIO, param);
GStack.CheckForSocketError(iResult);
end;
class function TIdWebsocketMultiReadThread.Instance: TIdWebsocketMultiReadThread;
begin
2013-11-18 14:27:13 +01:00
if (FInstance = nil) then
2013-11-11 21:14:42 +01:00
begin
FInstance := TIdWebsocketMultiReadThread.Create(True);
FInstance.Start;
end;
Result := FInstance;
end;
2014-01-23 16:30:39 +01:00
procedure TIdWebsocketMultiReadThread.PingAllChannels;
var
l: TList;
chn: TIdHTTPWebsocketClient;
ws: TIdIOHandlerWebsocket;
i: Integer;
begin
l := FChannels.LockList;
try
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
ws := chn.IOHandler as TIdIOHandlerWebsocket;
//valid?
if (chn.IOHandler <> nil) and
(chn.IOHandler.IsWebsocket) and
(chn.Socket <> nil) and
2014-01-31 20:22:10 +01:00
(chn.Socket.Binding <> nil) and
(chn.Socket.Binding.Handle > 0) and
2014-01-23 16:30:39 +01:00
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
begin
//more than 10s nothing done? then send ping
2014-02-04 21:23:49 +01:00
if SecondsBetween(Now, ws.LastPingTime) > 10 then
2014-01-23 16:30:39 +01:00
if chn.CheckConnection then
try
chn.Ping;
except
//retry connect the next time?
end;
end
else if not chn.Connected then
begin
if (ws <> nil) and
(SecondsBetween(Now, ws.LastActivityTime) < 5)
then
Continue;
if FReconnectlist = nil then
FReconnectlist := TWSThreadList.Create;
//if chn.TryLock then
FReconnectlist.Add(chn);
end;
end;
finally
FChannels.UnlockList;
end;
//reconnect needed? (in background)
2014-03-07 13:40:04 +01:00
if (FReconnectlist <> nil) and (FReconnectlist.Count > 0) then
begin
if FReconnectThread = nil then
FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start});
FReconnectThread.QueueEvent(
procedure
var
l: TList;
chn: TIdHTTPWebsocketClient;
begin
while FReconnectlist.Count > 0 do
begin
chn := nil;
2014-03-07 13:40:04 +01:00
try
//get first one
l := FReconnectlist.LockList;
try
if l.Count <= 0 then Exit;
chn := TObject(l.Items[0]) as TIdHTTPWebsocketClient;
if not chn.TryLock then
begin
l.Delete(0);
chn := nil;
Continue;
end;
finally
FReconnectlist.UnlockList;
end;
//try reconnect
ws := chn.IOHandler as TIdIOHandlerWebsocket;
if ( (ws = nil) or
(SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then
begin
2014-03-07 13:40:04 +01:00
try
if not chn.Connected then
begin
if ws <> nil then
ws.LastActivityTime := Now;
//chn.ConnectTimeout := 1000;
if (chn.Host <> '') and (chn.Port > 0) then
chn.TryUpgradeToWebsocket;
end;
except
//just try
end;
end;
//remove from todo list
l := FReconnectlist.LockList;
try
if l.Count > 0 then
l.Delete(0);
finally
FReconnectlist.UnlockList;
end;
finally
2014-03-12 22:04:30 +01:00
if chn <> nil then
chn.Unlock;
2014-01-23 16:30:39 +01:00
end;
end;
end);
2014-01-23 16:30:39 +01:00
end;
end;
2013-11-11 21:14:42 +01:00
procedure TIdWebsocketMultiReadThread.ReadFromAllChannels;
var
l: TList;
chn: TIdHTTPWebsocketClient;
iCount,
i: Integer;
iResult: NativeInt;
strmEvent: TMemoryStream;
swstext: utf8string;
ws: TIdIOHandlerWebsocket;
wscode: TWSDataCode;
begin
l := FChannels.LockList;
try
iCount := 0;
iResult := 0;
2013-11-11 21:14:42 +01:00
Freadset.fd_count := iCount;
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
//valid?
if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution
(chn.IOHandler <> nil) and
(chn.IOHandler.IsWebsocket) and
2014-01-31 20:22:10 +01:00
(chn.Socket <> nil) and
(chn.Socket.Binding <> nil) and
2013-11-11 21:14:42 +01:00
(chn.Socket.Binding.Handle > 0) and
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
begin
if chn.IOHandler.HasData then
begin
Inc(iResult);
Break;
end;
Freadset.fd_count := iCount+1;
Freadset.fd_array[iCount] := chn.Socket.Binding.Handle;
Inc(iCount);
2013-11-11 21:14:42 +01:00
end;
end;
if FPendingBreak then
ResetSpecialEventSocket;
finally
FChannels.UnlockList;
end;
//special helper socket to be able to stop "select" from waiting
Fexceptionset.fd_count := 1;
Fexceptionset.fd_array[0] := FTempHandle;
//wait 15s till some data
Finterval.tv_sec := Self.ReadTimeout div 1000; //5s
Finterval.tv_usec := Self.ReadTimeout mod 1000;
2013-11-11 21:14:42 +01:00
//nothing to wait for? then sleep some time to prevent 100% CPU
if iResult = 0 then
2013-11-11 21:14:42 +01:00
begin
if iCount = 0 then
begin
iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval);
if iResult = SOCKET_ERROR then
iResult := 1; //ignore errors
end
//wait till a socket has some data (or a signal via exceptionset is fired)
else
iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval);
2013-11-11 21:14:42 +01:00
if iResult = SOCKET_ERROR then
//raise EIdWinsockStubError.Build(WSAGetLastError, '', []);
//ignore error during wait: socket disconnected etc
Exit;
end;
2013-11-11 21:14:42 +01:00
2013-11-18 14:27:13 +01:00
if Terminated then Exit;
2013-11-11 21:14:42 +01:00
//some data?
if (iResult > 0) then
begin
strmEvent := nil;
l := FChannels.LockList;
try
//check for data for all channels
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
2014-01-23 10:17:13 +01:00
ws := chn.IOHandler as TIdIOHandlerWebsocket;
2014-01-31 20:22:10 +01:00
if (ws = nil) then Continue;
2014-01-23 10:17:13 +01:00
if ws.TryLock then //IOHandler.Readable cannot be done during pending action!
try
2014-01-31 20:22:10 +01:00
try
//try to process all events
while chn.IOHandler.HasData or
chn.IOHandler.Readable(0) do //has some data
begin
if strmEvent = nil then
strmEvent := TMemoryStream.Create;
strmEvent.Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(chn.IOHandler.ReadLongWord);
if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then
Continue;
2013-11-11 21:14:42 +01:00
2014-01-31 20:22:10 +01:00
//next the size + data = stream
chn.IOHandler.ReadStream(strmEvent);
2013-11-11 21:14:42 +01:00
2014-01-31 20:22:10 +01:00
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
2013-11-11 21:14:42 +01:00
//fire event
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent.Position := 0;
if wscode = wdcBinary then
begin
chn.AsyncDispatchEvent(strmEvent);
end
else if wscode = wdcText then
begin
SetLength(swstext, strmEvent.Size);
strmEvent.Read(swstext[1], strmEvent.Size);
if swstext <> '' then
begin
chn.AsyncDispatchEvent(string(swstext));
end;
end;
2014-01-31 20:22:10 +01:00
end;
except
l := nil;
FChannels.UnlockList;
chn.ResetChannel;
raise;
2014-01-23 10:17:13 +01:00
end;
finally
ws.Unlock;
2013-11-11 21:14:42 +01:00
end;
end;
if FPendingBreak then
ResetSpecialEventSocket;
finally
if l <> nil then
FChannels.UnlockList;
strmEvent.Free;
end;
2014-01-13 10:00:09 +01:00
end;
2013-11-11 21:14:42 +01:00
end;
procedure TIdWebsocketMultiReadThread.RemoveClient(
aChannel: TIdHTTPWebsocketClient);
begin
if Self = nil then Exit;
aChannel.Lock;
try
2013-11-11 21:14:42 +01:00
FChannels.Remove(aChannel);
if FReconnectlist <> nil then
FReconnectlist.Remove(aChannel);
finally
aChannel.UnLock;
end;
2013-11-11 21:14:42 +01:00
BreakSelectWait;
end;
class procedure TIdWebsocketMultiReadThread.RemoveInstance;
begin
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
2013-11-11 21:14:42 +01:00
FreeAndNil(FInstance);
end;
2013-11-11 21:14:42 +01:00
end;
procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket;
begin
Assert(FPendingBreak);
FPendingBreak := False;
IdWinsock2.closesocket(FTempHandle);
2013-11-18 14:27:13 +01:00
FTempHandle := 0;
2013-11-11 21:14:42 +01:00
InitSpecialEventSocket;
end;
procedure TIdWebsocketMultiReadThread.Terminate;
begin
inherited Terminate;
FChannels.LockList;
try
//fire a signal, so the "select" wait will quit and thread can stop
BreakSelectWait;
finally
FChannels.UnlockList;
end;
end;
{ TIdWebsocketDispatchThread }
class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread;
begin
if FInstance = nil then
begin
2013-11-18 14:27:13 +01:00
GlobalNameSpace.BeginWrite;
try
if FInstance = nil then
begin
FInstance := Self.Create(True);
2013-11-18 14:27:13 +01:00
FInstance.Start;
end;
finally
GlobalNameSpace.EndWrite;
end;
2013-11-11 21:14:42 +01:00
end;
Result := FInstance;
end;
class procedure TIdWebsocketDispatchThread.RemoveInstance;
2013-11-11 21:14:42 +01:00
begin
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
FreeAndNil(FInstance);
2013-11-11 21:14:42 +01:00
end;
end;
{ TWSThreadList }
function TWSThreadList.Count: Integer;
var l: TList;
begin
l := LockList;
try
Result := l.Count;
finally
UnlockList;
end;
end;
2013-11-11 21:14:42 +01:00
initialization
finalization
TIdWebsocketMultiReadThread.RemoveInstance;
TIdWebsocketDispatchThread.RemoveInstance
2013-11-11 21:14:42 +01:00
end.