diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index e88edff..448f4b0 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -37,6 +37,7 @@ type FHash: TIdHashSHA1; FOnData: TWebsocketMsgBin; FOnTextData: TWebsocketMsgText; + FNoAsyncRead: Boolean; function GetIOHandlerWS: TIdIOHandlerWebsocket; procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket); procedure SetOnData(const Value: TWebsocketMsgBin); @@ -70,10 +71,13 @@ type procedure UnLock; procedure Connect; override; + procedure ConnectAsync; virtual; function TryConnect: Boolean; procedure Disconnect(ANotifyPeer: Boolean); override; + function CheckConnection: Boolean; procedure Ping; + procedure ReadAndProcessData; property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS; @@ -81,6 +85,8 @@ type property OnBinData : TWebsocketMsgBin read FOnData write SetOnData; property OnTextData: TWebsocketMsgText read FOnTextData write SetOnTextData; + property NoAsyncRead: Boolean read FNoAsyncRead write FNoAsyncRead; + //https://github.com/LearnBoost/socket.io-spec property SocketIOCompatible: Boolean read FSocketIOCompatible write FSocketIOCompatible; property SocketIO: TIdSocketIOHandling read GetSocketIO; @@ -160,7 +166,7 @@ type property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000; class function Instance: TIdWebsocketMultiReadThread; - class procedure RemoveInstance; + class procedure RemoveInstance(aForced: boolean = false); end; //async process data @@ -169,7 +175,7 @@ type class var FInstance: TIdWebsocketDispatchThread; public class function Instance: TIdWebsocketDispatchThread; - class procedure RemoveInstance; + class procedure RemoveInstance(aForced: boolean = false); end; implementation @@ -318,6 +324,11 @@ begin end; end; +procedure TIdHTTPWebsocketClient.ConnectAsync; +begin + TIdWebsocketMultiReadThread.Instance.AddClient(Self); +end; + destructor TIdHTTPWebsocketClient.Destroy; //var tmr: TObject; begin @@ -462,17 +473,21 @@ function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean; var sError: string; begin - FSocketIOConnectBusy := True; - Lock; try - if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); + FSocketIOConnectBusy := True; + Lock; + try + if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); - InternalUpgradeToWebsocket(False{no raise}, sError); - Result := (sError = ''); - finally - FSocketIOConnectBusy := False; - UnLock; - end; + InternalUpgradeToWebsocket(False{no raise}, sError); + Result := (sError = ''); + finally + FSocketIOConnectBusy := False; + UnLock; + end; + except + Result := False; + end; end; procedure TIdHTTPWebsocketClient.UnLock; @@ -556,9 +571,10 @@ begin end; Request.Clear; + Request.CustomHeaders.Clear; strmResponse.Clear; - //http://www.websocket.org/aboutwebsocket.html - (* GET ws://echo.websocket.org/?encoding=text HTTP/1.1 + //http://www.websocket.org/aboutwebsocket.html + (* GET ws://echo.websocket.org/?encoding=text HTTP/1.1 Origin: http://websocket.org Cookie: __utma=99as Connection: Upgrade @@ -570,7 +586,7 @@ begin //Connection: Upgrade Request.Connection := 'Upgrade'; //Upgrade: websocket - Request.CustomHeaders.Add('Upgrade: websocket'); + Request.CustomHeaders.Add('Upgrade:websocket'); //Sec-WebSocket-Key sKey := ''; @@ -581,13 +597,17 @@ begin Request.CustomHeaders.AddValue('Sec-WebSocket-Key', sKey); //Sec-WebSocket-Version: 13 Request.CustomHeaders.AddValue('Sec-WebSocket-Version', '13'); + Request.CustomHeaders.AddValue('Sec-WebSocket-Extensions', ''); - Request.Host := Format('Host: %s:%d',[Host,Port]); + Request.CacheControl := 'no-cache'; + Request.Pragma := 'no-cache'; + Request.Host := Format('Host:%s:%d',[Host,Port]); + Request.CustomHeaders.AddValue('Origin', Format('http://%s:%d',[Host,Port]) ); //ws://host:port/ //about resourcename, see: http://dev.w3.org/html5/websockets/ "Parsing WebSocket URLs" //sURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]); sURL := Format('http://%s:%d/%s', [Host, Port, WSResourceName]); - ReadTimeout := Max(2 * 1000, ReadTimeout); + ReadTimeout := Max(5 * 1000, ReadTimeout); { voorbeeld: GET http://localhost:9222/devtools/page/642D7227-148E-47C2-B97A-E00850E3AFA3 HTTP/1.1 @@ -603,9 +623,9 @@ begin User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36 Cookie: __utma=1.2040118404.1366961318.1366961318.1366961318.1; __utmc=1; __utmz=1.1366961318.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); deviceorder=0123456789101112; MultiTouchEnabled=false; device=3; network_type=0 } - (* if SocketIOCompatible then begin + //1st, try to do socketio specific connection Response.Clear; Response.ResponseCode := 0; Request.URL := sURL; @@ -623,9 +643,8 @@ begin Assert(Self.Connected); if Response.ResponseCode = 0 then - Response.ResponseText := Response.ResponseText; - - if Response.ResponseCode <> 200{ok} then + Response.ResponseText := Response.ResponseText + else if Response.ResponseCode <> 200{ok} then begin aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]); if aRaiseException then @@ -634,58 +653,70 @@ begin Exit; end; + //2nd, get websocket response Response.Clear; if IOHandler.CheckForDataOnSource(ReadTimeout) then - Response.ResponseText := IOHandler.InputBufferAsString(); + begin + Self.FHTTPProto.RetrieveHeaders(MaxHeaderLines); + //Response.RawHeaders.Text := IOHandler.InputBufferAsString(); + Response.ResponseText := Response.RawHeaders.Text; + end; end else - *) begin Get(sURL, strmResponse, [101]); + end; - //http://www.websocket.org/aboutwebsocket.html - (* HTTP/1.1 101 WebSocket Protocol Handshake - Date: Fri, 10 Feb 2012 17:38:18 GMT - Connection: Upgrade - Server: Kaazing Gateway - Upgrade: WebSocket - Access-Control-Allow-Origin: http://websocket.org - Access-Control-Allow-Credentials: true - Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU= - Access-Control-Allow-Headers: content-type *) + //http://www.websocket.org/aboutwebsocket.html + (* HTTP/1.1 101 WebSocket Protocol Handshake + Date: Fri, 10 Feb 2012 17:38:18 GMT + Connection: Upgrade + Server: Kaazing Gateway + Upgrade: WebSocket + Access-Control-Allow-Origin: http://websocket.org + Access-Control-Allow-Credentials: true + Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU= + Access-Control-Allow-Headers: content-type *) - //'HTTP/1.1 101 Switching Protocols' - if ResponseCode <> 101 then - begin - aFailedReason := Format('Error while upgrading: "%d: %s"',[ResponseCode, ResponseText]); - if aRaiseException then - raise EIdWebSocketHandleError.Create(aFailedReason); - end; - //connection: upgrade - if not SameText(Response.Connection, 'upgrade') then - begin - aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]); - if aRaiseException then - raise EIdWebSocketHandleError.Create(aFailedReason); - end; - //upgrade: websocket - if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then - begin - aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]); - if aRaiseException then - raise EIdWebSocketHandleError.Create(aFailedReason); - end; - //check handshake key - sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace" - '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID - sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64 - FHash.HashString(sResponseKey) ); //SHA1 - if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then - begin - aFailedReason := 'Invalid key handshake'; - if aRaiseException then - raise EIdWebSocketHandleError.Create(aFailedReason); - end; + //'HTTP/1.1 101 Switching Protocols' + if Response.ResponseCode <> 101 then + begin + aFailedReason := Format('Error while upgrading: "%d: %s"',[Response.ResponseCode, Response.ResponseText]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason) + else + Exit; + end; + //connection: upgrade + if not SameText(Response.Connection, 'upgrade') then + begin + aFailedReason := Format('Connection not upgraded: "%s"',[Response.Connection]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason) + else + Exit; + end; + //upgrade: websocket + if not SameText(Response.RawHeaders.Values['upgrade'], 'websocket') then + begin + aFailedReason := Format('Not upgraded to websocket: "%s"',[Response.RawHeaders.Values['upgrade']]); + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason) + else + Exit; + end; + //check handshake key + sResponseKey := Trim(sKey) + //... "minus any leading and trailing whitespace" + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; //special GUID + sResponseKey := TIdEncoderMIME.EncodeBytes( //Base64 + FHash.HashString(sResponseKey) ); //SHA1 + if not SameText(Response.RawHeaders.Values['sec-websocket-accept'], sResponseKey) then + begin + aFailedReason := 'Invalid key handshake'; + if aRaiseException then + raise EIdWebSocketHandleError.Create(aFailedReason) + else + Exit; end; //upgrade succesful @@ -704,6 +735,7 @@ begin //if Assigned(OnBinData) or Assigned(OnTextData) then finally Request.Clear; + Request.CustomHeaders.Clear; strmResponse.Free; if bLocked and (IOHandler <> nil) then @@ -711,7 +743,8 @@ begin Unlock; //add to thread for auto retry/reconnect - TIdWebsocketMultiReadThread.Instance.AddClient(Self); + if not Self.NoAsyncRead then + TIdWebsocketMultiReadThread.Instance.AddClient(Self); end; end; @@ -755,6 +788,61 @@ begin end; end; +procedure TIdHTTPWebsocketClient.ReadAndProcessData; +var + strmEvent: TMemoryStream; + swstext: utf8string; + wscode: TWSDataCode; +begin + strmEvent := nil; + IOHandler.Lock; + try + //try to process all events + while IOHandler.HasData or + (IOHandler.Connected and + IOHandler.Readable(0)) do //has some data + begin + if strmEvent = nil then + strmEvent := TMemoryStream.Create; + strmEvent.Clear; + + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(IOHandler.ReadLongWord); + if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then + begin + //Sleep(0); + Continue; + end; + + //next the size + data = stream + IOHandler.ReadStream(strmEvent); + + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; + + //fire event + //offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize) + strmEvent.Position := 0; + if wscode = wdcBinary then + begin + AsyncDispatchEvent(strmEvent); + end + else if wscode = wdcText then + begin + SetLength(swstext, strmEvent.Size); + strmEvent.Read(swstext[1], strmEvent.Size); + if swstext <> '' then + begin + AsyncDispatchEvent(string(swstext)); + end; + end; + end; + finally + IOHandler.Unlock; + strmEvent.Free; + end; +end; + procedure TIdHTTPWebsocketClient.ResetChannel; //var // ws: TIdIOHandlerWebsocket; @@ -1185,6 +1273,8 @@ begin for i := 0 to l.Count - 1 do begin chn := TIdHTTPWebsocketClient(l.Items[i]); + if chn.NoAsyncRead then Continue; + ws := chn.IOHandler as TIdIOHandlerWebsocket; //valid? if (chn.IOHandler <> nil) and @@ -1278,11 +1368,11 @@ begin finally FReconnectlist.UnlockList; end; - finally - if chn <> nil then - chn.Unlock; + finally + if chn <> nil then + chn.Unlock; + end; end; - end; end); end; end; @@ -1294,10 +1384,7 @@ var iCount, i: Integer; iResult: NativeInt; - strmEvent: TMemoryStream; - swstext: utf8string; ws: TIdIOHandlerWebsocket; - wscode: TWSDataCode; begin l := FChannels.LockList; try @@ -1308,6 +1395,8 @@ begin for i := 0 to l.Count - 1 do begin chn := TIdHTTPWebsocketClient(l.Items[i]); + if chn.NoAsyncRead then Continue; + //valid? if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution (chn.IOHandler <> nil) and @@ -1366,61 +1455,33 @@ begin //some data? if (iResult > 0) then begin - strmEvent := nil; + //strmEvent := nil; l := FChannels.LockList; + if l = nil then Exit; try //check for data for all channels for i := 0 to l.Count - 1 do begin + if l = nil then Exit; chn := TIdHTTPWebsocketClient(l.Items[i]); + if chn.NoAsyncRead then Continue; + ws := chn.IOHandler as TIdIOHandlerWebsocket; if (ws = nil) then Continue; if ws.TryLock then //IOHandler.Readable cannot be done during pending action! try try - //try to process all events - while chn.IOHandler.HasData or - chn.IOHandler.Readable(0) do //has some data - begin - if strmEvent = nil then - strmEvent := TMemoryStream.Create; - strmEvent.Clear; - - //first is the data type TWSDataType(text or bin), but is ignore/not needed - wscode := TWSDataCode(chn.IOHandler.ReadLongWord); - if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then - Continue; - - //next the size + data = stream - chn.IOHandler.ReadStream(strmEvent); - - //ignore ping/pong messages - if wscode in [wdcPing, wdcPong] then Continue; - - //fire event - //offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize) - strmEvent.Position := 0; - if wscode = wdcBinary then - begin - chn.AsyncDispatchEvent(strmEvent); - end - else if wscode = wdcText then - begin - SetLength(swstext, strmEvent.Size); - strmEvent.Read(swstext[1], strmEvent.Size); - if swstext <> '' then - begin - chn.AsyncDispatchEvent(string(swstext)); - end; - end; - end; + chn.ReadAndProcessData; except - l := nil; - FChannels.UnlockList; - chn.ResetChannel; - raise; + on e:Exception do + begin + l := nil; + FChannels.UnlockList; + chn.ResetChannel; + //raise; + end; end; finally ws.Unlock; @@ -1432,7 +1493,7 @@ begin finally if l <> nil then FChannels.UnlockList; - strmEvent.Free; + //strmEvent.Free; end; end; end; @@ -1444,7 +1505,7 @@ begin aChannel.Lock; try - FChannels.Remove(aChannel); + FChannels.Remove(aChannel); if FReconnectlist <> nil then FReconnectlist.Remove(aChannel); finally @@ -1453,12 +1514,18 @@ begin BreakSelectWait; end; -class procedure TIdWebsocketMultiReadThread.RemoveInstance; +class procedure TIdWebsocketMultiReadThread.RemoveInstance(aForced: boolean); begin if FInstance <> nil then begin FInstance.Terminate; - FInstance.WaitFor; + if aForced then + begin + WaitForSingleObject(FInstance.Handle, 2 * 1000); + TerminateThread(FInstance.Handle, MaxInt); + end + else + FInstance.WaitFor; FreeAndNil(FInstance); end; end; @@ -1511,6 +1578,11 @@ begin if FInstance <> nil then begin FInstance.Terminate; + if aForced then + begin + WaitForSingleObject(FInstance.Handle, 2 * 1000); + TerminateThread(FInstance.Handle, MaxInt); + end; FInstance.WaitFor; FreeAndNil(FInstance); end; @@ -1531,7 +1603,7 @@ end; initialization finalization - TIdWebsocketMultiReadThread.RemoveInstance; - TIdWebsocketDispatchThread.RemoveInstance + TIdWebsocketMultiReadThread.RemoveInstance(); + TIdWebsocketDispatchThread.RemoveInstance() end. diff --git a/IdIOHandlerWebsocket.pas b/IdIOHandlerWebsocket.pas index ee38c60..19fc06c 100644 --- a/IdIOHandlerWebsocket.pas +++ b/IdIOHandlerWebsocket.pas @@ -363,7 +363,7 @@ end; function TIdIOHandlerWebsocket.HasData: Boolean; begin //buffered data available? (more data from previous read) - Result := (FWSInputBuffer.Size > 0); + Result := (FWSInputBuffer.Size > 0) or not InputBufferIsEmpty; end; function TIdIOHandlerWebsocket.InternalReadDataFromSource( @@ -672,7 +672,6 @@ begin else FInputBuffer.Write(LongWord(Result)) except - Unlock; //always unlock when socket exception FClosedGracefully := True; //closed (but not gracefully?) Raise; end; @@ -849,7 +848,8 @@ var var temp: TIdBytes; begin - if HasData then Exit(True); + //if HasData then Exit(True); + if (FWSInputBuffer.Size > 0) then Exit(True); Result := InternalReadDataFromSource(temp, ARaiseExceptionOnTimeout) > 0; if Result then diff --git a/IdServerWebsocketHandling.pas b/IdServerWebsocketHandling.pas index 5efeae6..f1f6f52 100644 --- a/IdServerWebsocketHandling.pas +++ b/IdServerWebsocketHandling.pas @@ -10,7 +10,7 @@ uses IdHashSHA, //XE3 etc {$IFEND} IdServerSocketIOHandling, IdServerWebsocketContext, - Classes, IdServerBaseHandling, IdIOHandlerWebsocket; + Classes, IdServerBaseHandling, IdIOHandlerWebsocket, IdSocketIOHandling; type TIdServerSocketIOHandling_Ext = class(TIdServerSocketIOHandling) @@ -25,16 +25,30 @@ type public class function ProcessServerCommandGet(AThread: TIdServerWSContext; ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo): Boolean; + + class function CurrentSocket: ISocketIOContext; end; implementation uses StrUtils, SysUtils, DateUtils, - IdCustomTCPServer, IdCoderMIME; + IdCustomTCPServer, IdCoderMIME, IdThread; { TIdServerWebsocketHandling } +class function TIdServerWebsocketHandling.CurrentSocket: ISocketIOContext; +var + thread: TIdThreadWithTask; + context: TIdServerWSContext; +begin + if not (TThread.Currentthread is TIdThreadWithTask) then Exit(nil); + thread := TThread.Currentthread as TIdThreadWithTask; + if not (thread.Task is TIdServerWSContext) then Exit(nil); + context := thread.Task as TIdServerWSContext; + Result := context.SocketIO.GetSocketIOContext(context); +end; + class procedure TIdServerWebsocketHandling.DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext); var strmRequest, strmResponse: TMemoryStream; diff --git a/IdSocketIOHandling.pas b/IdSocketIOHandling.pas index b677d86..92d7c3d 100644 --- a/IdSocketIOHandling.pas +++ b/IdSocketIOHandling.pas @@ -180,6 +180,8 @@ type procedure UnLock; function ConnectionCount: Integer; + function GetSocketIOContext(const AContext: TIdContext): ISocketIOContext; + // procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); function NewConnection(const AContext: TIdContext): TSocketIOContext;overload; function NewConnection(const aGUID, aPeerIP: string): TSocketIOContext;overload; @@ -343,6 +345,20 @@ begin end; end; +function TIdBaseSocketIOHandling.GetSocketIOContext(const AContext: TIdContext): ISocketIOContext; +var + socket: TSocketIOContext; +begin + Result := nil; + Lock; + try + if FConnections.TryGetValue(AContext, socket) then + Exit(socket); + finally + UnLock; + end; +end; + procedure TIdBaseSocketIOHandling.FreeConnection(const AContext: TIdContext); var socket: TSocketIOContext; @@ -837,13 +853,13 @@ begin begin FSocketIOEventCallback.Remove(imsg); if Assigned(callback) then - callback(sdata); + callback(sdata); end else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then begin FSocketIOEventCallbackRef.Remove(imsg); if Assigned(callbackref) then - callbackref(sdata); + callbackref(sdata); end else ; //raise EIdSocketIoUnhandledMessage.Create(str); @@ -1112,13 +1128,16 @@ begin FreeAndNil(FQueue); UnLock; FLock.Free; - FCustomData.Free; + if OwnsCustomData then + FCustomData.Free; inherited; end; procedure TSocketIOContext.EmitEvent(const aEventName, aData: string; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError); begin + Assert(FHandling <> nil); + if not Assigned(aCallback) then FHandling.WriteSocketIOEvent(Self, '', aEventName, '[' + aData + ']', nil, nil) else @@ -1134,7 +1153,10 @@ end; procedure TSocketIOContext.EmitEvent(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError); begin - EmitEvent(aEventName, aData.AsJSon, aCallback, aOnError); + if aData <> nil then + EmitEvent(aEventName, aData.AsJSon, aCallback, aOnError) + else + EmitEvent(aEventName, '', aCallback, aOnError); end; function TSocketIOContext.GetCustomData: TObject; diff --git a/uROIndyHTTPWebsocketChannel.pas b/uROIndyHTTPWebsocketChannel.pas index ce274bf..32649a3 100644 --- a/uROIndyHTTPWebsocketChannel.pas +++ b/uROIndyHTTPWebsocketChannel.pas @@ -236,103 +236,108 @@ var wscode: TWSDataCode; swstext: utf8string; begin - //http server supports websockets? - if not FTriedUpgrade then - begin - if not IndyClient.IOHandler.IsWebsocket then //not already upgraded? - TryUpgradeToWebsocket; - FTriedUpgrade := True; //one shot - end; - - ws := IndyClient.IOHandler as TIdIOHandlerWebsocket; - if not ws.IsWebsocket then - //normal http dispatch - inherited IntDispatch(aRequest, aResponse) - else - //websocket dispatch - begin - ws.Lock; - try - //write messagenr at end - aRequest.Position := aRequest.Size; - Inc(FMessageNr); - iMsgNr := FMessageNr; - aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR)); - aRequest.Write(iMsgNr, SizeOf(iMsgNr)); - aRequest.Position := 0; - - CheckConnection; - - //write - IndyClient.IOHandler.Write(aRequest); - - iMsgNr2 := 0; - while iMsgNr2 <= 0 do - begin - aResponse.Size := 0; //clear - //first is the data type TWSDataType(text or bin), but is ignore/not needed - wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord); - //next the size + data = stream - IndyClient.IOHandler.ReadStream(aResponse); - //ignore ping/pong messages - if wscode in [wdcPing, wdcPong] then Continue; - if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then - begin - //get event or message nr - aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); - aResponse.Read(cWSNR[0], Length(cWSNR)); - end; - - if (cWSNR = C_RO_WS_NR) then - begin - aResponse.Read(iMsgNr2, SizeOf(iMsgNr2)); - aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc - aResponse.Position := 0; - - //event? - if iMsgNr2 < 0 then - begin - //events during dispatch? channel is busy so offload event dispatching to different thread! - AsyncDispatchEvent(aResponse); - aResponse.Size := 0; - { - ws.Unlock; - try - IntDispatchEvent(aResponse); - aResponse.Size := 0; - finally - ws.Lock; - end; - } - end; - end - else - begin - aResponse.Position := 0; - if wscode = wdcBinary then - begin - Self.IndyClient.AsyncDispatchEvent(aResponse); - end - else if wscode = wdcText then - begin - SetLength(swstext, aResponse.Size); - aResponse.Read(swstext[1], aResponse.Size); - if swstext <> '' then - begin - Self.IndyClient.AsyncDispatchEvent(string(swstext)); - end; - end; - end; - end; - except - ws.Unlock; //always unlock - ResetChannel; - Raise; + IndyClient.Lock; + try + //http server supports websockets? + if not FTriedUpgrade then + begin + if not IndyClient.IOHandler.IsWebsocket then //not already upgraded? + TryUpgradeToWebsocket; + FTriedUpgrade := True; //one shot end; - ws.Unlock; //normal unlock (no extra try finally needed) - if iMsgNr2 <> iMsgNr then - Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!'); + ws := IndyClient.IOHandler as TIdIOHandlerWebsocket; + if not ws.IsWebsocket then + //normal http dispatch + inherited IntDispatch(aRequest, aResponse) + else + //websocket dispatch + begin + ws.Lock; + try + //write messagenr at end + aRequest.Position := aRequest.Size; + Inc(FMessageNr); + iMsgNr := FMessageNr; + aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR)); + aRequest.Write(iMsgNr, SizeOf(iMsgNr)); + aRequest.Position := 0; + + CheckConnection; + + //write + IndyClient.IOHandler.Write(aRequest); + + iMsgNr2 := 0; + while iMsgNr2 <= 0 do + begin + aResponse.Size := 0; //clear + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord); + //next the size + data = stream + IndyClient.IOHandler.ReadStream(aResponse); + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; + if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then + begin + //get event or message nr + aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); + aResponse.Read(cWSNR[0], Length(cWSNR)); + end; + + if (cWSNR = C_RO_WS_NR) then + begin + aResponse.Read(iMsgNr2, SizeOf(iMsgNr2)); + aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc + aResponse.Position := 0; + + //event? + if iMsgNr2 < 0 then + begin + //events during dispatch? channel is busy so offload event dispatching to different thread! + AsyncDispatchEvent(aResponse); + aResponse.Size := 0; + { + ws.Unlock; + try + IntDispatchEvent(aResponse); + aResponse.Size := 0; + finally + ws.Lock; + end; + } + end; + end + else + begin + aResponse.Position := 0; + if wscode = wdcBinary then + begin + Self.IndyClient.AsyncDispatchEvent(aResponse); + end + else if wscode = wdcText then + begin + SetLength(swstext, aResponse.Size); + aResponse.Read(swstext[1], aResponse.Size); + if swstext <> '' then + begin + Self.IndyClient.AsyncDispatchEvent(string(swstext)); + end; + end; + end; + end; + except + ws.Unlock; //always unlock + ResetChannel; + Raise; + end; + ws.Unlock; //normal unlock (no extra try finally needed) + + if iMsgNr2 <> iMsgNr then + Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!'); + end; + finally + IndyClient.UnLock; end; end;