diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index 8674db5..3c9b7dd 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -48,8 +48,8 @@ type FSocketIOContext: ISocketIOContext; FSocketIOConnectBusy: Boolean; - FHeartBeat: TTimer; - procedure HeartBeatTimer(Sender: TObject); + //FHeartBeat: TTimer; + //procedure HeartBeatTimer(Sender: TObject); function GetSocketIO: TIdSocketIOHandling; protected procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);virtual; @@ -68,6 +68,8 @@ type procedure Connect; override; function TryConnect: Boolean; procedure Disconnect(ANotifyPeer: Boolean); override; + function CheckConnection: Boolean; + procedure Ping; property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS; property OnBinData : TDataBinEvent read FOnData write SetOnData; @@ -127,6 +129,7 @@ type protected FChannels: TThreadList; procedure ReadFromAllChannels; + procedure PingAllChannels; procedure Execute; override; public @@ -155,7 +158,7 @@ implementation uses IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack, - IdStackBSDBase, IdGlobal, Windows, StrUtils; + IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils; //type // TAnonymousThread = class(TThread) @@ -198,9 +201,9 @@ begin ManagedIOHandler := True; FSocketIO := TIdSocketIOHandling_Ext.Create; - FHeartBeat := TTimer.Create(nil); - FHeartBeat.Enabled := False; - FHeartBeat.OnTimer := HeartBeatTimer; +// FHeartBeat := TTimer.Create(nil); +// FHeartBeat.Enabled := False; +// FHeartBeat.OnTimer := HeartBeatTimer; end; procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream); @@ -238,12 +241,37 @@ begin end); end; +function TIdHTTPWebsocketClient.CheckConnection: Boolean; +begin + Result := False; + try + if (IOHandler <> nil) and + not IOHandler.ClosedGracefully and + IOHandler.Connected then + begin + IOHandler.CheckForDisconnect(True{error}, True{ignore buffer, check real connection}); + Result := True; //ok if we reach here + end; + except + on E:Exception do + begin + //clear inputbuffer, otherwise it stays connected :( +// if (IOHandler <> nil) then +// IOHandler.Clear; + Disconnect(False); + if Assigned(OnDisConnected) then + OnDisConnected(Self); + end; + end; +end; + procedure TIdHTTPWebsocketClient.Connect; begin - if IOHandler <> nil then + //clear inputbuffer, otherwise it can't connect :( + if (IOHandler <> nil) then IOHandler.Clear; - FHeartBeat.Enabled := True; + //FHeartBeat.Enabled := True; if SocketIOCompatible and not FSocketIOConnectBusy then begin @@ -259,16 +287,16 @@ begin end; destructor TIdHTTPWebsocketClient.Destroy; -var tmr: TObject; +//var tmr: TObject; begin - tmr := FHeartBeat; - FHeartBeat := nil; - TThread.Queue(nil, //otherwise free in other thread than created - procedure - begin +// tmr := FHeartBeat; +// FHeartBeat := nil; +// TThread.Queue(nil, //otherwise free in other thread than created +// procedure +// begin //FHeartBeat.Free; - tmr.Free; - end); +// tmr.Free; +// end); TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); FSocketIO.Free; @@ -318,6 +346,7 @@ begin Result := FSocketIO; end; +(* procedure TIdHTTPWebsocketClient.HeartBeatTimer(Sender: TObject); begin FHeartBeat.Enabled := False; @@ -368,6 +397,7 @@ begin FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect end; end; +*) function TIdHTTPWebsocketClient.TryConnect: Boolean; begin @@ -531,12 +561,12 @@ begin if IOHandler.CheckForDataOnSource(ReadTimeout) then Response.ResponseText := IOHandler.InputBufferAsString(); //for now: timer in mainthread? - TThread.Queue(nil, - procedure - begin - FHeartBeat.Interval := 2 * 1000; - FHeartBeat.Enabled := True; - end); +// TThread.Queue(nil, +// procedure +// begin +// FHeartBeat.Interval := 2 * 1000; +// FHeartBeat.Enabled := True; +// end); end else begin @@ -619,6 +649,34 @@ begin Result := TIdIOHandlerWebsocket.Create(nil); end; +procedure TIdHTTPWebsocketClient.Ping; +var + ws: TIdIOHandlerWebsocket; +begin + ws := IOHandler as TIdIOHandlerWebsocket; + //socket.io? + if SocketIOCompatible and ws.IsWebsocket then + begin + FSocketIO.Lock; + try + if (FSocketIOContext <> nil) then + FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message + finally + FSocketIO.UnLock; + end + end + //only websocket? + else if not SocketIOCompatible and ws.IsWebsocket then + begin + if ws.TryLock then + try + ws.WriteData(nil, wdcPing); + finally + ws.Unlock; + end; + end; +end; + procedure TIdHTTPWebsocketClient.ResetChannel; //var // ws: TIdIOHandlerWebsocket; @@ -995,7 +1053,10 @@ begin begin try while not Terminated do + begin ReadFromAllChannels; + PingAllChannels; + end; except //continue end; @@ -1028,6 +1089,49 @@ begin Result := FInstance; end; +procedure TIdWebsocketMultiReadThread.PingAllChannels; +var + l: TList; + chn: TIdHTTPWebsocketClient; + ws: TIdIOHandlerWebsocket; + i: Integer; +begin + l := FChannels.LockList; + try + for i := 0 to l.Count - 1 do + begin + chn := TIdHTTPWebsocketClient(l.Items[i]); + ws := chn.IOHandler as TIdIOHandlerWebsocket; + //valid? + if (chn.Socket.Binding.Handle > 0) and + (chn.Socket.Binding.Handle <> INVALID_SOCKET) then + begin + //more than 10s nothing done? then send ping + if SecondsBetween(Now, ws.LastActivityTime) > 10 then + if chn.CheckConnection then + try + chn.Ping; + except + //retry connect the next time? + end; + end + else if not chn.Connected then + begin + try + ws.LastActivityTime := Now; + chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread + chn.Connect; + chn.TryUpgradeToWebsocket; + except + //just try + end; + end; + end; + finally + FChannels.UnlockList; + end; +end; + procedure TIdWebsocketMultiReadThread.ReadFromAllChannels; var l: TList; diff --git a/IdIOHandlerWebsocket.pas b/IdIOHandlerWebsocket.pas index 317fc99..925f226 100644 --- a/IdIOHandlerWebsocket.pas +++ b/IdIOHandlerWebsocket.pas @@ -36,6 +36,7 @@ type FCloseReason: string; FCloseCode: Integer; FClosing: Boolean; + FLastActivityTime: TDateTime; class var FUseSingleWriteThread: Boolean; protected FMessageStream: TMemoryStream; @@ -89,6 +90,8 @@ type procedure Write(AStream: TStream; aType: TWSDataType); overload; procedure WriteBufferFlush(AByteCount: Integer); override; + property LastActivityTime: TDateTime read FLastActivityTime write FLastActivityTime; + class property UseSingleWriteThread: Boolean read FUseSingleWriteThread write FUseSingleWriteThread; end; @@ -819,6 +822,7 @@ begin SetLength(aData, 0); if not _WaitByte(False) then Exit; + FLastActivityTime := Now; //received some data //wait + process data iByte := _GetByte; @@ -930,7 +934,9 @@ begin Assert(Binding <> nil); strmData := TMemoryStream.Create; + Lock; try + FLastActivityTime := Now; //sending some data (* 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 (nr) 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0 (bit) @@ -1033,6 +1039,7 @@ begin //if debughook > 0 then // OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData))); finally + Unlock; strmData.Free; end; end;