reconnect in seperate thread, callback object reference counted (more async), etc

This commit is contained in:
André Mussche 2014-03-07 12:19:32 +01:00
parent c8733951de
commit 13dbfbba4b
3 changed files with 171 additions and 48 deletions

View file

@ -123,10 +123,16 @@ type
end; end;
*) *)
TWSThreadList = class(TThreadList)
public
function Count: Integer;
end;
TIdWebsocketMultiReadThread = class(TThread) TIdWebsocketMultiReadThread = class(TThread)
private private
class var FInstance: TIdWebsocketMultiReadThread; class var FInstance: TIdWebsocketMultiReadThread;
protected protected
FReadTimeout: Integer;
FTempHandle: THandle; FTempHandle: THandle;
FPendingBreak: Boolean; FPendingBreak: Boolean;
Freadset, Fexceptionset: TFDSet; Freadset, Fexceptionset: TFDSet;
@ -136,6 +142,8 @@ type
procedure BreakSelectWait; procedure BreakSelectWait;
protected protected
FChannels: TThreadList; FChannels: TThreadList;
FReconnectlist: TWSThreadList;
FReconnectThread: TIdWebsocketQueueThread;
procedure ReadFromAllChannels; procedure ReadFromAllChannels;
procedure PingAllChannels; procedure PingAllChannels;
@ -149,6 +157,8 @@ type
procedure AddClient (aChannel: TIdHTTPWebsocketClient); procedure AddClient (aChannel: TIdHTTPWebsocketClient);
procedure RemoveClient(aChannel: TIdHTTPWebsocketClient); procedure RemoveClient(aChannel: TIdHTTPWebsocketClient);
property ReadTimeout: Integer read FReadTimeout write FReadTimeout default 5000;
class function Instance: TIdWebsocketMultiReadThread; class function Instance: TIdWebsocketMultiReadThread;
class procedure RemoveInstance; class procedure RemoveInstance;
end; end;
@ -287,18 +297,17 @@ begin
if SocketIOCompatible and if SocketIOCompatible and
not FSocketIOConnectBusy then not FSocketIOConnectBusy then
begin begin
FSocketIOConnectBusy := True; //FSocketIOConnectBusy := True;
try //try
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command) TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
finally //finally
FSocketIOConnectBusy := False; // FSocketIOConnectBusy := False;
end; //end;
end end
else else
begin begin
//clear inputbuffer, otherwise it can't connect :( //clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) then IOHandler.Clear; if (IOHandler <> nil) then IOHandler.Clear;
inherited Connect; inherited Connect;
end; end;
finally finally
@ -450,6 +459,7 @@ function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean;
var var
sError: string; sError: string;
begin begin
FSocketIOConnectBusy := True;
Lock; Lock;
try try
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True); if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
@ -457,6 +467,7 @@ begin
InternalUpgradeToWebsocket(False{no raise}, sError); InternalUpgradeToWebsocket(False{no raise}, sError);
Result := (sError = ''); Result := (sError = '');
finally finally
FSocketIOConnectBusy := False;
UnLock; UnLock;
end; end;
end; end;
@ -491,6 +502,9 @@ begin
//remove from thread during connection handling //remove from thread during connection handling
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
if (IOHandler <> nil) then
IOHandler.Clear;
strmResponse := TMemoryStream.Create; strmResponse := TMemoryStream.Create;
try try
//special socket.io handling, see https://github.com/LearnBoost/socket.io-spec //special socket.io handling, see https://github.com/LearnBoost/socket.io-spec
@ -663,6 +677,7 @@ begin
//upgrade succesful //upgrade succesful
IOHandler.IsWebsocket := True; IOHandler.IsWebsocket := True;
aFailedReason := ''; aFailedReason := '';
Assert(Connected);
if SocketIOCompatible then if SocketIOCompatible then
begin begin
@ -1045,6 +1060,9 @@ end;
procedure TIdWebsocketMultiReadThread.AfterConstruction; procedure TIdWebsocketMultiReadThread.AfterConstruction;
begin begin
inherited; inherited;
ReadTimeout := 5000;
FChannels := TThreadList.Create; FChannels := TThreadList.Create;
FillChar(Freadset, SizeOf(Freadset), 0); FillChar(Freadset, SizeOf(Freadset), 0);
FillChar(Fexceptionset, SizeOf(Fexceptionset), 0); FillChar(Fexceptionset, SizeOf(Fexceptionset), 0);
@ -1169,24 +1187,82 @@ begin
end end
else if not chn.Connected then else if not chn.Connected then
begin begin
if chn.TryLock then if (ws <> nil) and
try (SecondsBetween(Now, ws.LastActivityTime) < 5)
try then
if ws <> nil then Continue;
ws.LastActivityTime := Now;
chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread if FReconnectlist = nil then
chn.TryUpgradeToWebsocket; FReconnectlist := TWSThreadList.Create;
except //if chn.TryLock then
//just try FReconnectlist.Add(chn);
end;
finally
chn.Unlock;
end;
end; end;
end; end;
finally finally
FChannels.UnlockList; FChannels.UnlockList;
end; end;
//reconnect needed? (in background)
if FReconnectlist.Count > 0 then
begin
if FReconnectThread = nil then
FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start});
FReconnectThread.QueueEvent(
procedure
var
l: TList;
chn: TIdHTTPWebsocketClient;
begin
while FReconnectlist.Count > 0 do
begin
chn := nil;
try
//get first one
l := FReconnectlist.LockList;
try
if l.Count <= 0 then Exit;
chn := TObject(l.Items[0]) as TIdHTTPWebsocketClient;
if not chn.TryLock then
begin
l.Delete(0);
chn := nil;
Continue;
end;
finally
FReconnectlist.UnlockList;
end;
//try reconnect
ws := chn.IOHandler as TIdIOHandlerWebsocket;
if ( (ws = nil) or
(SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then
begin
try
if ws <> nil then
ws.LastActivityTime := Now;
chn.ConnectTimeout := 1000;
chn.TryUpgradeToWebsocket;
except
//just try
end;
end;
//remove from todo list
l := FReconnectlist.LockList;
try
if l.Count > 0 then
l.Delete(0);
finally
FReconnectlist.UnlockList;
end;
finally
if chn <> nil then
chn.Unlock;
end;
end;
end);
end;
end; end;
procedure TIdWebsocketMultiReadThread.ReadFromAllChannels; procedure TIdWebsocketMultiReadThread.ReadFromAllChannels;
@ -1242,9 +1318,8 @@ begin
Fexceptionset.fd_array[0] := FTempHandle; Fexceptionset.fd_array[0] := FTempHandle;
//wait 15s till some data //wait 15s till some data
Finterval.tv_sec := 5; //5s Finterval.tv_sec := Self.ReadTimeout div 1000; //5s
{$MESSAGE HINT 'make wait timeout configurable + less in case of reconnect '} Finterval.tv_usec := Self.ReadTimeout mod 1000;
Finterval.tv_usec := 0;
//nothing to wait for? then sleep some time to prevent 100% CPU //nothing to wait for? then sleep some time to prevent 100% CPU
if iResult = 0 then if iResult = 0 then
@ -1344,7 +1419,15 @@ procedure TIdWebsocketMultiReadThread.RemoveClient(
aChannel: TIdHTTPWebsocketClient); aChannel: TIdHTTPWebsocketClient);
begin begin
if Self = nil then Exit; if Self = nil then Exit;
aChannel.Lock;
try
FChannels.Remove(aChannel); FChannels.Remove(aChannel);
if FReconnectlist <> nil then
FReconnectlist.Remove(aChannel);
finally
aChannel.UnLock;
end;
BreakSelectWait; BreakSelectWait;
end; end;
@ -1411,6 +1494,19 @@ begin
end; end;
end; end;
{ TWSThreadList }
function TWSThreadList.Count: Integer;
var l: TList;
begin
l := LockList;
try
Result := l.Count;
finally
UnlockList;
end;
end;
initialization initialization
finalization finalization
TIdWebsocketMultiReadThread.RemoveInstance; TIdWebsocketMultiReadThread.RemoveInstance;

View file

@ -13,12 +13,14 @@ type
TSocketIOCallbackObj = class; TSocketIOCallbackObj = class;
TIdBaseSocketIOHandling = class; TIdBaseSocketIOHandling = class;
TIdSocketIOHandling = class; TIdSocketIOHandling = class;
ISocketIOContext = interface;
TSocketIOMsg = reference to procedure(const ASocket: ISocketIOContext; const aText: string; const aCallback: TSocketIOCallbackObj); ISocketIOContext = interface;
TSocketIOMsgJSON = reference to procedure(const ASocket: ISocketIOContext; const aJSON: ISuperObject; const aCallback: TSocketIOCallbackObj); ISocketIOCallback = interface;
TSocketIOMsg = reference to procedure(const ASocket: ISocketIOContext; const aText: string; const aCallback: ISocketIOCallback);
TSocketIOMsgJSON = reference to procedure(const ASocket: ISocketIOContext; const aJSON: ISuperObject; const aCallback: ISocketIOCallback);
TSocketIONotify = reference to procedure(const ASocket: ISocketIOContext); TSocketIONotify = reference to procedure(const ASocket: ISocketIOContext);
TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallbackObj: TSocketIOCallbackObj); TSocketIOEvent = reference to procedure(const ASocket: ISocketIOContext; const aArgument: TSuperArray; const aCallback: ISocketIOCallback);
TSocketIOError = reference to procedure(const ASocket: ISocketIOContext; const aErrorClass, aErrorMessage: string); TSocketIOError = reference to procedure(const ASocket: ISocketIOContext; const aErrorClass, aErrorMessage: string);
TSocketIONotifyList = class(TList<TSocketIONotify>); TSocketIONotifyList = class(TList<TSocketIONotify>);
@ -105,14 +107,23 @@ type
procedure SendJSON(const aJSON: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil); procedure SendJSON(const aJSON: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);
end; end;
TSocketIOCallbackObj = class ISocketIOCallback = interface
['{BCC31817-7FD8-4CF6-B68B-0F9BAA80DF90}']
procedure SendResponse(const aResponse: string);
function IsResponseSend: Boolean;
end;
TSocketIOCallbackObj = class(TInterfacedObject,
ISocketIOCallback)
protected protected
FHandling: TIdBaseSocketIOHandling; FHandling: TIdBaseSocketIOHandling;
FSocket: TSocketIOContext; FSocket: TSocketIOContext;
FMsgNr: Integer; FMsgNr: Integer;
public {ISocketIOCallback}
procedure SendResponse(const aResponse: string); procedure SendResponse(const aResponse: string);
function IsResponseSend: Boolean; function IsResponseSend: Boolean;
public
constructor Create(aHandling: TIdBaseSocketIOHandling; aSocket: TSocketIOContext; aMsgNr: Integer);
end; end;
TIdBaseSocketIOHandling = class(TIdServerBaseHandling) TIdBaseSocketIOHandling = class(TIdServerBaseHandling)
@ -280,9 +291,17 @@ begin
Lock; Lock;
try try
for socket in FConnections.Values do for socket in FConnections.Values do
try
aEachSocketCallback(socket); aEachSocketCallback(socket);
except
//continue: e.g. connnection closed etc
end;
for socket in FConnectionsGUID.Values do for socket in FConnectionsGUID.Values do
try
aEachSocketCallback(socket); aEachSocketCallback(socket);
except
//continue: e.g. connnection closed etc
end;
finally finally
Unlock; Unlock;
end; end;
@ -443,7 +462,7 @@ var
args: TSuperArray; args: TSuperArray;
list: TSocketIOEventList; list: TSocketIOEventList;
event: TSocketIOEvent; event: TSocketIOEvent;
callback: TSocketIOCallbackObj; callback: ISocketIOCallback;
// socket: TSocketIOContext; // socket: TSocketIOContext;
begin begin
//'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event] //'5:' [message id ('+')] ':' [message endpoint] ':' [json encoded event]
@ -462,12 +481,7 @@ begin
// socket := FConnections.Items[AContext]; // socket := FConnections.Items[AContext];
if aHasCallback then if aHasCallback then
begin callback := TSocketIOCallbackObj.Create(Self, AContext, aMsgNr)
callback := TSocketIOCallbackObj.Create;
callback.FHandling := Self;
callback.FSocket := AContext;
callback.FMsgNr := aMsgNr;
end
else else
callback := nil; callback := nil;
try try
@ -482,7 +496,7 @@ begin
end; end;
end; end;
finally finally
callback.Free; callback := nil;
end; end;
end end
else else
@ -658,7 +672,7 @@ var
// socket: TSocketIOContext; // socket: TSocketIOContext;
callback: TSocketIOCallback; callback: TSocketIOCallback;
callbackref: TSocketIOCallbackRef; callbackref: TSocketIOCallbackRef;
callbackobj: TSocketIOCallbackObj; callbackobj: ISocketIOCallback;
errorref: TSocketIOError; errorref: TSocketIOError;
error: ISuperObject; error: ISuperObject;
socket: TSocketIOContext; socket: TSocketIOContext;
@ -726,11 +740,8 @@ begin
begin begin
if bCallback then if bCallback then
begin begin
callbackobj := TSocketIOCallbackObj.Create; callbackobj := TSocketIOCallbackObj.Create(Self, socket, imsg);
try try
callbackobj.FHandling := Self;
callbackobj.FSocket := socket;
callbackobj.FMsgNr := imsg;
try try
OnSocketIOMsg(socket, sdata, callbackobj); //, imsg, bCallback); OnSocketIOMsg(socket, sdata, callbackobj); //, imsg, bCallback);
except except
@ -741,7 +752,7 @@ begin
end; end;
end; end;
finally finally
callbackobj.Free; callbackobj := nil;
end end
end end
else else
@ -759,11 +770,8 @@ begin
begin begin
if bCallback then if bCallback then
begin begin
callbackobj := TSocketIOCallbackObj.Create; callbackobj := TSocketIOCallbackObj.Create(Self, socket, imsg);
try try
callbackobj.FHandling := Self;
callbackobj.FSocket := socket;
callbackobj.FMsgNr := imsg;
try try
OnSocketIOJson(socket, SO(sdata), callbackobj); //, imsg, bCallback); OnSocketIOJson(socket, SO(sdata), callbackobj); //, imsg, bCallback);
except except
@ -774,7 +782,7 @@ begin
end; end;
end; end;
finally finally
callbackobj.Free; callbackobj := nil;
end end
end end
else else
@ -828,11 +836,13 @@ begin
if FSocketIOEventCallback.TryGetValue(imsg, callback) then if FSocketIOEventCallback.TryGetValue(imsg, callback) then
begin begin
FSocketIOEventCallback.Remove(imsg); FSocketIOEventCallback.Remove(imsg);
if Assigned(callback) then
callback(sdata); callback(sdata);
end end
else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then else if FSocketIOEventCallbackRef.TryGetValue(imsg, callbackref) then
begin begin
FSocketIOEventCallbackRef.Remove(imsg); FSocketIOEventCallbackRef.Remove(imsg);
if Assigned(callbackref) then
callbackref(sdata); callbackref(sdata);
end end
else ; else ;
@ -1056,6 +1066,15 @@ end;
{ TSocketIOCallbackObj } { TSocketIOCallbackObj }
constructor TSocketIOCallbackObj.Create(aHandling: TIdBaseSocketIOHandling;
aSocket: TSocketIOContext; aMsgNr: Integer);
begin
FHandling := aHandling;
FSocket := aSocket;
FMsgNr := aMsgNr;
inherited Create();
end;
function TSocketIOCallbackObj.IsResponseSend: Boolean; function TSocketIOCallbackObj.IsResponseSend: Boolean;
begin begin
Result := (FMsgNr < 0); Result := (FMsgNr < 0);

View file

@ -153,12 +153,20 @@ begin
strmevent.CopyFrom(aEvent, aEvent.Size); strmevent.CopyFrom(aEvent, aEvent.Size);
//events during dispatch? channel is busy so offload event dispatching to different thread! //events during dispatch? channel is busy so offload event dispatching to different thread!
CreateAnonymousThread( TIdWebsocketDispatchThread.Instance.QueueEvent(
procedure procedure
begin begin
IntDispatchEvent(strmevent); IntDispatchEvent(strmevent);
strmevent.Free; strmevent.Free;
end); end);
//events during dispatch? channel is busy so offload event dispatching to different thread!
// CreateAnonymousThread(
// procedure
// begin
// IntDispatchEvent(strmevent);
// strmevent.Free;
// end);
end; end;
procedure TROIndyHTTPWebsocketChannel.CheckConnection; procedure TROIndyHTTPWebsocketChannel.CheckConnection;