From 13dbfbba4b0093f66070db1d9b07b4b2bb3ccc38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Mussche?= Date: Fri, 7 Mar 2014 12:19:32 +0100 Subject: [PATCH] reconnect in seperate thread, callback object reference counted (more async), etc --- IdHTTPWebsocketClient.pas | 140 +++++++++++++++++++++++++++----- IdSocketIOHandling.pas | 69 ++++++++++------ uROIndyHTTPWebsocketChannel.pas | 10 ++- 3 files changed, 171 insertions(+), 48 deletions(-) diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index 725a8fb..1505cd9 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -123,10 +123,16 @@ type end; *) + TWSThreadList = class(TThreadList) + public + function Count: Integer; + end; + TIdWebsocketMultiReadThread = class(TThread) private class var FInstance: TIdWebsocketMultiReadThread; protected + FReadTimeout: Integer; FTempHandle: THandle; FPendingBreak: Boolean; Freadset, Fexceptionset: TFDSet; @@ -136,6 +142,8 @@ type procedure BreakSelectWait; protected FChannels: TThreadList; + FReconnectlist: TWSThreadList; + FReconnectThread: TIdWebsocketQueueThread; procedure ReadFromAllChannels; procedure PingAllChannels; @@ -149,6 +157,8 @@ type procedure AddClient (aChannel: TIdHTTPWebsocketClient); procedure RemoveClient(aChannel: TIdHTTPWebsocketClient); + property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000; + class function Instance: TIdWebsocketMultiReadThread; class procedure RemoveInstance; end; @@ -287,18 +297,17 @@ begin if SocketIOCompatible and not FSocketIOConnectBusy then begin - FSocketIOConnectBusy := True; - try + //FSocketIOConnectBusy := True; + //try TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command) - finally - FSocketIOConnectBusy := False; - end; + //finally + // FSocketIOConnectBusy := False; + //end; end else begin //clear inputbuffer, otherwise it can't connect :( if (IOHandler <> nil) then IOHandler.Clear; - inherited Connect; end; finally @@ -450,6 +459,7 @@ function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean; var sError: string; begin + FSocketIOConnectBusy := True; Lock; try if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); @@ -457,6 +467,7 @@ begin InternalUpgradeToWebsocket(False{no raise}, sError); Result := (sError = ''); finally + FSocketIOConnectBusy := False; UnLock; end; end; @@ -491,6 +502,9 @@ begin //remove from thread during connection handling TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); + if (IOHandler <> nil) then + IOHandler.Clear; + strmResponse := TMemoryStream.Create; try //special socket.io handling, see https://github.com/LearnBoost/socket.io-spec @@ -663,6 +677,7 @@ begin //upgrade succesful IOHandler.IsWebsocket := True; aFailedReason := ''; + Assert(Connected); if SocketIOCompatible then begin @@ -1045,6 +1060,9 @@ end; procedure TIdWebsocketMultiReadThread.AfterConstruction; begin inherited; + + ReadTimeout := 5000; + FChannels := TThreadList.Create; FillChar(Freadset, SizeOf(Freadset), 0); FillChar(Fexceptionset, SizeOf(Fexceptionset), 0); @@ -1169,24 +1187,82 @@ begin end else if not chn.Connected then begin - if chn.TryLock then - try - try - if ws <> nil then - ws.LastActivityTime := Now; - chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread - chn.TryUpgradeToWebsocket; - except - //just try - end; - finally - chn.Unlock; - end; + if (ws <> nil) and + (SecondsBetween(Now, ws.LastActivityTime) < 5) + then + Continue; + + if FReconnectlist = nil then + FReconnectlist := TWSThreadList.Create; + //if chn.TryLock then + FReconnectlist.Add(chn); end; end; finally FChannels.UnlockList; end; + + //reconnect needed? (in background) + if FReconnectlist.Count > 0 then + begin + if FReconnectThread = nil then + FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start}); + FReconnectThread.QueueEvent( + procedure + var + l: TList; + chn: TIdHTTPWebsocketClient; + begin + while FReconnectlist.Count > 0 do + begin + chn := nil; + try + //get first one + l := FReconnectlist.LockList; + try + if l.Count <= 0 then Exit; + + chn := TObject(l.Items[0]) as TIdHTTPWebsocketClient; + if not chn.TryLock then + begin + l.Delete(0); + chn := nil; + Continue; + end; + finally + FReconnectlist.UnlockList; + end; + + //try reconnect + ws := chn.IOHandler as TIdIOHandlerWebsocket; + if ( (ws = nil) or + (SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then + begin + try + if ws <> nil then + ws.LastActivityTime := Now; + chn.ConnectTimeout := 1000; + chn.TryUpgradeToWebsocket; + except + //just try + end; + end; + + //remove from todo list + l := FReconnectlist.LockList; + try + if l.Count > 0 then + l.Delete(0); + finally + FReconnectlist.UnlockList; + end; + finally + if chn <> nil then + chn.Unlock; + end; + end; + end); + end; end; procedure TIdWebsocketMultiReadThread.ReadFromAllChannels; @@ -1242,9 +1318,8 @@ begin Fexceptionset.fd_array[0] := FTempHandle; //wait 15s till some data - Finterval.tv_sec := 5; //5s - {$MESSAGE HINT 'make wait timeout configurable + less in case of reconnect '} - Finterval.tv_usec := 0; + Finterval.tv_sec := Self.ReadTimeout div 1000; //5s + Finterval.tv_usec := Self.ReadTimeout mod 1000; //nothing to wait for? then sleep some time to prevent 100% CPU if iResult = 0 then @@ -1344,7 +1419,15 @@ procedure TIdWebsocketMultiReadThread.RemoveClient( aChannel: TIdHTTPWebsocketClient); begin if Self = nil then Exit; + + aChannel.Lock; + try FChannels.Remove(aChannel); + if FReconnectlist <> nil then + FReconnectlist.Remove(aChannel); + finally + aChannel.UnLock; + end; BreakSelectWait; end; @@ -1411,6 +1494,19 @@ begin end; end; +{ TWSThreadList } + +function TWSThreadList.Count: Integer; +var l: TList; +begin + l := LockList; + try + Result := l.Count; + finally + UnlockList; + end; +end; + initialization finalization TIdWebsocketMultiReadThread.RemoveInstance; diff --git a/IdSocketIOHandling.pas b/IdSocketIOHandling.pas index c9f5298..c859abd 100644 --- a/IdSocketIOHandling.pas +++ b/IdSocketIOHandling.pas @@ -13,12 +13,14 @@ type TSocketIOCallbackObj = class; TIdBaseSocketIOHandling = class; TIdSocketIOHandling = class; - ISocketIOContext = interface; - TSocketIOMsg = reference to procedure(const ASocket: ISocketIOContext; const aText: string; const aCallback: TSocketIOCallbackObj); - TSocketIOMsgJSON = reference to procedure(const ASocket: ISocketIOContext; const aJSON: ISuperObject; const aCallback: TSocketIOCallbackObj); + ISocketIOContext = interface; + ISocketIOCallback = interface; + + TSocketIOMsg = reference to procedure(const ASocket: ISocketIOContext; const aText: string; const aCallback: ISocketIOCallback); + TSocketIOMsgJSON = reference to procedure(const ASocket: ISocketIOContext; const aJSON: ISuperObject; const aCallback: ISocketIOCallback); TSocketIONotify = reference to procedure(const ASocket: ISocketIOContext); - TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallbackObj: TSocketIOCallbackObj); + TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallback: ISocketIOCallback); TSocketIOError = reference to procedure(const ASocket: ISocketIOContext; const aErrorClass, aErrorMessage: string); TSocketIONotifyList = class(TList); @@ -105,14 +107,23 @@ type procedure SendJSON(const aJSON: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil); end; - TSocketIOCallbackObj = class + ISocketIOCallback = interface + ['{BCC31817-7FD8-4CF6-B68B-0F9BAA80DF90}'] + procedure SendResponse(const aResponse: string); + function IsResponseSend: Boolean; + end; + + TSocketIOCallbackObj = class(TInterfacedObject, + ISocketIOCallback) protected FHandling: TIdBaseSocketIOHandling; FSocket: TSocketIOContext; FMsgNr: Integer; - public + {ISocketIOCallback} procedure SendResponse(const aResponse: string); function IsResponseSend: Boolean; + public + constructor Create(aHandling: TIdBaseSocketIOHandling; aSocket: TSocketIOContext; aMsgNr: Integer); end; TIdBaseSocketIOHandling = class(TIdServerBaseHandling) @@ -280,9 +291,17 @@ begin Lock; try for socket in FConnections.Values do + try aEachSocketCallback(socket); + except + //continue: e.g. connnection closed etc + end; for socket in FConnectionsGUID.Values do + try aEachSocketCallback(socket); + except + //continue: e.g. connnection closed etc + end; finally Unlock; end; @@ -443,7 +462,7 @@ var args: TSuperArray; list: TSocketIOEventList; event: TSocketIOEvent; - callback: TSocketIOCallbackObj; + callback: ISocketIOCallback; // socket: TSocketIOContext; begin //'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event] @@ -462,12 +481,7 @@ begin // socket := FConnections.Items[AContext]; if aHasCallback then - begin - callback := TSocketIOCallbackObj.Create; - callback.FHandling := Self; - callback.FSocket := AContext; - callback.FMsgNr := aMsgNr; - end + callback := TSocketIOCallbackObj.Create(Self, AContext, aMsgNr) else callback := nil; try @@ -482,7 +496,7 @@ begin end; end; finally - callback.Free; + callback := nil; end; end else @@ -658,7 +672,7 @@ var // socket: TSocketIOContext; callback: TSocketIOCallback; callbackref: TSocketIOCallbackRef; - callbackobj: TSocketIOCallbackObj; + callbackobj: ISocketIOCallback; errorref: TSocketIOError; error: ISuperObject; socket: TSocketIOContext; @@ -726,11 +740,8 @@ begin begin if bCallback then begin - callbackobj := TSocketIOCallbackObj.Create; + callbackobj := TSocketIOCallbackObj.Create(Self, socket, imsg); try - callbackobj.FHandling := Self; - callbackobj.FSocket := socket; - callbackobj.FMsgNr := imsg; try OnSocketIOMsg(socket, sdata, callbackobj); //, imsg, bCallback); except @@ -741,7 +752,7 @@ begin end; end; finally - callbackobj.Free; + callbackobj := nil; end end else @@ -759,11 +770,8 @@ begin begin if bCallback then begin - callbackobj := TSocketIOCallbackObj.Create; + callbackobj := TSocketIOCallbackObj.Create(Self, socket, imsg); try - callbackobj.FHandling := Self; - callbackobj.FSocket := socket; - callbackobj.FMsgNr := imsg; try OnSocketIOJson(socket, SO(sdata), callbackobj); //, imsg, bCallback); except @@ -774,7 +782,7 @@ begin end; end; finally - callbackobj.Free; + callbackobj := nil; end end else @@ -828,11 +836,13 @@ begin if FSocketIOEventCallback.TryGetValue(imsg, callback) then begin FSocketIOEventCallback.Remove(imsg); + if Assigned(callback) then callback(sdata); end else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then begin FSocketIOEventCallbackRef.Remove(imsg); + if Assigned(callbackref) then callbackref(sdata); end else ; @@ -1056,6 +1066,15 @@ end; { TSocketIOCallbackObj } +constructor TSocketIOCallbackObj.Create(aHandling: TIdBaseSocketIOHandling; + aSocket: TSocketIOContext; aMsgNr: Integer); +begin + FHandling := aHandling; + FSocket := aSocket; + FMsgNr := aMsgNr; + inherited Create(); +end; + function TSocketIOCallbackObj.IsResponseSend: Boolean; begin Result := (FMsgNr < 0); diff --git a/uROIndyHTTPWebsocketChannel.pas b/uROIndyHTTPWebsocketChannel.pas index 3ab8cf3..ce274bf 100644 --- a/uROIndyHTTPWebsocketChannel.pas +++ b/uROIndyHTTPWebsocketChannel.pas @@ -153,12 +153,20 @@ begin strmevent.CopyFrom(aEvent, aEvent.Size); //events during dispatch? channel is busy so offload event dispatching to different thread! - CreateAnonymousThread( + TIdWebsocketDispatchThread.Instance.QueueEvent( procedure begin IntDispatchEvent(strmevent); strmevent.Free; end); + + //events during dispatch? channel is busy so offload event dispatching to different thread! +// CreateAnonymousThread( +// procedure +// begin +// IntDispatchEvent(strmevent); +// strmevent.Free; +// end); end; procedure TROIndyHTTPWebsocketChannel.CheckConnection;