various fixes: threadsafety/better locking, data corruption, ConnectAsync, better socketio connection check, better auto reconnect, server side get current thread socket, optional manual client fetch etc

This commit is contained in:
André Mussche 2014-05-08 09:31:14 +02:00
parent 72711f0f27
commit 77841a45a7
5 changed files with 336 additions and 223 deletions

View file

@ -37,6 +37,7 @@ type
FHash: TIdHashSHA1;
FOnData: TWebsocketMsgBin;
FOnTextData: TWebsocketMsgText;
FNoAsyncRead: Boolean;
function GetIOHandlerWS: TIdIOHandlerWebsocket;
procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket);
procedure SetOnData(const Value: TWebsocketMsgBin);
@ -70,10 +71,13 @@ type
procedure UnLock;
procedure Connect; override;
procedure ConnectAsync; virtual;
function TryConnect: Boolean;
procedure Disconnect(ANotifyPeer: Boolean); override;
function CheckConnection: Boolean;
procedure Ping;
procedure ReadAndProcessData;
property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS;
@ -81,6 +85,8 @@ type
property OnBinData : TWebsocketMsgBin read FOnData write SetOnData;
property OnTextData: TWebsocketMsgText read FOnTextData write SetOnTextData;
property NoAsyncRead: Boolean read FNoAsyncRead write FNoAsyncRead;
//https://github.com/LearnBoost/socket.io-spec
property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible;
property SocketIO: TIdSocketIOHandling read GetSocketIO;
@ -160,7 +166,7 @@ type
property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000;
class function Instance: TIdWebsocketMultiReadThread;
class procedure RemoveInstance;
class procedure RemoveInstance(aForced: boolean = false);
end;
//async process data
@ -169,7 +175,7 @@ type
class var FInstance: TIdWebsocketDispatchThread;
public
class function Instance: TIdWebsocketDispatchThread;
class procedure RemoveInstance;
class procedure RemoveInstance(aForced: boolean = false);
end;
implementation
@ -318,6 +324,11 @@ begin
end;
end;
procedure TIdHTTPWebsocketClient.ConnectAsync;
begin
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
end;
destructor TIdHTTPWebsocketClient.Destroy;
//var tmr: TObject;
begin
@ -462,17 +473,21 @@ function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean;
var
sError: string;
begin
FSocketIOConnectBusy := True;
Lock;
try
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
FSocketIOConnectBusy := True;
Lock;
try
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
InternalUpgradeToWebsocket(False{no raise}, sError);
Result := (sError = '');
finally
FSocketIOConnectBusy := False;
UnLock;
end;
InternalUpgradeToWebsocket(False{no raise}, sError);
Result := (sError = '');
finally
FSocketIOConnectBusy := False;
UnLock;
end;
except
Result := False;
end;
end;
procedure TIdHTTPWebsocketClient.UnLock;
@ -556,9 +571,10 @@ begin
end;
Request.Clear;
Request.CustomHeaders.Clear;
strmResponse.Clear;
//http://www.websocket.org/aboutwebsocket.html
(* GET ws://echo.websocket.org/?encoding=text HTTP/1.1
//http://www.websocket.org/aboutwebsocket.html
(* GET ws://echo.websocket.org/?encoding=text HTTP/1.1
Origin: http://websocket.org
Cookie: __utma=99as
Connection: Upgrade
@ -570,7 +586,7 @@ begin
//Connection: Upgrade
Request.Connection := 'Upgrade';
//Upgrade: websocket
Request.CustomHeaders.Add('Upgrade: websocket');
Request.CustomHeaders.Add('Upgrade:websocket');
//Sec-WebSocket-Key
sKey := '';
@ -581,13 +597,17 @@ begin
Request.CustomHeaders.AddValue('Sec-WebSocket-Key', sKey);
//Sec-WebSocket-Version: 13
Request.CustomHeaders.AddValue('Sec-WebSocket-Version', '13');
Request.CustomHeaders.AddValue('Sec-WebSocket-Extensions', '');
Request.Host := Format('Host: %s:%d',[Host,Port]);
Request.CacheControl := 'no-cache';
Request.Pragma := 'no-cache';
Request.Host := Format('Host:%s:%d',[Host,Port]);
Request.CustomHeaders.AddValue('Origin', Format('http://%s:%d',[Host,Port]) );
//ws://host:port/<resourcename>
//about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs"
//sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
sURL := Format('http://%s:%d/%s', [Host, Port, WSResourceName]);
ReadTimeout := Max(2 * 1000, ReadTimeout);
ReadTimeout := Max(5 * 1000, ReadTimeout);
{ voorbeeld:
GET http://localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1
@ -603,9 +623,9 @@ begin
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36
Cookie: __utma=1.2040118404.1366961318.1366961318.1366961318.1; __utmc=1; __utmz=1.1366961318.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); deviceorder=0123456789101112; MultiTouchEnabled=false; device=3; network_type=0
}
(*
if SocketIOCompatible then
begin
//1st, try to do socketio specific connection
Response.Clear;
Response.ResponseCode := 0;
Request.URL := sURL;
@ -623,9 +643,8 @@ begin
Assert(Self.Connected);
if Response.ResponseCode = 0 then
Response.ResponseText := Response.ResponseText;
if Response.ResponseCode <> 200{ok} then
Response.ResponseText := Response.ResponseText
else if Response.ResponseCode <> 200{ok} then
begin
aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]);
if aRaiseException then
@ -634,58 +653,70 @@ begin
Exit;
end;
//2nd, get websocket response
Response.Clear;
if IOHandler.CheckForDataOnSource(ReadTimeout) then
Response.ResponseText := IOHandler.InputBufferAsString();
begin
Self.FHTTPProto.RetrieveHeaders(MaxHeaderLines);
//Response.RawHeaders.Text := IOHandler.InputBufferAsString();
Response.ResponseText := Response.RawHeaders.Text;
end;
end
else
*)
begin
Get(sURL, strmResponse, [101]);
end;
//http://www.websocket.org/aboutwebsocket.html
(* HTTP/1.1 101 WebSocket Protocol Handshake
Date: Fri, 10 Feb 2012 17:38:18 GMT
Connection: Upgrade
Server: Kaazing Gateway
Upgrade: WebSocket
Access-Control-Allow-Origin: http://websocket.org
Access-Control-Allow-Credentials: true
Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU=
Access-Control-Allow-Headers: content-type *)
//http://www.websocket.org/aboutwebsocket.html
(* HTTP/1.1 101 WebSocket Protocol Handshake
Date: Fri, 10 Feb 2012 17:38:18 GMT
Connection: Upgrade
Server: Kaazing Gateway
Upgrade: WebSocket
Access-Control-Allow-Origin: http://websocket.org
Access-Control-Allow-Credentials: true
Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU=
Access-Control-Allow-Headers: content-type *)
//'HTTP/1.1 101 Switching Protocols'
if ResponseCode <> 101 then
begin
aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//connection: upgrade
if not SameText(Response.Connection, 'upgrade') then
begin
aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//upgrade: websocket
if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then
begin
aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//check handshake key
sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace"
'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID
sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64
FHash.HashString(sResponseKey) ); //SHA1
if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then
begin
aFailedReason := 'Invalid key handshake';
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason);
end;
//'HTTP/1.1 101 Switching Protocols'
if Response.ResponseCode <> 101 then
begin
aFailedReason := Format('Error while upgrading: "%d: %s"',[Response.ResponseCode, Response.ResponseText]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason)
else
Exit;
end;
//connection: upgrade
if not SameText(Response.Connection, 'upgrade') then
begin
aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason)
else
Exit;
end;
//upgrade: websocket
if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then
begin
aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]);
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason)
else
Exit;
end;
//check handshake key
sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace"
'258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID
sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64
FHash.HashString(sResponseKey) ); //SHA1
if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then
begin
aFailedReason := 'Invalid key handshake';
if aRaiseException then
raise EIdWebSocketHandleError.Create(aFailedReason)
else
Exit;
end;
//upgrade succesful
@ -704,6 +735,7 @@ begin
//if Assigned(OnBinData) or Assigned(OnTextData) then
finally
Request.Clear;
Request.CustomHeaders.Clear;
strmResponse.Free;
if bLocked and (IOHandler <> nil) then
@ -711,7 +743,8 @@ begin
Unlock;
//add to thread for auto retry/reconnect
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
if not Self.NoAsyncRead then
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
end;
end;
@ -755,6 +788,61 @@ begin
end;
end;
procedure TIdHTTPWebsocketClient.ReadAndProcessData;
var
strmEvent: TMemoryStream;
swstext: utf8string;
wscode: TWSDataCode;
begin
strmEvent := nil;
IOHandler.Lock;
try
//try to process all events
while IOHandler.HasData or
(IOHandler.Connected and
IOHandler.Readable(0)) do //has some data
begin
if strmEvent = nil then
strmEvent := TMemoryStream.Create;
strmEvent.Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(IOHandler.ReadLongWord);
if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then
begin
//Sleep(0);
Continue;
end;
//next the size + data = stream
IOHandler.ReadStream(strmEvent);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
//fire event
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent.Position := 0;
if wscode = wdcBinary then
begin
AsyncDispatchEvent(strmEvent);
end
else if wscode = wdcText then
begin
SetLength(swstext, strmEvent.Size);
strmEvent.Read(swstext[1], strmEvent.Size);
if swstext <> '' then
begin
AsyncDispatchEvent(string(swstext));
end;
end;
end;
finally
IOHandler.Unlock;
strmEvent.Free;
end;
end;
procedure TIdHTTPWebsocketClient.ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
@ -1185,6 +1273,8 @@ begin
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
if chn.NoAsyncRead then Continue;
ws := chn.IOHandler as TIdIOHandlerWebsocket;
//valid?
if (chn.IOHandler <> nil) and
@ -1278,11 +1368,11 @@ begin
finally
FReconnectlist.UnlockList;
end;
finally
if chn <> nil then
chn.Unlock;
finally
if chn <> nil then
chn.Unlock;
end;
end;
end;
end);
end;
end;
@ -1294,10 +1384,7 @@ var
iCount,
i: Integer;
iResult: NativeInt;
strmEvent: TMemoryStream;
swstext: utf8string;
ws: TIdIOHandlerWebsocket;
wscode: TWSDataCode;
begin
l := FChannels.LockList;
try
@ -1308,6 +1395,8 @@ begin
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
if chn.NoAsyncRead then Continue;
//valid?
if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution
(chn.IOHandler <> nil) and
@ -1366,61 +1455,33 @@ begin
//some data?
if (iResult > 0) then
begin
strmEvent := nil;
//strmEvent := nil;
l := FChannels.LockList;
if l = nil then Exit;
try
//check for data for all channels
for i := 0 to l.Count - 1 do
begin
if l = nil then Exit;
chn := TIdHTTPWebsocketClient(l.Items[i]);
if chn.NoAsyncRead then Continue;
ws := chn.IOHandler as TIdIOHandlerWebsocket;
if (ws = nil) then Continue;
if ws.TryLock then //IOHandler.Readable cannot be done during pending action!
try
try
//try to process all events
while chn.IOHandler.HasData or
chn.IOHandler.Readable(0) do //has some data
begin
if strmEvent = nil then
strmEvent := TMemoryStream.Create;
strmEvent.Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(chn.IOHandler.ReadLongWord);
if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then
Continue;
//next the size + data = stream
chn.IOHandler.ReadStream(strmEvent);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
//fire event
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent.Position := 0;
if wscode = wdcBinary then
begin
chn.AsyncDispatchEvent(strmEvent);
end
else if wscode = wdcText then
begin
SetLength(swstext, strmEvent.Size);
strmEvent.Read(swstext[1], strmEvent.Size);
if swstext <> '' then
begin
chn.AsyncDispatchEvent(string(swstext));
end;
end;
end;
chn.ReadAndProcessData;
except
l := nil;
FChannels.UnlockList;
chn.ResetChannel;
raise;
on e:Exception do
begin
l := nil;
FChannels.UnlockList;
chn.ResetChannel;
//raise;
end;
end;
finally
ws.Unlock;
@ -1432,7 +1493,7 @@ begin
finally
if l <> nil then
FChannels.UnlockList;
strmEvent.Free;
//strmEvent.Free;
end;
end;
end;
@ -1444,7 +1505,7 @@ begin
aChannel.Lock;
try
FChannels.Remove(aChannel);
FChannels.Remove(aChannel);
if FReconnectlist <> nil then
FReconnectlist.Remove(aChannel);
finally
@ -1453,12 +1514,18 @@ begin
BreakSelectWait;
end;
class procedure TIdWebsocketMultiReadThread.RemoveInstance;
class procedure TIdWebsocketMultiReadThread.RemoveInstance(aForced: boolean);
begin
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
if aForced then
begin
WaitForSingleObject(FInstance.Handle, 2 * 1000);
TerminateThread(FInstance.Handle, MaxInt);
end
else
FInstance.WaitFor;
FreeAndNil(FInstance);
end;
end;
@ -1511,6 +1578,11 @@ begin
if FInstance <> nil then
begin
FInstance.Terminate;
if aForced then
begin
WaitForSingleObject(FInstance.Handle, 2 * 1000);
TerminateThread(FInstance.Handle, MaxInt);
end;
FInstance.WaitFor;
FreeAndNil(FInstance);
end;
@ -1531,7 +1603,7 @@ end;
initialization
finalization
TIdWebsocketMultiReadThread.RemoveInstance;
TIdWebsocketDispatchThread.RemoveInstance
TIdWebsocketMultiReadThread.RemoveInstance();
TIdWebsocketDispatchThread.RemoveInstance()
end.

