cdffdd25e1
Defines are defined in wsdefines.pas Removing SUPEROBJECT allow to release under MPL license (which i expect) Also fix * bug : framing encoding when sending a frame in multiple parts (fin=false) * bug : TIdIOHandlerWebsocket TIdIOHandlerWebsocket.ReadFrame _WaitByte ; may hang Other changes * Refactoring of TIdServerWebsocketHandling.ProcessServerCommandGet for inheritance * Add event (TIdServerWSContext) to accept or refuse upgrade (allow to check session cookie) * Change TWebsocketChannelRequest var aType:TWSDataType to allow receiving in a mode and answering in an other To use OpenSSL you need a modification in IdSSLOpenSSL to let overwrite TIdSSLIOHandlerSocketOpenSSL class
1681 lines
45 KiB
ObjectPascal
1681 lines
45 KiB
ObjectPascal
unit IdHTTPWebsocketClient;
|
|
interface
|
|
{$I wsdefines.pas}
|
|
uses
|
|
Classes,
|
|
IdHTTP,
|
|
Types,
|
|
IdHashSHA, //XE3 etc
|
|
IdIOHandler,
|
|
IdIOHandlerWebsocket,
|
|
IdWinsock2, Generics.Collections, SyncObjs,
|
|
IdSocketIOHandling;
|
|
|
|
type
|
|
TWebsocketMsgBin = procedure(const aData: TStream) of object;
|
|
TWebsocketMsgText = procedure(const aData: string) of object;
|
|
|
|
TIdHTTPWebsocketClient = class;
|
|
TSocketIOMsg = procedure(const AClient: TIdHTTPWebsocketClient; const aText: string; aMsgNr: Integer) of object;
|
|
|
|
TIdSocketIOHandling_Ext = class(TIdSocketIOHandling)
|
|
end;
|
|
|
|
TIdHTTPWebsocketClient = class(TIdHTTP)
|
|
private
|
|
FWSResourceName: string;
|
|
FHash: TIdHashSHA1;
|
|
FOnData: TWebsocketMsgBin;
|
|
FOnTextData: TWebsocketMsgText;
|
|
FNoAsyncRead: Boolean;
|
|
FWriteTimeout: Integer;
|
|
function GetIOHandlerWS: TIdIOHandlerWebsocket;
|
|
procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket);
|
|
procedure SetOnData(const Value: TWebsocketMsgBin);
|
|
procedure SetOnTextData(const Value: TWebsocketMsgText);
|
|
procedure SetWriteTimeout(const Value: Integer);
|
|
protected
|
|
FSocketIOCompatible: Boolean;
|
|
FSocketIOHandshakeResponse: string;
|
|
FSocketIO: TIdSocketIOHandling_Ext;
|
|
FSocketIOContext: ISocketIOContext;
|
|
FSocketIOConnectBusy: Boolean;
|
|
|
|
//FHeartBeat: TTimer;
|
|
//procedure HeartBeatTimer(Sender: TObject);
|
|
function GetSocketIO: TIdSocketIOHandling;
|
|
protected
|
|
procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);virtual;
|
|
function MakeImplicitClientHandler: TIdIOHandler; override;
|
|
public
|
|
procedure AsyncDispatchEvent(const aEvent: TStream); overload; virtual;
|
|
procedure AsyncDispatchEvent(const aEvent: string); overload; virtual;
|
|
procedure ResetChannel;
|
|
public
|
|
procedure AfterConstruction; override;
|
|
destructor Destroy; override;
|
|
|
|
function TryUpgradeToWebsocket: Boolean;
|
|
procedure UpgradeToWebsocket;
|
|
|
|
function TryLock: Boolean;
|
|
procedure Lock;
|
|
procedure UnLock;
|
|
|
|
procedure Connect; override;
|
|
procedure ConnectAsync; virtual;
|
|
function TryConnect: Boolean;
|
|
procedure Disconnect(ANotifyPeer: Boolean); override;
|
|
|
|
function CheckConnection: Boolean;
|
|
procedure Ping;
|
|
procedure ReadAndProcessData;
|
|
|
|
property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS;
|
|
|
|
//websockets
|
|
property OnBinData : TWebsocketMsgBin read FOnData write SetOnData;
|
|
property OnTextData: TWebsocketMsgText read FOnTextData write SetOnTextData;
|
|
|
|
property NoAsyncRead: Boolean read FNoAsyncRead write FNoAsyncRead;
|
|
|
|
//https://github.com/LearnBoost/socket.io-spec
|
|
property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible;
|
|
property SocketIO: TIdSocketIOHandling read GetSocketIO;
|
|
published
|
|
property Host;
|
|
property Port;
|
|
property WSResourceName: string read FWSResourceName write FWSResourceName;
|
|
|
|
property WriteTimeout: Integer read FWriteTimeout write SetWriteTimeout default 2000;
|
|
end;
|
|
|
|
// on error
|
|
(*
|
|
TIdHTTPSocketIOClient_old = class(TIdHTTPWebsocketClient)
|
|
private
|
|
FOnConnected: TNotifyEvent;
|
|
FOnDisConnected: TNotifyEvent;
|
|
FOnSocketIOMsg: TSocketIOMsg;
|
|
FOnSocketIOEvent: TSocketIOMsg;
|
|
FOnSocketIOJson: TSocketIOMsg;
|
|
protected
|
|
FHeartBeat: TTimer;
|
|
procedure HeartBeatTimer(Sender: TObject);
|
|
|
|
procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);override;
|
|
public
|
|
procedure AsyncDispatchEvent(const aEvent: string); override;
|
|
public
|
|
procedure AfterConstruction; override;
|
|
destructor Destroy; override;
|
|
|
|
procedure AutoConnect;
|
|
|
|
property SocketIOHandshakeResponse: string read FSocketIOHandshakeResponse;
|
|
property OnConnected: TNotifyEvent read FOnConnected write FOnConnected;
|
|
property OnDisConnected: TNotifyEvent read FOnDisConnected write FOnDisConnected;
|
|
|
|
// procedure ProcessSocketIORequest(const strmRequest: TStream);
|
|
property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg;
|
|
property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson;
|
|
property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent;
|
|
end;
|
|
*)
|
|
|
|
TWSThreadList = class(TThreadList)
|
|
public
|
|
function Count: Integer;
|
|
end;
|
|
|
|
TIdWebsocketMultiReadThread = class(TThread)
|
|
private
|
|
class var FInstance: TIdWebsocketMultiReadThread;
|
|
protected
|
|
FReadTimeout: Integer;
|
|
FTempHandle: THandle;
|
|
FPendingBreak: Boolean;
|
|
Freadset, Fexceptionset: TFDSet;
|
|
Finterval: TTimeVal;
|
|
procedure InitSpecialEventSocket;
|
|
procedure ResetSpecialEventSocket;
|
|
procedure BreakSelectWait;
|
|
protected
|
|
FChannels: TThreadList;
|
|
FReconnectlist: TWSThreadList;
|
|
FReconnectThread: TIdWebsocketQueueThread;
|
|
procedure ReadFromAllChannels;
|
|
procedure PingAllChannels;
|
|
|
|
procedure Execute; override;
|
|
public
|
|
procedure AfterConstruction;override;
|
|
destructor Destroy; override;
|
|
|
|
procedure Terminate;
|
|
|
|
procedure AddClient (aChannel: TIdHTTPWebsocketClient);
|
|
procedure RemoveClient(aChannel: TIdHTTPWebsocketClient);
|
|
|
|
property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000;
|
|
|
|
class function Instance: TIdWebsocketMultiReadThread;
|
|
class procedure RemoveInstance(aForced: boolean = false);
|
|
end;
|
|
|
|
//async process data
|
|
TIdWebsocketDispatchThread = class(TIdWebsocketQueueThread)
|
|
private
|
|
class var FInstance: TIdWebsocketDispatchThread;
|
|
public
|
|
class function Instance: TIdWebsocketDispatchThread;
|
|
class procedure RemoveInstance(aForced: boolean = false);
|
|
end;
|
|
|
|
implementation
|
|
|
|
uses
|
|
IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack,
|
|
IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils;
|
|
|
|
var
|
|
GUnitFinalized: Boolean = false;
|
|
|
|
//type
|
|
// TAnonymousThread = class(TThread)
|
|
// protected
|
|
// FThreadProc: TThreadProcedure;
|
|
// procedure Execute; override;
|
|
// public
|
|
// constructor Create(AThreadProc: TThreadProcedure);
|
|
// end;
|
|
|
|
//procedure CreateAnonymousThread(AThreadProc: TThreadProcedure);
|
|
//begin
|
|
// TAnonymousThread.Create(AThreadProc);
|
|
//end;
|
|
|
|
{ TAnonymousThread }
|
|
|
|
//constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure);
|
|
//begin
|
|
// FThreadProc := AThreadProc;
|
|
// FreeOnTerminate := True;
|
|
// inherited Create(False {direct start});
|
|
//end;
|
|
//
|
|
//procedure TAnonymousThread.Execute;
|
|
//begin
|
|
// if Assigned(FThreadProc) then
|
|
// FThreadProc();
|
|
//end;
|
|
|
|
{ TIdHTTPWebsocketClient }
|
|
|
|
procedure TIdHTTPWebsocketClient.AfterConstruction;
|
|
begin
|
|
inherited;
|
|
FHash := TIdHashSHA1.Create;
|
|
|
|
IOHandler := TIdIOHandlerWebsocket.Create(nil);
|
|
IOHandler.UseNagle := False;
|
|
ManagedIOHandler := True;
|
|
|
|
FSocketIO := TIdSocketIOHandling_Ext.Create;
|
|
// FHeartBeat := TTimer.Create(nil);
|
|
// FHeartBeat.Enabled := False;
|
|
// FHeartBeat.OnTimer := HeartBeatTimer;
|
|
|
|
FWriteTimeout := 2 * 1000;
|
|
ConnectTimeout := 2000;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream);
|
|
var
|
|
strmevent: TMemoryStream;
|
|
begin
|
|
if not Assigned(OnBinData) then Exit;
|
|
|
|
strmevent := TMemoryStream.Create;
|
|
strmevent.CopyFrom(aEvent, aEvent.Size);
|
|
|
|
//events during dispatch? channel is busy so offload event dispatching to different thread!
|
|
TIdWebsocketDispatchThread.Instance.QueueEvent(
|
|
procedure
|
|
begin
|
|
if Assigned(OnBinData) then
|
|
OnBinData(strmevent);
|
|
strmevent.Free;
|
|
end);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: string);
|
|
begin
|
|
{$IFDEF DEBUG_WS}
|
|
if DebugHook <> 0 then
|
|
OutputDebugString(PChar('AsyncDispatchEvent: ' + aEvent) );
|
|
{$ENDIF}
|
|
|
|
//if not Assigned(OnTextData) then Exit;
|
|
//events during dispatch? channel is busy so offload event dispatching to different thread!
|
|
TIdWebsocketDispatchThread.Instance.QueueEvent(
|
|
procedure
|
|
begin
|
|
if FSocketIOCompatible then
|
|
FSocketIO.ProcessSocketIORequest(FSocketIOContext as TSocketIOContext, aEvent)
|
|
else if Assigned(OnTextData) then
|
|
OnTextData(aEvent);
|
|
end);
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.CheckConnection: Boolean;
|
|
begin
|
|
Result := False;
|
|
try
|
|
if (IOHandler <> nil) and
|
|
not IOHandler.ClosedGracefully and
|
|
IOHandler.Connected then
|
|
begin
|
|
IOHandler.CheckForDisconnect(True{error}, True{ignore buffer, check real connection});
|
|
Result := True; //ok if we reach here
|
|
end;
|
|
except
|
|
on E:Exception do
|
|
begin
|
|
//clear inputbuffer, otherwise it stays connected :(
|
|
// if (IOHandler <> nil) then
|
|
// IOHandler.Clear;
|
|
Disconnect(False);
|
|
if Assigned(OnDisConnected) then
|
|
OnDisConnected(Self);
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.Connect;
|
|
begin
|
|
Lock;
|
|
try
|
|
if Connected then
|
|
begin
|
|
TryUpgradeToWebsocket;
|
|
Exit;
|
|
end;
|
|
|
|
//FHeartBeat.Enabled := True;
|
|
if SocketIOCompatible and
|
|
not FSocketIOConnectBusy then
|
|
begin
|
|
//FSocketIOConnectBusy := True;
|
|
//try
|
|
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
|
|
//finally
|
|
// FSocketIOConnectBusy := False;
|
|
//end;
|
|
end
|
|
else
|
|
begin
|
|
//clear inputbuffer, otherwise it can't connect :(
|
|
if (IOHandler <> nil) then IOHandler.Clear;
|
|
inherited Connect;
|
|
end;
|
|
finally
|
|
UnLock;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.ConnectAsync;
|
|
begin
|
|
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
|
end;
|
|
|
|
destructor TIdHTTPWebsocketClient.Destroy;
|
|
//var tmr: TObject;
|
|
begin
|
|
// tmr := FHeartBeat;
|
|
// FHeartBeat := nil;
|
|
// TThread.Queue(nil, //otherwise free in other thread than created
|
|
// procedure
|
|
// begin
|
|
//FHeartBeat.Free;
|
|
// tmr.Free;
|
|
// end);
|
|
|
|
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
|
FSocketIO.Free;
|
|
FHash.Free;
|
|
inherited;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.DisConnect(ANotifyPeer: Boolean);
|
|
begin
|
|
if not SocketIOCompatible and
|
|
( (IOHandler <> nil) and not IOHandler.IsWebsocket)
|
|
then
|
|
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
|
|
|
if ANotifyPeer and SocketIOCompatible then
|
|
FSocketIO.WriteDisConnect(FSocketIOContext as TSocketIOContext)
|
|
else
|
|
FSocketIO.FreeConnection(FSocketIOContext as TSocketIOContext);
|
|
|
|
// IInterface(FSocketIOContext)._Release;
|
|
FSocketIOContext := nil;
|
|
|
|
Lock;
|
|
try
|
|
if IOHandler <> nil then
|
|
begin
|
|
IOHandler.Lock;
|
|
try
|
|
IOHandler.IsWebsocket := False;
|
|
|
|
inherited DisConnect(ANotifyPeer);
|
|
//clear buffer, other still "connected"
|
|
IOHandler.Clear;
|
|
|
|
//IOHandler.Free;
|
|
//IOHandler := TIdIOHandlerWebsocket.Create(nil);
|
|
finally
|
|
IOHandler.Unlock;
|
|
end;
|
|
end;
|
|
finally
|
|
UnLock;
|
|
end;
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.GetIOHandlerWS: TIdIOHandlerWebsocket;
|
|
begin
|
|
// if inherited IOHandler is TIdIOHandlerWebsocket then
|
|
Result := inherited IOHandler as TIdIOHandlerWebsocket
|
|
// else
|
|
// Assert(False);
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.GetSocketIO: TIdSocketIOHandling;
|
|
begin
|
|
Result := FSocketIO;
|
|
end;
|
|
|
|
(*
|
|
procedure TIdHTTPWebsocketClient.HeartBeatTimer(Sender: TObject);
|
|
begin
|
|
FHeartBeat.Enabled := False;
|
|
FSocketIO.Lock;
|
|
try
|
|
try
|
|
if (IOHandler <> nil) and
|
|
not IOHandler.ClosedGracefully and
|
|
IOHandler.Connected and
|
|
(FSocketIOContext <> nil) then
|
|
begin
|
|
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
|
|
end
|
|
//retry re-connect
|
|
else
|
|
try
|
|
//clear inputbuffer, otherwise it can't connect :(
|
|
if (IOHandler <> nil) then
|
|
IOHandler.Clear;
|
|
|
|
Self.ConnectTimeout := 100; //100ms otherwise GUI hangs too much -> todo: do it in background thread!
|
|
if not Connected then
|
|
Self.Connect;
|
|
TryUpgradeToWebsocket;
|
|
except
|
|
//skip, just retried
|
|
end;
|
|
except on E:Exception do
|
|
begin
|
|
//clear inputbuffer, otherwise it stays connected :(
|
|
if (IOHandler <> nil) then
|
|
IOHandler.Clear;
|
|
Disconnect(False);
|
|
|
|
if Assigned(OnDisConnected) then
|
|
OnDisConnected(Self);
|
|
try
|
|
raise EIdException.Create('Connection lost from ' +
|
|
Format('ws://%s:%d/%s', [Host, Port, WSResourceName]) +
|
|
' - Error: ' + e.Message);
|
|
except
|
|
//eat, no error popup!
|
|
end;
|
|
end;
|
|
end;
|
|
finally
|
|
FSocketIO.UnLock;
|
|
FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect
|
|
end;
|
|
end;
|
|
*)
|
|
|
|
function TIdHTTPWebsocketClient.TryConnect: Boolean;
|
|
begin
|
|
Lock;
|
|
try
|
|
try
|
|
if Connected then Exit(True);
|
|
|
|
Connect;
|
|
Result := Connected;
|
|
//if Result then
|
|
// Result := TryUpgradeToWebsocket already done in connect
|
|
except
|
|
Result := False;
|
|
end
|
|
finally
|
|
UnLock;
|
|
end;
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.TryLock: Boolean;
|
|
begin
|
|
Result := System.TMonitor.TryEnter(Self);
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean;
|
|
var
|
|
sError: string;
|
|
begin
|
|
try
|
|
FSocketIOConnectBusy := True;
|
|
Lock;
|
|
try
|
|
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
|
|
|
|
InternalUpgradeToWebsocket(False{no raise}, sError);
|
|
Result := (sError = '');
|
|
finally
|
|
FSocketIOConnectBusy := False;
|
|
UnLock;
|
|
end;
|
|
except
|
|
Result := False;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.UnLock;
|
|
begin
|
|
System.TMonitor.Exit(Self);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.UpgradeToWebsocket;
|
|
var
|
|
sError: string;
|
|
begin
|
|
Lock;
|
|
try
|
|
if IOHandler = nil then
|
|
Connect
|
|
else if not IOHandler.IsWebsocket then
|
|
InternalUpgradeToWebsocket(True{raise}, sError);
|
|
finally
|
|
UnLock;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);
|
|
var
|
|
sURL: string;
|
|
strmResponse: TMemoryStream;
|
|
i: Integer;
|
|
sKey, sResponseKey: string;
|
|
sSocketioextended: string;
|
|
bLocked: boolean;
|
|
begin
|
|
Assert((IOHandler = nil) or not IOHandler.IsWebsocket);
|
|
//remove from thread during connection handling
|
|
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
|
|
|
bLocked := False;
|
|
strmResponse := TMemoryStream.Create;
|
|
Self.Lock;
|
|
try
|
|
//reset pending data
|
|
if IOHandler <> nil then
|
|
begin
|
|
IOHandler.Lock;
|
|
bLocked := True;
|
|
if IOHandler.IsWebsocket then Exit;
|
|
IOHandler.Clear;
|
|
end;
|
|
|
|
//special socket.io handling, see https://github.com/LearnBoost/socket.io-spec
|
|
if SocketIOCompatible then
|
|
begin
|
|
Request.Clear;
|
|
Request.Connection := 'keep-alive';
|
|
{$IFDEF WEBSOCKETSSL}
|
|
sURL := Format('https://%s:%d/socket.io/1/', [Host, Port]);
|
|
{$ELSE}
|
|
sURL := Format('http://%s:%d/socket.io/1/', [Host, Port]);
|
|
{$ENDIF}
|
|
strmResponse.Clear;
|
|
|
|
ReadTimeout := 5 * 1000;
|
|
//get initial handshake
|
|
Post(sURL, strmResponse, strmResponse);
|
|
if ResponseCode = 200 {OK} then
|
|
begin
|
|
//if not Connected then //reconnect
|
|
// Self.Connect;
|
|
strmResponse.Position := 0;
|
|
//The body of the response should contain the session id (sid) given to the client,
|
|
//followed by the heartbeat timeout, the connection closing timeout, and the list of supported transports separated by :
|
|
//4d4f185e96a7b:15:10:websocket,xhr-polling
|
|
with TStreamReader.Create(strmResponse) do
|
|
try
|
|
FSocketIOHandshakeResponse := ReadToEnd;
|
|
finally
|
|
Free;
|
|
end;
|
|
sKey := Copy(FSocketIOHandshakeResponse, 1, Pos(':', FSocketIOHandshakeResponse)-1);
|
|
sSocketioextended := 'socket.io/1/websocket/' + sKey;
|
|
WSResourceName := sSocketioextended;
|
|
end
|
|
else
|
|
begin
|
|
aFailedReason := Format('Initial socket.io handshake failed: "%d: %s"',[ResponseCode, ResponseText]);
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason);
|
|
end;
|
|
end;
|
|
|
|
Request.Clear;
|
|
Request.CustomHeaders.Clear;
|
|
strmResponse.Clear;
|
|
//http://www.websocket.org/aboutwebsocket.html
|
|
(* GET ws://echo.websocket.org/?encoding=text HTTP/1.1
|
|
Origin: http://websocket.org
|
|
Cookie: __utma=99as
|
|
Connection: Upgrade
|
|
Host: echo.websocket.org
|
|
Sec-WebSocket-Key: uRovscZjNol/umbTt5uKmw==
|
|
Upgrade: websocket
|
|
Sec-WebSocket-Version: 13 *)
|
|
|
|
//Connection: Upgrade
|
|
Request.Connection := 'Upgrade';
|
|
//Upgrade: websocket
|
|
Request.CustomHeaders.Add('Upgrade:websocket');
|
|
|
|
//Sec-WebSocket-Key
|
|
sKey := '';
|
|
for i := 1 to 16 do
|
|
sKey := sKey + Char(Random(127-32) + 32);
|
|
//base64 encoded
|
|
sKey := TIdEncoderMIME.EncodeString(sKey);
|
|
Request.CustomHeaders.AddValue('Sec-WebSocket-Key', sKey);
|
|
//Sec-WebSocket-Version: 13
|
|
Request.CustomHeaders.AddValue('Sec-WebSocket-Version', '13');
|
|
Request.CustomHeaders.AddValue('Sec-WebSocket-Extensions', '');
|
|
|
|
Request.CacheControl := 'no-cache';
|
|
Request.Pragma := 'no-cache';
|
|
Request.Host := Format('Host:%s:%d',[Host,Port]);
|
|
Request.CustomHeaders.AddValue('Origin', Format('http://%s:%d',[Host,Port]) );
|
|
//ws://host:port/<resourcename>
|
|
//about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs"
|
|
//sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
|
|
sURL := Format('http://%s:%d/%s', [Host, Port, WSResourceName]);
|
|
ReadTimeout := Max(5 * 1000, ReadTimeout);
|
|
|
|
{ voorbeeld:
|
|
GET http://localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1
|
|
Upgrade: websocket
|
|
Connection: Upgrade
|
|
Host: localhost:9222
|
|
Origin: http://localhost:9222
|
|
Pragma: no-cache
|
|
Cache-Control: no-cache
|
|
Sec-WebSocket-Key: HIqoAdZkxnWWH9dnVPyW7w==
|
|
Sec-WebSocket-Version: 13
|
|
Sec-WebSocket-Extensions: x-webkit-deflate-frame
|
|
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36
|
|
Cookie: __utma=1.2040118404.1366961318.1366961318.1366961318.1; __utmc=1; __utmz=1.1366961318.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); deviceorder=0123456789101112; MultiTouchEnabled=false; device=3; network_type=0
|
|
}
|
|
if SocketIOCompatible then
|
|
begin
|
|
//1st, try to do socketio specific connection
|
|
Response.Clear;
|
|
Response.ResponseCode := 0;
|
|
Request.URL := sURL;
|
|
Request.Method := Id_HTTPMethodGet;
|
|
Request.Source := nil;
|
|
Response.ContentStream := strmResponse;
|
|
PrepareRequest(Request);
|
|
|
|
//connect and upgrade
|
|
ConnectToHost(Request, Response);
|
|
|
|
//check upgrade succesfull
|
|
CheckForGracefulDisconnect(True);
|
|
CheckConnected;
|
|
Assert(Self.Connected);
|
|
|
|
if Response.ResponseCode = 0 then
|
|
Response.ResponseText := Response.ResponseText
|
|
else if Response.ResponseCode <> 200{ok} then
|
|
begin
|
|
aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]);
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason)
|
|
else
|
|
Exit;
|
|
end;
|
|
|
|
//2nd, get websocket response
|
|
Response.Clear;
|
|
if IOHandler.CheckForDataOnSource(ReadTimeout) then
|
|
begin
|
|
Self.FHTTPProto.RetrieveHeaders(MaxHeaderLines);
|
|
//Response.RawHeaders.Text := IOHandler.InputBufferAsString();
|
|
Response.ResponseText := Response.RawHeaders.Text;
|
|
end;
|
|
end
|
|
else
|
|
begin
|
|
Get(sURL, strmResponse, [101]);
|
|
end;
|
|
|
|
//http://www.websocket.org/aboutwebsocket.html
|
|
(* HTTP/1.1 101 WebSocket Protocol Handshake
|
|
Date: Fri, 10 Feb 2012 17:38:18 GMT
|
|
Connection: Upgrade
|
|
Server: Kaazing Gateway
|
|
Upgrade: WebSocket
|
|
Access-Control-Allow-Origin: http://websocket.org
|
|
Access-Control-Allow-Credentials: true
|
|
Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU=
|
|
Access-Control-Allow-Headers: content-type *)
|
|
|
|
//'HTTP/1.1 101 Switching Protocols'
|
|
if Response.ResponseCode <> 101 then
|
|
begin
|
|
aFailedReason := Format('Error while upgrading: "%d: %s"',[Response.ResponseCode, Response.ResponseText]);
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason)
|
|
else
|
|
Exit;
|
|
end;
|
|
//connection: upgrade
|
|
if not SameText(Response.Connection, 'upgrade') then
|
|
begin
|
|
aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]);
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason)
|
|
else
|
|
Exit;
|
|
end;
|
|
//upgrade: websocket
|
|
if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then
|
|
begin
|
|
aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]);
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason)
|
|
else
|
|
Exit;
|
|
end;
|
|
//check handshake key
|
|
sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace"
|
|
'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID
|
|
sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64
|
|
FHash.HashString(sResponseKey) ); //SHA1
|
|
if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then
|
|
begin
|
|
aFailedReason := 'Invalid key handshake';
|
|
if aRaiseException then
|
|
raise EIdWebSocketHandleError.Create(aFailedReason)
|
|
else
|
|
Exit;
|
|
end;
|
|
|
|
//upgrade succesful
|
|
IOHandler.IsWebsocket := True;
|
|
aFailedReason := '';
|
|
Assert(Connected);
|
|
|
|
if SocketIOCompatible then
|
|
begin
|
|
FSocketIOContext := TSocketIOContext.Create(Self);
|
|
(FSocketIOContext as TSocketIOContext).ConnectSend := True; //connect already send via url? GET /socket.io/1/websocket/9elrbEFqiimV29QAM6T-
|
|
FSocketIO.WriteConnect(FSocketIOContext as TSocketIOContext);
|
|
end;
|
|
|
|
//always read the data! (e.g. RO use override of AsyncDispatchEvent to process data)
|
|
//if Assigned(OnBinData) or Assigned(OnTextData) then
|
|
finally
|
|
Request.Clear;
|
|
Request.CustomHeaders.Clear;
|
|
strmResponse.Free;
|
|
|
|
if bLocked and (IOHandler <> nil) then
|
|
IOHandler.Unlock;
|
|
Unlock;
|
|
|
|
//add to thread for auto retry/reconnect
|
|
if not Self.NoAsyncRead then
|
|
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
|
end;
|
|
|
|
//default 2s write timeout
|
|
//http://msdn.microsoft.com/en-us/library/windows/desktop/ms740532(v=vs.85).aspx
|
|
if Connected then
|
|
Self.IOHandler.Binding.SetSockOpt(SOL_SOCKET, SO_SNDTIMEO, Self.WriteTimeout);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.Lock;
|
|
begin
|
|
System.TMonitor.Enter(Self);
|
|
end;
|
|
|
|
function TIdHTTPWebsocketClient.MakeImplicitClientHandler: TIdIOHandler;
|
|
begin
|
|
Result := TIdIOHandlerWebsocket.Create(nil);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.Ping;
|
|
var
|
|
ws: TIdIOHandlerWebsocket;
|
|
begin
|
|
if TryLock then
|
|
try
|
|
ws := IOHandler as TIdIOHandlerWebsocket;
|
|
ws.LastPingTime := Now;
|
|
|
|
//socket.io?
|
|
if SocketIOCompatible and ws.IsWebsocket then
|
|
begin
|
|
FSocketIO.Lock;
|
|
try
|
|
if (FSocketIOContext <> nil) then
|
|
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
|
|
finally
|
|
FSocketIO.UnLock;
|
|
end
|
|
end
|
|
//only websocket?
|
|
else if not SocketIOCompatible and ws.IsWebsocket then
|
|
begin
|
|
if ws.TryLock then
|
|
try
|
|
ws.WriteData(nil, wdcPing);
|
|
finally
|
|
ws.Unlock;
|
|
end;
|
|
end;
|
|
finally
|
|
Unlock;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.ReadAndProcessData;
|
|
var
|
|
strmEvent: TMemoryStream;
|
|
swstext: utf8string;
|
|
wscode: TWSDataCode;
|
|
begin
|
|
strmEvent := nil;
|
|
IOHandler.Lock;
|
|
try
|
|
//try to process all events
|
|
while IOHandler.HasData or
|
|
(IOHandler.Connected and
|
|
IOHandler.Readable(0)) do //has some data
|
|
begin
|
|
if strmEvent = nil then
|
|
strmEvent := TMemoryStream.Create;
|
|
strmEvent.Clear;
|
|
|
|
//first is the data type TWSDataType(text or bin), but is ignore/not needed
|
|
wscode := TWSDataCode(IOHandler.ReadLongWord);
|
|
if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then
|
|
begin
|
|
//Sleep(0);
|
|
Continue;
|
|
end;
|
|
|
|
//next the size + data = stream
|
|
IOHandler.ReadStream(strmEvent);
|
|
|
|
//ignore ping/pong messages
|
|
if wscode in [wdcPing, wdcPong] then Continue;
|
|
|
|
//fire event
|
|
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
|
|
strmEvent.Position := 0;
|
|
if wscode = wdcBinary then
|
|
begin
|
|
AsyncDispatchEvent(strmEvent);
|
|
end
|
|
else if wscode = wdcText then
|
|
begin
|
|
SetLength(swstext, strmEvent.Size);
|
|
strmEvent.Read(swstext[1], strmEvent.Size);
|
|
if swstext <> '' then
|
|
begin
|
|
AsyncDispatchEvent(string(swstext));
|
|
end;
|
|
end;
|
|
end;
|
|
finally
|
|
IOHandler.Unlock;
|
|
strmEvent.Free;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.ResetChannel;
|
|
//var
|
|
// ws: TIdIOHandlerWebsocket;
|
|
begin
|
|
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); keep for reconnect
|
|
|
|
if IOHandler <> nil then
|
|
begin
|
|
IOHandler.InputBuffer.Clear;
|
|
IOHandler.BusyUpgrading := False;
|
|
IOHandler.IsWebsocket := False;
|
|
//close/disconnect internal socket
|
|
//ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
|
|
//ws.Close; done in disconnect below
|
|
end;
|
|
Disconnect(False);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.SetIOHandlerWS(
|
|
const Value: TIdIOHandlerWebsocket);
|
|
begin
|
|
SetIOHandler(Value);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.SetOnData(const Value: TWebsocketMsgBin);
|
|
begin
|
|
// if not Assigned(Value) and not Assigned(FOnTextData) then
|
|
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
|
|
|
FOnData := Value;
|
|
|
|
// if Assigned(Value) and
|
|
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
|
|
// then
|
|
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.SetOnTextData(const Value: TWebsocketMsgText);
|
|
begin
|
|
// if not Assigned(Value) and not Assigned(FOnData) then
|
|
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
|
|
|
FOnTextData := Value;
|
|
|
|
// if Assigned(Value) and
|
|
// (Self.IOHandler as TIdIOHandlerWebsocket).IsWebsocket
|
|
// then
|
|
// TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
|
end;
|
|
|
|
procedure TIdHTTPWebsocketClient.SetWriteTimeout(const Value: Integer);
|
|
begin
|
|
FWriteTimeout := Value;
|
|
if Connected then
|
|
Self.IOHandler.Binding.SetSockOpt(SOL_SOCKET, SO_SNDTIMEO, Self.WriteTimeout);
|
|
end;
|
|
|
|
{ TIdHTTPSocketIOClient }
|
|
|
|
(*
|
|
procedure TIdHTTPSocketIOClient_old.AfterConstruction;
|
|
begin
|
|
inherited;
|
|
SocketIOCompatible := True;
|
|
|
|
FHeartBeat := TTimer.Create(nil);
|
|
FHeartBeat.Enabled := False;
|
|
FHeartBeat.OnTimer := HeartBeatTimer;
|
|
end;
|
|
|
|
procedure TIdHTTPSocketIOClient_old.AsyncDispatchEvent(const aEvent: string);
|
|
begin
|
|
//https://github.com/LearnBoost/socket.io-spec
|
|
if StartsStr('1:', aEvent) then //connect
|
|
Exit;
|
|
if aEvent = '2::' then //ping, heartbeat
|
|
Exit;
|
|
inherited AsyncDispatchEvent(aEvent);
|
|
end;
|
|
|
|
procedure TIdHTTPSocketIOClient_old.AutoConnect;
|
|
begin
|
|
//for now: timer in mainthread?
|
|
TThread.Queue(nil,
|
|
procedure
|
|
begin
|
|
FHeartBeat.Interval := 5 * 1000;
|
|
FHeartBeat.Enabled := True;
|
|
end);
|
|
end;
|
|
|
|
destructor TIdHTTPSocketIOClient_old.Destroy;
|
|
var tmr: TObject;
|
|
begin
|
|
tmr := FHeartBeat;
|
|
TThread.Queue(nil, //otherwise free in other thread than created
|
|
procedure
|
|
begin
|
|
//FHeartBeat.Free;
|
|
tmr.Free;
|
|
end);
|
|
inherited;
|
|
end;
|
|
|
|
procedure TIdHTTPSocketIOClient_old.HeartBeatTimer(Sender: TObject);
|
|
begin
|
|
FHeartBeat.Enabled := False;
|
|
try
|
|
try
|
|
if (IOHandler <> nil) and
|
|
not IOHandler.ClosedGracefully and
|
|
IOHandler.Connected then
|
|
begin
|
|
IOHandler.Write('2:::'); //heartbeat socket.io message
|
|
end
|
|
//retry connect
|
|
else
|
|
try
|
|
//clear inputbuffer, otherwise it can't connect :(
|
|
if (IOHandler <> nil) and
|
|
not IOHandler.InputBufferIsEmpty
|
|
then
|
|
IOHandler.DiscardAll;
|
|
|
|
Self.Connect;
|
|
TryUpgradeToWebsocket;
|
|
except
|
|
//skip, just retried
|
|
end;
|
|
except
|
|
//clear inputbuffer, otherwise it stays connected :(
|
|
if (IOHandler <> nil) and
|
|
not IOHandler.InputBufferIsEmpty
|
|
then
|
|
IOHandler.DiscardAll;
|
|
|
|
if Assigned(OnDisConnected) then
|
|
OnDisConnected(Self);
|
|
try
|
|
raise EIdException.Create('Connection lost from ' + Format('ws://%s:%d/%s', [Host, Port, WSResourceName]));
|
|
except
|
|
//eat, no error popup!
|
|
end;
|
|
end;
|
|
finally
|
|
FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect
|
|
end;
|
|
end;
|
|
|
|
procedure TIdHTTPSocketIOClient_old.InternalUpgradeToWebsocket(
|
|
aRaiseException: Boolean; out aFailedReason: string);
|
|
var
|
|
stimeout: string;
|
|
begin
|
|
inherited InternalUpgradeToWebsocket(aRaiseException, aFailedReason);
|
|
|
|
if (aFailedReason = '') and
|
|
(IOHandler as TIdIOHandlerWebsocket).IsWebsocket then
|
|
begin
|
|
stimeout := Copy(SocketIOHandshakeResponse, Pos(':', SocketIOHandshakeResponse)+1, Length(SocketIOHandshakeResponse));
|
|
stimeout := Copy(stimeout, 1, Pos(':', stimeout)-1);
|
|
if stimeout <> '' then
|
|
begin
|
|
//if (FHeartBeat.Interval > 0) then
|
|
//for now: timer in mainthread?
|
|
TThread.Queue(nil,
|
|
procedure
|
|
begin
|
|
FHeartBeat.Interval := StrToIntDef(stimeout, 15) * 1000;
|
|
if FHeartBeat.Interval >= 15000 then
|
|
//FHeartBeat.Interval := FHeartBeat.Interval - 5000
|
|
FHeartBeat.Interval := 5000
|
|
else if FHeartBeat.Interval >= 5000 then
|
|
FHeartBeat.Interval := FHeartBeat.Interval - 2000;
|
|
|
|
FHeartBeat.Enabled := (FHeartBeat.Interval > 0);
|
|
end);
|
|
end;
|
|
|
|
if Assigned(OnConnected) then
|
|
OnConnected(Self);
|
|
end;
|
|
end;
|
|
|
|
)
|
|
procedure TIdHTTPSocketIOClient_old.ProcessSocketIORequest(
|
|
const strmRequest: TStream);
|
|
|
|
function __ReadToEnd: string;
|
|
var
|
|
utf8: TBytes;
|
|
ilength: Integer;
|
|
begin
|
|
Result := '';
|
|
ilength := strmRequest.Size - strmRequest.Position;
|
|
SetLength(utf8, ilength);
|
|
strmRequest.Read(utf8[0], ilength);
|
|
Result := TEncoding.UTF8.GetString(utf8);
|
|
end;
|
|
|
|
function __GetSocketIOPart(const aData: string; aIndex: Integer): string;
|
|
var ipos: Integer;
|
|
i: Integer;
|
|
begin
|
|
//'5::/chat:{"name":"hi!"}'
|
|
//0 = 5
|
|
//1 =
|
|
//2 = /chat
|
|
//3 = {"name":"hi!"}
|
|
ipos := 0;
|
|
for i := 0 to aIndex-1 do
|
|
ipos := PosEx(':', aData, ipos+1);
|
|
if ipos >= 0 then
|
|
begin
|
|
Result := Copy(aData, ipos+1, Length(aData));
|
|
if aIndex < 3 then // /chat:{"name":"hi!"}'
|
|
begin
|
|
ipos := PosEx(':', Result, 1); // :{"name":"hi!"}'
|
|
if ipos > 0 then
|
|
Result := Copy(Result, 1, ipos-1); // /chat
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
var
|
|
str, smsg, schannel, sdata: string;
|
|
imsg: Integer;
|
|
// bCallback: Boolean;
|
|
begin
|
|
str := __ReadToEnd;
|
|
if str = '' then Exit;
|
|
|
|
//5:1+:/chat:test
|
|
smsg := __GetSocketIOPart(str, 1);
|
|
imsg := 0;
|
|
// bCallback := False;
|
|
if smsg <> '' then // 1+
|
|
begin
|
|
imsg := StrToIntDef(ReplaceStr(smsg,'+',''), 0); // 1
|
|
// bCallback := (Pos('+', smsg) > 1); //trailing +, e.g. 1+
|
|
end;
|
|
schannel := __GetSocketIOPart(str, 2); // /chat
|
|
sdata := __GetSocketIOPart(str, 3); // test
|
|
|
|
//(0) Disconnect
|
|
if StartsStr('0:', str) then
|
|
begin
|
|
schannel := __GetSocketIOPart(str, 2);
|
|
if schannel <> '' then
|
|
//todo: close channel
|
|
else
|
|
Self.Disconnect;
|
|
end
|
|
//(1) Connect
|
|
//'1::' [path] [query]
|
|
else if StartsStr('1:', str) then
|
|
begin
|
|
//todo: add channel/room to authorized channel/room list
|
|
Self.IOHandler.Write(str); //write same connect back, e.g. 1::/chat
|
|
end
|
|
//(2) Heartbeat
|
|
else if StartsStr('2:', str) then
|
|
begin
|
|
Self.IOHandler.Write(str); //write same connect back, e.g. 2::
|
|
end
|
|
//(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message)
|
|
//'3:' [message id ('+')] ':' [message endpoint] ':' [data]
|
|
//3::/chat:hi
|
|
else if StartsStr('3:', str) then
|
|
begin
|
|
if Assigned(OnSocketIOMsg) then
|
|
OnSocketIOMsg(Self, sdata, imsg);
|
|
end
|
|
//(4) JSON Message
|
|
//'4:' [message id ('+')] ':' [message endpoint] ':' [json]
|
|
//4:1::{"a":"b"}
|
|
else if StartsStr('4:', str) then
|
|
begin
|
|
if Assigned(OnSocketIOJson) then
|
|
OnSocketIOJson(Self, sdata, imsg);
|
|
end
|
|
//(5) Event
|
|
//'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event]
|
|
//5::/chat:{"name":"my other event","args":[{"my":"data"}]}
|
|
//5:1+:/chat:{"name":"GetLocations","args":[""]}
|
|
else if StartsStr('5:', str) then
|
|
begin
|
|
if Assigned(OnSocketIOEvent) then
|
|
OnSocketIOEvent(Self, sdata, imsg);
|
|
end
|
|
//(6) ACK
|
|
//6::/news:1+["callback"]
|
|
//6:::1+["Response"]
|
|
//(7) Error
|
|
//(8) Noop
|
|
else if StartsStr('8:', str) then
|
|
begin
|
|
//nothing
|
|
end
|
|
else
|
|
raise Exception.CreateFmt('Unsupported data: "%s"', [str]);
|
|
end;
|
|
*)
|
|
|
|
{ TIdWebsocketMultiReadThread }
|
|
|
|
procedure TIdWebsocketMultiReadThread.AddClient(
|
|
aChannel: TIdHTTPWebsocketClient);
|
|
var l: TList;
|
|
begin
|
|
//Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
|
|
if Self = nil then Exit;
|
|
if Self.Terminated then Exit;
|
|
|
|
l := FChannels.LockList;
|
|
try
|
|
//already exists?
|
|
if l.IndexOf(aChannel) >= 0 then Exit;
|
|
|
|
Assert(l.Count < 64, 'Max 64 connections can be handled by one read thread!'); //due to restrictions of the "select" API
|
|
l.Add(aChannel);
|
|
|
|
//trigger the "select" wait
|
|
BreakSelectWait;
|
|
finally
|
|
FChannels.UnlockList;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.AfterConstruction;
|
|
begin
|
|
inherited;
|
|
|
|
ReadTimeout := 5000;
|
|
|
|
FChannels := TThreadList.Create;
|
|
FillChar(Freadset, SizeOf(Freadset), 0);
|
|
FillChar(Fexceptionset, SizeOf(Fexceptionset), 0);
|
|
|
|
InitSpecialEventSocket;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.BreakSelectWait;
|
|
var
|
|
//iResult: Integer;
|
|
LAddr: TSockAddrIn6;
|
|
begin
|
|
if FTempHandle = 0 then Exit;
|
|
|
|
FillChar(LAddr, SizeOf(LAddr), 0);
|
|
//Id_IPv4
|
|
with PSOCKADDR(@LAddr)^ do
|
|
begin
|
|
sin_family := Id_PF_INET4;
|
|
//dummy address and port
|
|
(GStack as TIdStackBSDBase).TranslateStringToTInAddr('0.0.0.0', sin_addr, Id_IPv4);
|
|
sin_port := htons(1);
|
|
end;
|
|
|
|
FPendingBreak := True;
|
|
|
|
//connect to non-existing address to stop "select" from waiting
|
|
//Note: this is some kind of "hack" because there is no nice way to stop it
|
|
//The only(?) other possibility is to make a "socket pair" and send a byte to it,
|
|
//but this requires a dynamic server socket (which can trigger a firewall
|
|
//exception/question popup in WindowsXP+)
|
|
//iResult :=
|
|
IdWinsock2.connect(FTempHandle, PSOCKADDR(@LAddr), SIZE_TSOCKADDRIN);
|
|
//non blocking socket, so will always result in "would block"!
|
|
// if (iResult <> Id_SOCKET_ERROR) or
|
|
// ( (GStack <> nil) and (GStack.WSGetLastError <> WSAEWOULDBLOCK) )
|
|
// then
|
|
// GStack.CheckForSocketError(iResult);
|
|
end;
|
|
|
|
destructor TIdWebsocketMultiReadThread.Destroy;
|
|
begin
|
|
if FReconnectThread <> nil then
|
|
begin
|
|
FReconnectThread.Terminate;
|
|
FReconnectThread.WaitFor;
|
|
FReconnectThread.Free;
|
|
end;
|
|
|
|
if FReconnectlist <> nil then
|
|
FReconnectlist.Free;
|
|
|
|
IdWinsock2.closesocket(FTempHandle);
|
|
FTempHandle := 0;
|
|
FChannels.Free;
|
|
inherited;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.Execute;
|
|
begin
|
|
Self.NameThreadForDebugging(AnsiString(Self.ClassName));
|
|
|
|
while not Terminated do
|
|
begin
|
|
try
|
|
while not Terminated do
|
|
begin
|
|
ReadFromAllChannels;
|
|
PingAllChannels;
|
|
end;
|
|
except
|
|
//continue
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.InitSpecialEventSocket;
|
|
var
|
|
param: Cardinal;
|
|
iResult: Integer;
|
|
begin
|
|
if GStack = nil then Exit; //finalized?
|
|
|
|
//alloc socket
|
|
FTempHandle := GStack.NewSocketHandle(Id_SOCK_STREAM, Id_IPPROTO_IP, Id_IPv4, False);
|
|
Assert(FTempHandle <> Id_INVALID_SOCKET);
|
|
//non block mode
|
|
param := 1; // enable NON blocking mode
|
|
iResult := ioctlsocket(FTempHandle, FIONBIO, param);
|
|
GStack.CheckForSocketError(iResult);
|
|
end;
|
|
|
|
class function TIdWebsocketMultiReadThread.Instance: TIdWebsocketMultiReadThread;
|
|
begin
|
|
if (FInstance = nil) then
|
|
begin
|
|
if GUnitFinalized then Exit(nil);
|
|
|
|
FInstance := TIdWebsocketMultiReadThread.Create(True);
|
|
FInstance.Start;
|
|
end;
|
|
Result := FInstance;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.PingAllChannels;
|
|
var
|
|
l: TList;
|
|
chn: TIdHTTPWebsocketClient;
|
|
ws: TIdIOHandlerWebsocket;
|
|
i: Integer;
|
|
begin
|
|
if Terminated then Exit;
|
|
|
|
l := FChannels.LockList;
|
|
try
|
|
for i := 0 to l.Count - 1 do
|
|
begin
|
|
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
|
if chn.NoAsyncRead then Continue;
|
|
|
|
ws := chn.IOHandler as TIdIOHandlerWebsocket;
|
|
//valid?
|
|
if (chn.IOHandler <> nil) and
|
|
(chn.IOHandler.IsWebsocket) and
|
|
(chn.Socket <> nil) and
|
|
(chn.Socket.Binding <> nil) and
|
|
(chn.Socket.Binding.Handle > 0) and
|
|
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
|
|
begin
|
|
//more than 10s nothing done? then send ping
|
|
if SecondsBetween(Now, ws.LastPingTime) > 10 then
|
|
if chn.CheckConnection then
|
|
try
|
|
chn.Ping;
|
|
except
|
|
//retry connect the next time?
|
|
end;
|
|
end
|
|
else if not chn.Connected then
|
|
begin
|
|
if (ws <> nil) and
|
|
(SecondsBetween(Now, ws.LastActivityTime) < 5)
|
|
then
|
|
Continue;
|
|
|
|
if FReconnectlist = nil then
|
|
FReconnectlist := TWSThreadList.Create;
|
|
//if chn.TryLock then
|
|
FReconnectlist.Add(chn);
|
|
end;
|
|
end;
|
|
finally
|
|
FChannels.UnlockList;
|
|
end;
|
|
|
|
if Terminated then Exit;
|
|
|
|
//reconnect needed? (in background)
|
|
if FReconnectlist <> nil then
|
|
if FReconnectlist.Count > 0 then
|
|
begin
|
|
if FReconnectThread = nil then
|
|
FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start});
|
|
FReconnectThread.QueueEvent(
|
|
procedure
|
|
var
|
|
l: TList;
|
|
chn: TIdHTTPWebsocketClient;
|
|
begin
|
|
while FReconnectlist.Count > 0 do
|
|
begin
|
|
chn := nil;
|
|
try
|
|
//get first one
|
|
l := FReconnectlist.LockList;
|
|
try
|
|
if l.Count <= 0 then Exit;
|
|
|
|
chn := TObject(l.Items[0]) as TIdHTTPWebsocketClient;
|
|
if not chn.TryLock then
|
|
begin
|
|
l.Delete(0);
|
|
chn := nil;
|
|
Continue;
|
|
end;
|
|
finally
|
|
FReconnectlist.UnlockList;
|
|
end;
|
|
|
|
//try reconnect
|
|
ws := chn.IOHandler as TIdIOHandlerWebsocket;
|
|
if ( (ws = nil) or
|
|
(SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then
|
|
begin
|
|
try
|
|
if not chn.Connected then
|
|
begin
|
|
if ws <> nil then
|
|
ws.LastActivityTime := Now;
|
|
//chn.ConnectTimeout := 1000;
|
|
if (chn.Host <> '') and (chn.Port > 0) then
|
|
chn.TryUpgradeToWebsocket;
|
|
end;
|
|
except
|
|
//just try
|
|
end;
|
|
end;
|
|
|
|
//remove from todo list
|
|
l := FReconnectlist.LockList;
|
|
try
|
|
if l.Count > 0 then
|
|
l.Delete(0);
|
|
finally
|
|
FReconnectlist.UnlockList;
|
|
end;
|
|
finally
|
|
if chn <> nil then
|
|
chn.Unlock;
|
|
end;
|
|
end;
|
|
end);
|
|
end;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.ReadFromAllChannels;
|
|
var
|
|
l: TList;
|
|
chn: TIdHTTPWebsocketClient;
|
|
iCount,
|
|
i: Integer;
|
|
iResult: NativeInt;
|
|
ws: TIdIOHandlerWebsocket;
|
|
begin
|
|
l := FChannels.LockList;
|
|
try
|
|
iCount := 0;
|
|
iResult := 0;
|
|
Freadset.fd_count := iCount;
|
|
|
|
for i := 0 to l.Count - 1 do
|
|
begin
|
|
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
|
if chn.NoAsyncRead then Continue;
|
|
|
|
//valid?
|
|
if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution
|
|
(chn.IOHandler <> nil) and
|
|
(chn.IOHandler.IsWebsocket) and
|
|
(chn.Socket <> nil) and
|
|
(chn.Socket.Binding <> nil) and
|
|
(chn.Socket.Binding.Handle > 0) and
|
|
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
|
|
begin
|
|
if chn.IOHandler.HasData then
|
|
begin
|
|
Inc(iResult);
|
|
Break;
|
|
end;
|
|
|
|
Freadset.fd_count := iCount+1;
|
|
Freadset.fd_array[iCount] := chn.Socket.Binding.Handle;
|
|
Inc(iCount);
|
|
end;
|
|
end;
|
|
|
|
if FPendingBreak then
|
|
ResetSpecialEventSocket;
|
|
finally
|
|
FChannels.UnlockList;
|
|
end;
|
|
|
|
//special helper socket to be able to stop "select" from waiting
|
|
Fexceptionset.fd_count := 1;
|
|
Fexceptionset.fd_array[0] := FTempHandle;
|
|
|
|
//wait 15s till some data
|
|
Finterval.tv_sec := Self.ReadTimeout div 1000; //5s
|
|
Finterval.tv_usec := Self.ReadTimeout mod 1000;
|
|
|
|
//nothing to wait for? then sleep some time to prevent 100% CPU
|
|
if iResult = 0 then
|
|
begin
|
|
if iCount = 0 then
|
|
begin
|
|
iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval);
|
|
if iResult = SOCKET_ERROR then
|
|
iResult := 1; //ignore errors
|
|
end
|
|
//wait till a socket has some data (or a signal via exceptionset is fired)
|
|
else
|
|
iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval);
|
|
if iResult = SOCKET_ERROR then
|
|
//raise EIdWinsockStubError.Build(WSAGetLastError, '', []);
|
|
//ignore error during wait: socket disconnected etc
|
|
Exit;
|
|
end;
|
|
|
|
if Terminated then Exit;
|
|
|
|
//some data?
|
|
if (iResult > 0) then
|
|
begin
|
|
//make sure the thread is created outside a lock
|
|
TIdWebsocketDispatchThread.Instance;
|
|
|
|
l := FChannels.LockList;
|
|
if l = nil then Exit;
|
|
try
|
|
//check for data for all channels
|
|
for i := 0 to l.Count - 1 do
|
|
begin
|
|
if l = nil then Exit;
|
|
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
|
if chn.NoAsyncRead then Continue;
|
|
|
|
if chn.TryLock then
|
|
try
|
|
ws := chn.IOHandler as TIdIOHandlerWebsocket;
|
|
if (ws = nil) then Continue;
|
|
|
|
if ws.TryLock then //IOHandler.Readable cannot be done during pending action!
|
|
try
|
|
try
|
|
chn.ReadAndProcessData;
|
|
except
|
|
on e:Exception do
|
|
begin
|
|
l := nil;
|
|
FChannels.UnlockList;
|
|
chn.ResetChannel;
|
|
//raise;
|
|
end;
|
|
end;
|
|
finally
|
|
ws.Unlock;
|
|
end;
|
|
finally
|
|
chn.Unlock;
|
|
end;
|
|
end;
|
|
|
|
if FPendingBreak then
|
|
ResetSpecialEventSocket;
|
|
finally
|
|
if l <> nil then
|
|
FChannels.UnlockList;
|
|
//strmEvent.Free;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.RemoveClient(
|
|
aChannel: TIdHTTPWebsocketClient);
|
|
begin
|
|
if Self = nil then Exit;
|
|
if Self.Terminated then Exit;
|
|
|
|
aChannel.Lock;
|
|
try
|
|
FChannels.Remove(aChannel);
|
|
if FReconnectlist <> nil then
|
|
FReconnectlist.Remove(aChannel);
|
|
finally
|
|
aChannel.UnLock;
|
|
end;
|
|
BreakSelectWait;
|
|
end;
|
|
|
|
class procedure TIdWebsocketMultiReadThread.RemoveInstance(aForced: boolean);
|
|
var
|
|
o: TIdWebsocketMultiReadThread;
|
|
begin
|
|
if FInstance <> nil then
|
|
begin
|
|
FInstance.Terminate;
|
|
o := FInstance;
|
|
FInstance := nil;
|
|
|
|
if aForced then
|
|
begin
|
|
WaitForSingleObject(o.Handle, 2 * 1000);
|
|
TerminateThread(o.Handle, MaxInt);
|
|
end
|
|
else
|
|
o.WaitFor;
|
|
FreeAndNil(o);
|
|
end;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket;
|
|
begin
|
|
Assert(FPendingBreak);
|
|
FPendingBreak := False;
|
|
|
|
IdWinsock2.closesocket(FTempHandle);
|
|
FTempHandle := 0;
|
|
InitSpecialEventSocket;
|
|
end;
|
|
|
|
procedure TIdWebsocketMultiReadThread.Terminate;
|
|
begin
|
|
inherited Terminate;
|
|
if FReconnectThread <> nil then
|
|
FReconnectThread.Terminate;
|
|
|
|
FChannels.LockList;
|
|
try
|
|
//fire a signal, so the "select" wait will quit and thread can stop
|
|
BreakSelectWait;
|
|
finally
|
|
FChannels.UnlockList;
|
|
end;
|
|
end;
|
|
|
|
{ TIdWebsocketDispatchThread }
|
|
|
|
class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread;
|
|
begin
|
|
if FInstance = nil then
|
|
begin
|
|
if GUnitFinalized then Exit(nil);
|
|
|
|
GlobalNameSpace.BeginWrite;
|
|
try
|
|
if FInstance = nil then
|
|
begin
|
|
FInstance := Self.Create(True);
|
|
FInstance.Start;
|
|
end;
|
|
finally
|
|
GlobalNameSpace.EndWrite;
|
|
end;
|
|
end;
|
|
Result := FInstance;
|
|
end;
|
|
|
|
class procedure TIdWebsocketDispatchThread.RemoveInstance;
|
|
var
|
|
o: TIdWebsocketDispatchThread;
|
|
begin
|
|
if FInstance <> nil then
|
|
begin
|
|
FInstance.Terminate;
|
|
o := FInstance;
|
|
FInstance := nil;
|
|
|
|
if aForced then
|
|
begin
|
|
WaitForSingleObject(o.Handle, 2 * 1000);
|
|
TerminateThread(o.Handle, MaxInt);
|
|
end;
|
|
o.WaitFor;
|
|
FreeAndNil(o);
|
|
end;
|
|
end;
|
|
|
|
{ TWSThreadList }
|
|
|
|
function TWSThreadList.Count: Integer;
|
|
var l: TList;
|
|
begin
|
|
l := LockList;
|
|
try
|
|
Result := l.Count;
|
|
finally
|
|
UnlockList;
|
|
end;
|
|
end;
|
|
|
|
initialization
|
|
finalization
|
|
GUnitFinalized := True;
|
|
if TIdWebsocketMultiReadThread.Instance <> nil then
|
|
TIdWebsocketMultiReadThread.Instance.Terminate;
|
|
TIdWebsocketDispatchThread.RemoveInstance();
|
|
TIdWebsocketMultiReadThread.RemoveInstance();
|
|
end.
|