ping + reconnect in read thread

This commit is contained in:
André Mussche 2014-01-23 16:30:39 +01:00
parent 4842291225
commit 73ff121faa
2 changed files with 133 additions and 22 deletions

View file

@ -48,8 +48,8 @@ type
FSocketIOContext: ISocketIOContext;
FSocketIOConnectBusy: Boolean;
FHeartBeat: TTimer;
procedure HeartBeatTimer(Sender: TObject);
//FHeartBeat: TTimer;
//procedure HeartBeatTimer(Sender: TObject);
function GetSocketIO: TIdSocketIOHandling;
protected
procedure InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);virtual;
@ -68,6 +68,8 @@ type
procedure Connect; override;
function TryConnect: Boolean;
procedure Disconnect(ANotifyPeer: Boolean); override;
function CheckConnection: Boolean;
procedure Ping;
property IOHandler: TIdIOHandlerWebsocket read GetIOHandlerWS write SetIOHandlerWS;
property OnBinData : TDataBinEvent read FOnData write SetOnData;
@ -127,6 +129,7 @@ type
protected
FChannels: TThreadList;
procedure ReadFromAllChannels;
procedure PingAllChannels;
procedure Execute; override;
public
@ -155,7 +158,7 @@ implementation
uses
IdCoderMIME, SysUtils, Math, IdException, IdStackConsts, IdStack,
IdStackBSDBase, IdGlobal, Windows, StrUtils;
IdStackBSDBase, IdGlobal, Windows, StrUtils, DateUtils;
//type
// TAnonymousThread = class(TThread)
@ -198,9 +201,9 @@ begin
ManagedIOHandler := True;
FSocketIO := TIdSocketIOHandling_Ext.Create;
FHeartBeat := TTimer.Create(nil);
FHeartBeat.Enabled := False;
FHeartBeat.OnTimer := HeartBeatTimer;
// FHeartBeat := TTimer.Create(nil);
// FHeartBeat.Enabled := False;
// FHeartBeat.OnTimer := HeartBeatTimer;
end;
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream);
@ -238,12 +241,37 @@ begin
end);
end;
function TIdHTTPWebsocketClient.CheckConnection: Boolean;
begin
Result := False;
try
if (IOHandler <> nil) and
not IOHandler.ClosedGracefully and
IOHandler.Connected then
begin
IOHandler.CheckForDisconnect(True{error}, True{ignore buffer, check real connection});
Result := True; //ok if we reach here
end;
except
on E:Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
// if (IOHandler <> nil) then
// IOHandler.Clear;
Disconnect(False);
if Assigned(OnDisConnected) then
OnDisConnected(Self);
end;
end;
end;
procedure TIdHTTPWebsocketClient.Connect;
begin
if IOHandler <> nil then
//clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) then
IOHandler.Clear;
FHeartBeat.Enabled := True;
//FHeartBeat.Enabled := True;
if SocketIOCompatible and
not FSocketIOConnectBusy then
begin
@ -259,16 +287,16 @@ begin
end;
destructor TIdHTTPWebsocketClient.Destroy;
var tmr: TObject;
//var tmr: TObject;
begin
tmr := FHeartBeat;
FHeartBeat := nil;
TThread.Queue(nil, //otherwise free in other thread than created
procedure
begin
// tmr := FHeartBeat;
// FHeartBeat := nil;
// TThread.Queue(nil, //otherwise free in other thread than created
// procedure
// begin
//FHeartBeat.Free;
tmr.Free;
end);
// tmr.Free;
// end);
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
FSocketIO.Free;
@ -318,6 +346,7 @@ begin
Result := FSocketIO;
end;
(*
procedure TIdHTTPWebsocketClient.HeartBeatTimer(Sender: TObject);
begin
FHeartBeat.Enabled := False;
@ -368,6 +397,7 @@ begin
FHeartBeat.Enabled := True; //always enable: in case of disconnect it will re-connect
end;
end;
*)
function TIdHTTPWebsocketClient.TryConnect: Boolean;
begin
@ -531,12 +561,12 @@ begin
if IOHandler.CheckForDataOnSource(ReadTimeout) then
Response.ResponseText := IOHandler.InputBufferAsString();
//for now: timer in mainthread?
TThread.Queue(nil,
procedure
begin
FHeartBeat.Interval := 2 * 1000;
FHeartBeat.Enabled := True;
end);
// TThread.Queue(nil,
// procedure
// begin
// FHeartBeat.Interval := 2 * 1000;
// FHeartBeat.Enabled := True;
// end);
end
else
begin
@ -619,6 +649,34 @@ begin
Result := TIdIOHandlerWebsocket.Create(nil);
end;
procedure TIdHTTPWebsocketClient.Ping;
var
ws: TIdIOHandlerWebsocket;
begin
ws := IOHandler as TIdIOHandlerWebsocket;
//socket.io?
if SocketIOCompatible and ws.IsWebsocket then
begin
FSocketIO.Lock;
try
if (FSocketIOContext <> nil) then
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
finally
FSocketIO.UnLock;
end
end
//only websocket?
else if not SocketIOCompatible and ws.IsWebsocket then
begin
if ws.TryLock then
try
ws.WriteData(nil, wdcPing);
finally
ws.Unlock;
end;
end;
end;
procedure TIdHTTPWebsocketClient.ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
@ -995,7 +1053,10 @@ begin
begin
try
while not Terminated do
begin
ReadFromAllChannels;
PingAllChannels;
end;
except
//continue
end;
@ -1028,6 +1089,49 @@ begin
Result := FInstance;
end;
procedure TIdWebsocketMultiReadThread.PingAllChannels;
var
l: TList;
chn: TIdHTTPWebsocketClient;
ws: TIdIOHandlerWebsocket;
i: Integer;
begin
l := FChannels.LockList;
try
for i := 0 to l.Count - 1 do
begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
ws := chn.IOHandler as TIdIOHandlerWebsocket;
//valid?
if (chn.Socket.Binding.Handle > 0) and
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
begin
//more than 10s nothing done? then send ping
if SecondsBetween(Now, ws.LastActivityTime) > 10 then
if chn.CheckConnection then
try
chn.Ping;
except
//retry connect the next time?
end;
end
else if not chn.Connected then
begin
try
ws.LastActivityTime := Now;
chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread
chn.Connect;
chn.TryUpgradeToWebsocket;
except
//just try
end;
end;
end;
finally
FChannels.UnlockList;
end;
end;
procedure TIdWebsocketMultiReadThread.ReadFromAllChannels;
var
l: TList;

View file

@ -36,6 +36,7 @@ type
FCloseReason: string;
FCloseCode: Integer;
FClosing: Boolean;
FLastActivityTime: TDateTime;
class var FUseSingleWriteThread: Boolean;
protected
FMessageStream: TMemoryStream;
@ -89,6 +90,8 @@ type
procedure Write(AStream: TStream; aType: TWSDataType); overload;
procedure WriteBufferFlush(AByteCount: Integer); override;
property LastActivityTime: TDateTime read FLastActivityTime write FLastActivityTime;
class property UseSingleWriteThread: Boolean read FUseSingleWriteThread write FUseSingleWriteThread;
end;
@ -819,6 +822,7 @@ begin
SetLength(aData, 0);
if not _WaitByte(False) then Exit;
FLastActivityTime := Now; //received some data
//wait + process data
iByte := _GetByte;
@ -930,7 +934,9 @@ begin
Assert(Binding <> nil);
strmData := TMemoryStream.Create;
Lock;
try
FLastActivityTime := Now; //sending some data
(* 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 (nr)
7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0 (bit)
@ -1033,6 +1039,7 @@ begin
//if debughook > 0 then
// OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData)));
finally
Unlock;
strmData.Free;
end;
end;