From 5a7310896bb875b38dea1efff7151516bc8d0c4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Mussche?= Date: Fri, 31 Jan 2014 20:22:10 +0100 Subject: [PATCH] better client reconnect + small fixes --- IdHTTPWebsocketClient.pas | 189 +++++++++++++++++++++++--------------- IdSocketIOHandling.pas | 44 +++++++-- 2 files changed, 153 insertions(+), 80 deletions(-) diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index 3c9b7dd..cf3862a 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -65,6 +65,9 @@ type function TryUpgradeToWebsocket: Boolean; procedure UpgradeToWebsocket; + procedure Lock; + procedure UnLock; + procedure Connect; override; function TryConnect: Boolean; procedure Disconnect(ANotifyPeer: Boolean); override; @@ -85,6 +88,7 @@ type end; // on error + (* TIdHTTPSocketIOClient_old = class(TIdHTTPWebsocketClient) private FOnConnected: TNotifyEvent; @@ -114,6 +118,7 @@ type property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson; property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent; end; + *) TIdWebsocketMultiReadThread = class(TThread) private @@ -267,23 +272,35 @@ end; procedure TIdHTTPWebsocketClient.Connect; begin - //clear inputbuffer, otherwise it can't connect :( - if (IOHandler <> nil) then - IOHandler.Clear; - - //FHeartBeat.Enabled := True; - if SocketIOCompatible and - not FSocketIOConnectBusy then - begin - FSocketIOConnectBusy := True; - try - TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command) - finally - FSocketIOConnectBusy := False; + Lock; + try + if Connected then + begin + TryUpgradeToWebsocket; + Exit; end; - end - else - inherited Connect; + + //FHeartBeat.Enabled := True; + if SocketIOCompatible and + not FSocketIOConnectBusy then + begin + FSocketIOConnectBusy := True; + try + TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command) + finally + FSocketIOConnectBusy := False; + end; + end + else + begin + //clear inputbuffer, otherwise it can't connect :( + if (IOHandler <> nil) then IOHandler.Clear; + + inherited Connect; + end; + finally + UnLock; + end; end; destructor TIdHTTPWebsocketClient.Destroy; @@ -306,7 +323,10 @@ end; procedure TIdHTTPWebsocketClient.DisConnect(ANotifyPeer: Boolean); begin - TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + if not SocketIOCompatible and + ( (IOHandler <> nil) and not IOHandler.IsWebsocket) + then + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); if ANotifyPeer and SocketIOCompatible then FSocketIO.WriteDisConnect(FSocketIOContext as TSocketIOContext) @@ -401,34 +421,54 @@ end; function TIdHTTPWebsocketClient.TryConnect: Boolean; begin + Lock; try - if Connected then Exit(True); + try + if Connected then Exit(True); - Connect; - Result := Connected; - if Result and SocketIOCompatible then - Result := TryUpgradeToWebsocket; - except - Result := False; - end + Connect; + Result := Connected; + if Result then + Result := TryUpgradeToWebsocket + except + Result := False; + end + finally + UnLock; + end; end; function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean; var sError: string; begin - if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); + Lock; + try + if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); - InternalUpgradeToWebsocket(False{no raise}, sError); - Result := (sError = ''); + InternalUpgradeToWebsocket(False{no raise}, sError); + Result := (sError = ''); + finally + UnLock; + end; +end; + +procedure TIdHTTPWebsocketClient.UnLock; +begin + System.TMonitor.Exit(Self); end; procedure TIdHTTPWebsocketClient.UpgradeToWebsocket; var sError: string; begin - if not IOHandler.IsWebsocket then - InternalUpgradeToWebsocket(True{raise}, sError); + Lock; + try + if not IOHandler.IsWebsocket then + InternalUpgradeToWebsocket(True{raise}, sError); + finally + UnLock; + end; end; procedure TIdHTTPWebsocketClient.InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string); @@ -560,13 +600,6 @@ begin Response.Clear; if IOHandler.CheckForDataOnSource(ReadTimeout) then Response.ResponseText := IOHandler.InputBufferAsString(); - //for now: timer in mainthread? -// TThread.Queue(nil, -// procedure -// begin -// FHeartBeat.Interval := 2 * 1000; -// FHeartBeat.Enabled := True; -// end); end else begin @@ -623,27 +656,27 @@ begin if SocketIOCompatible then begin -// if FSocketIOContext = nil then -// begin - FSocketIOContext := TSocketIOContext.Create(Self); -// IInterface(FSocketIOContext)._AddRef; -// end -// else -// FSocketIOContext.Create(Self); //update with new iohandler etc - + FSocketIOContext := TSocketIOContext.Create(Self); (FSocketIOContext as TSocketIOContext).ConnectSend := True; //connect already send via url? GET /socket.io/1/websocket/9elrbEFqiimV29QAM6T- FSocketIO.WriteConnect(FSocketIOContext as TSocketIOContext); end; //always read the data! (e.g. RO use override of AsyncDispatchEvent to process data) //if Assigned(OnBinData) or Assigned(OnTextData) then - TIdWebsocketMultiReadThread.Instance.AddClient(Self); finally Request.Clear; strmResponse.Free; + + //add to thread for auto retry/reconnect + TIdWebsocketMultiReadThread.Instance.AddClient(Self); end; end; +procedure TIdHTTPWebsocketClient.Lock; +begin + System.TMonitor.Enter(Self); +end; + function TIdHTTPWebsocketClient.MakeImplicitClientHandler: TIdIOHandler; begin Result := TIdIOHandlerWebsocket.Create(nil); @@ -681,7 +714,7 @@ procedure TIdHTTPWebsocketClient.ResetChannel; //var // ws: TIdIOHandlerWebsocket; begin - TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); +// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); keep for reconnect if IOHandler <> nil then begin @@ -727,6 +760,7 @@ end; { TIdHTTPSocketIOClient } +(* procedure TIdHTTPSocketIOClient_old.AfterConstruction; begin inherited; @@ -851,7 +885,7 @@ begin end; end; -(*) +) procedure TIdHTTPSocketIOClient_old.ProcessSocketIORequest( const strmRequest: TStream); @@ -977,7 +1011,7 @@ procedure TIdWebsocketMultiReadThread.AddClient( aChannel: TIdHTTPWebsocketClient); var l: TList; begin - Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket'); + //Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket'); l := FChannels.LockList; try @@ -1103,7 +1137,9 @@ begin chn := TIdHTTPWebsocketClient(l.Items[i]); ws := chn.IOHandler as TIdIOHandlerWebsocket; //valid? - if (chn.Socket.Binding.Handle > 0) and + if (chn.Socket <> nil) and + (chn.Socket.Binding <> nil) and + (chn.Socket.Binding.Handle > 0) and (chn.Socket.Binding.Handle <> INVALID_SOCKET) then begin //more than 10s nothing done? then send ping @@ -1118,7 +1154,8 @@ begin else if not chn.Connected then begin try - ws.LastActivityTime := Now; + if ws <> nil then + ws.LastActivityTime := Now; chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread chn.Connect; chn.TryUpgradeToWebsocket; @@ -1155,6 +1192,8 @@ begin chn := TIdHTTPWebsocketClient(l.Items[i]); //valid? if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution + (chn.Socket <> nil) and + (chn.Socket.Binding <> nil) and (chn.Socket.Binding.Handle > 0) and (chn.Socket.Binding.Handle <> INVALID_SOCKET) then begin @@ -1181,7 +1220,8 @@ begin Fexceptionset.fd_array[0] := FTempHandle; //wait 15s till some data - Finterval.tv_sec := 15; //15s + Finterval.tv_sec := 5; //5s + {$MESSAGE HINT 'make wait timeout configurable + less in case of reconnect '} Finterval.tv_usec := 0; //nothing to wait for? then sleep some time to prevent 100% CPU @@ -1216,24 +1256,29 @@ begin begin chn := TIdHTTPWebsocketClient(l.Items[i]); ws := chn.IOHandler as TIdIOHandlerWebsocket; + if (ws = nil) then Continue; + if ws.TryLock then //IOHandler.Readable cannot be done during pending action! try - try - //try to process all events - while chn.IOHandler.HasData or - chn.IOHandler.Readable(0) do //has some data - begin - if strmEvent = nil then - strmEvent := TMemoryStream.Create; - strmEvent.Clear; + try + //try to process all events + while chn.IOHandler.HasData or + chn.IOHandler.Readable(0) do //has some data + begin + if strmEvent = nil then + strmEvent := TMemoryStream.Create; + strmEvent.Clear; - //first is the data type TWSDataType(text or bin), but is ignore/not needed - wscode := TWSDataCode(chn.IOHandler.ReadLongWord); - //next the size + data = stream - chn.IOHandler.ReadStream(strmEvent); + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(chn.IOHandler.ReadLongWord); + if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then + Continue; - //ignore ping/pong messages - if wscode in [wdcPing, wdcPong] then Continue; + //next the size + data = stream + chn.IOHandler.ReadStream(strmEvent); + + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; //fire event //offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize) @@ -1251,12 +1296,12 @@ begin chn.AsyncDispatchEvent(string(swstext)); end; end; - end; - except - l := nil; - FChannels.UnlockList; - chn.ResetChannel; - raise; + end; + except + l := nil; + FChannels.UnlockList; + chn.ResetChannel; + raise; end; finally ws.Unlock; diff --git a/IdSocketIOHandling.pas b/IdSocketIOHandling.pas index fd70564..9725006 100644 --- a/IdSocketIOHandling.pas +++ b/IdSocketIOHandling.pas @@ -149,6 +149,7 @@ type procedure Lock; procedure UnLock; + function ConnectionCount: Integer; // procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil); function NewConnection(const AContext: TIdContext): TSocketIOContext;overload; @@ -192,6 +193,30 @@ begin FSocketIOErrorRef := TDictionary.Create; end; +function TIdBaseSocketIOHandling.ConnectionCount: Integer; +var + context: TSocketIOContext; +begin + Lock; + try + Result := 0; + + //note: is single connection? + for context in FConnections.Values do + begin + if context.IsDisconnected then Continue; + Inc(Result); + end; + for context in FConnectionsGUID.Values do + begin + if context.IsDisconnected then Continue; + Inc(Result); + end; + finally + UnLock; + end; +end; + destructor TIdBaseSocketIOHandling.Destroy; var squid: string; idcontext: TIdContext; @@ -1236,12 +1261,15 @@ var jsonarray: string; isendcount: Integer; begin - if aData.IsType(stArray) then - jsonarray := aData.AsString - else if aData.IsType(stString) then - jsonarray := '["' + aData.AsString + '"]' - else - jsonarray := '[' + aData.AsString + ']'; + if aData <> nil then + begin + if aData.IsType(stArray) then + jsonarray := aData.AsString + else if aData.IsType(stString) then + jsonarray := '["' + aData.AsString + '"]' + else + jsonarray := '[' + aData.AsString + ']'; + end; Lock; try @@ -1278,7 +1306,7 @@ begin end; if isendcount = 0 then - raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!'); + raise EIdSocketIoUnhandledMessage.Create('Cannot emit: no socket.io connections!'); finally UnLock; end; @@ -1325,7 +1353,7 @@ begin end; if isendcount = 0 then - raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!'); + raise EIdSocketIoUnhandledMessage.Create('Cannot send: no socket.io connections!'); finally UnLock; end;