View file

@ -363,7 +363,7 @@ end;
function TIdIOHandlerWebsocket.HasData: Boolean;
begin
//buffered data available? (more data from previous read)
Result := (FWSInputBuffer.Size > 0);
Result := (FWSInputBuffer.Size > 0) or not InputBufferIsEmpty;
end;
function TIdIOHandlerWebsocket.InternalReadDataFromSource(
@ -672,7 +672,6 @@ begin
else
FInputBuffer.Write(LongWord(Result))
except
Unlock; //always unlock when socket exception
FClosedGracefully := True; //closed (but not gracefully?)
Raise;
end;
@ -849,7 +848,8 @@ var
var
temp: TIdBytes;
begin
if HasData then Exit(True);
//if HasData then Exit(True);
if (FWSInputBuffer.Size > 0) then Exit(True);
Result := InternalReadDataFromSource(temp, ARaiseExceptionOnTimeout) > 0;
if Result then

View file

@ -10,7 +10,7 @@ uses
IdHashSHA, //XE3 etc
{$IFEND}
IdServerSocketIOHandling, IdServerWebsocketContext,
Classes, IdServerBaseHandling, IdIOHandlerWebsocket;
Classes, IdServerBaseHandling, IdIOHandlerWebsocket, IdSocketIOHandling;
type
TIdServerSocketIOHandling_Ext = class(TIdServerSocketIOHandling)
@ -25,16 +25,30 @@ type
public
class function ProcessServerCommandGet(AThread: TIdServerWSContext;
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo): Boolean;
class function CurrentSocket: ISocketIOContext;
end;
implementation
uses
StrUtils, SysUtils, DateUtils,
IdCustomTCPServer, IdCoderMIME;
IdCustomTCPServer, IdCoderMIME, IdThread;
{ TIdServerWebsocketHandling }
class function TIdServerWebsocketHandling.CurrentSocket: ISocketIOContext;
var
thread: TIdThreadWithTask;
context: TIdServerWSContext;
begin
if not (TThread.Currentthread is TIdThreadWithTask) then Exit(nil);
thread := TThread.Currentthread as TIdThreadWithTask;
if not (thread.Task is TIdServerWSContext) then Exit(nil);
context := thread.Task as TIdServerWSContext;
Result := context.SocketIO.GetSocketIOContext(context);
end;
class procedure TIdServerWebsocketHandling.DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext);
var
strmRequest, strmResponse: TMemoryStream;

View file

@ -180,6 +180,8 @@ type
procedure UnLock;
function ConnectionCount: Integer;
function GetSocketIOContext(const AContext: TIdContext): ISocketIOContext;
// procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil);
function NewConnection(const AContext: TIdContext): TSocketIOContext;overload;
function NewConnection(const aGUID, aPeerIP: string): TSocketIOContext;overload;
@ -343,6 +345,20 @@ begin
end;
end;
function TIdBaseSocketIOHandling.GetSocketIOContext(const AContext: TIdContext): ISocketIOContext;
var
socket: TSocketIOContext;
begin
Result := nil;
Lock;
try
if FConnections.TryGetValue(AContext, socket) then
Exit(socket);
finally
UnLock;
end;
end;
procedure TIdBaseSocketIOHandling.FreeConnection(const AContext: TIdContext);
var
socket: TSocketIOContext;
@ -837,13 +853,13 @@ begin
begin
FSocketIOEventCallback.Remove(imsg);
if Assigned(callback) then
callback(sdata);
callback(sdata);
end
else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then
begin
FSocketIOEventCallbackRef.Remove(imsg);
if Assigned(callbackref) then
callbackref(sdata);
callbackref(sdata);
end
else ;
//raise EIdSocketIoUnhandledMessage.Create(str);
@ -1112,13 +1128,16 @@ begin
FreeAndNil(FQueue);
UnLock;
FLock.Free;
FCustomData.Free;
if OwnsCustomData then
FCustomData.Free;
inherited;
end;
procedure TSocketIOContext.EmitEvent(const aEventName, aData: string;
const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
begin
Assert(FHandling <> nil);
if not Assigned(aCallback) then
FHandling.WriteSocketIOEvent(Self, '', aEventName, '[' + aData + ']', nil, nil)
else
@ -1134,7 +1153,10 @@ end;
procedure TSocketIOContext.EmitEvent(const aEventName: string; const aData: ISuperObject;
const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
begin
EmitEvent(aEventName, aData.AsJSon, aCallback, aOnError);
if aData <> nil then
EmitEvent(aEventName, aData.AsJSon, aCallback, aOnError)
else
EmitEvent(aEventName, '', aCallback, aOnError);
end;
function TSocketIOContext.GetCustomData: TObject;

View file

@ -236,103 +236,108 @@ var
wscode: TWSDataCode;
swstext: utf8string;
begin
//http server supports websockets?
if not FTriedUpgrade then
begin
if not IndyClient.IOHandler.IsWebsocket then //not already upgraded?
TryUpgradeToWebsocket;
FTriedUpgrade := True; //one shot
end;
ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
if not ws.IsWebsocket then
//normal http dispatch
inherited IntDispatch(aRequest, aResponse)
else
//websocket dispatch
begin
ws.Lock;
try
//write messagenr at end
aRequest.Position := aRequest.Size;
Inc(FMessageNr);
iMsgNr := FMessageNr;
aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR));
aRequest.Write(iMsgNr, SizeOf(iMsgNr));
aRequest.Position := 0;
CheckConnection;
//write
IndyClient.IOHandler.Write(aRequest);
iMsgNr2 := 0;
while iMsgNr2 <= 0 do
begin
aResponse.Size := 0; //clear
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
IndyClient.IOHandler.ReadStream(aResponse);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then
begin
//get event or message nr
aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2);
aResponse.Read(cWSNR[0], Length(cWSNR));
end;
if (cWSNR = C_RO_WS_NR) then
begin
aResponse.Read(iMsgNr2, SizeOf(iMsgNr2));
aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc
aResponse.Position := 0;
//event?
if iMsgNr2 < 0 then
begin
//events during dispatch? channel is busy so offload event dispatching to different thread!
AsyncDispatchEvent(aResponse);
aResponse.Size := 0;
{
ws.Unlock;
try
IntDispatchEvent(aResponse);
aResponse.Size := 0;
finally
ws.Lock;
end;
}
end;
end
else
begin
aResponse.Position := 0;
if wscode = wdcBinary then
begin
Self.IndyClient.AsyncDispatchEvent(aResponse);
end
else if wscode = wdcText then
begin
SetLength(swstext, aResponse.Size);
aResponse.Read(swstext[1], aResponse.Size);
if swstext <> '' then
begin
Self.IndyClient.AsyncDispatchEvent(string(swstext));
end;
end;
end;
end;
except
ws.Unlock; //always unlock
ResetChannel;
Raise;
IndyClient.Lock;
try
//http server supports websockets?
if not FTriedUpgrade then
begin
if not IndyClient.IOHandler.IsWebsocket then //not already upgraded?
TryUpgradeToWebsocket;
FTriedUpgrade := True; //one shot
end;
ws.Unlock; //normal unlock (no extra try finally needed)
if iMsgNr2 <> iMsgNr then
Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!');
ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
if not ws.IsWebsocket then
//normal http dispatch
inherited IntDispatch(aRequest, aResponse)
else
//websocket dispatch
begin
ws.Lock;
try
//write messagenr at end
aRequest.Position := aRequest.Size;
Inc(FMessageNr);
iMsgNr := FMessageNr;
aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR));
aRequest.Write(iMsgNr, SizeOf(iMsgNr));
aRequest.Position := 0;
CheckConnection;
//write
IndyClient.IOHandler.Write(aRequest);
iMsgNr2 := 0;
while iMsgNr2 <= 0 do
begin
aResponse.Size := 0; //clear
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
IndyClient.IOHandler.ReadStream(aResponse);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then
begin
//get event or message nr
aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2);
aResponse.Read(cWSNR[0], Length(cWSNR));
end;
if (cWSNR = C_RO_WS_NR) then
begin
aResponse.Read(iMsgNr2, SizeOf(iMsgNr2));
aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc
aResponse.Position := 0;
//event?
if iMsgNr2 < 0 then
begin
//events during dispatch? channel is busy so offload event dispatching to different thread!
AsyncDispatchEvent(aResponse);
aResponse.Size := 0;
{
ws.Unlock;
try
IntDispatchEvent(aResponse);
aResponse.Size := 0;
finally
ws.Lock;
end;
}
end;
end
else
begin
aResponse.Position := 0;
if wscode = wdcBinary then
begin
Self.IndyClient.AsyncDispatchEvent(aResponse);
end
else if wscode = wdcText then
begin
SetLength(swstext, aResponse.Size);
aResponse.Read(swstext[1], aResponse.Size);
if swstext <> '' then
begin
Self.IndyClient.AsyncDispatchEvent(string(swstext));
end;
end;
end;
end;
except
ws.Unlock; //always unlock
ResetChannel;
Raise;
end;
ws.Unlock; //normal unlock (no extra try finally needed)
if iMsgNr2 <> iMsgNr then
Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!');
end;
finally
IndyClient.UnLock;
end;
end;