better client reconnect + small fixes
This commit is contained in:
parent
73ff121faa
commit
5a7310896b
|
@ -65,6 +65,9 @@ type
|
|||
function TryUpgradeToWebsocket: Boolean;
|
||||
procedure UpgradeToWebsocket;
|
||||
|
||||
procedure Lock;
|
||||
procedure UnLock;
|
||||
|
||||
procedure Connect; override;
|
||||
function TryConnect: Boolean;
|
||||
procedure Disconnect(ANotifyPeer: Boolean); override;
|
||||
|
@ -85,6 +88,7 @@ type
|
|||
end;
|
||||
|
||||
// on error
|
||||
(*
|
||||
TIdHTTPSocketIOClient_old = class(TIdHTTPWebsocketClient)
|
||||
private
|
||||
FOnConnected: TNotifyEvent;
|
||||
|
@ -114,6 +118,7 @@ type
|
|||
property OnSocketIOJson : TSocketIOMsg read FOnSocketIOJson write FOnSocketIOJson;
|
||||
property OnSocketIOEvent: TSocketIOMsg read FOnSocketIOEvent write FOnSocketIOEvent;
|
||||
end;
|
||||
*)
|
||||
|
||||
TIdWebsocketMultiReadThread = class(TThread)
|
||||
private
|
||||
|
@ -267,23 +272,35 @@ end;
|
|||
|
||||
procedure TIdHTTPWebsocketClient.Connect;
|
||||
begin
|
||||
//clear inputbuffer, otherwise it can't connect :(
|
||||
if (IOHandler <> nil) then
|
||||
IOHandler.Clear;
|
||||
|
||||
//FHeartBeat.Enabled := True;
|
||||
if SocketIOCompatible and
|
||||
not FSocketIOConnectBusy then
|
||||
begin
|
||||
FSocketIOConnectBusy := True;
|
||||
try
|
||||
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
|
||||
finally
|
||||
FSocketIOConnectBusy := False;
|
||||
Lock;
|
||||
try
|
||||
if Connected then
|
||||
begin
|
||||
TryUpgradeToWebsocket;
|
||||
Exit;
|
||||
end;
|
||||
end
|
||||
else
|
||||
inherited Connect;
|
||||
|
||||
//FHeartBeat.Enabled := True;
|
||||
if SocketIOCompatible and
|
||||
not FSocketIOConnectBusy then
|
||||
begin
|
||||
FSocketIOConnectBusy := True;
|
||||
try
|
||||
TryUpgradeToWebsocket; //socket.io connects using HTTP, so no seperate .Connect needed (only gives Connection closed gracefully exceptions because of new http command)
|
||||
finally
|
||||
FSocketIOConnectBusy := False;
|
||||
end;
|
||||
end
|
||||
else
|
||||
begin
|
||||
//clear inputbuffer, otherwise it can't connect :(
|
||||
if (IOHandler <> nil) then IOHandler.Clear;
|
||||
|
||||
inherited Connect;
|
||||
end;
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
end;
|
||||
|
||||
destructor TIdHTTPWebsocketClient.Destroy;
|
||||
|
@ -306,7 +323,10 @@ end;
|
|||
|
||||
procedure TIdHTTPWebsocketClient.DisConnect(ANotifyPeer: Boolean);
|
||||
begin
|
||||
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
||||
if not SocketIOCompatible and
|
||||
( (IOHandler <> nil) and not IOHandler.IsWebsocket)
|
||||
then
|
||||
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
||||
|
||||
if ANotifyPeer and SocketIOCompatible then
|
||||
FSocketIO.WriteDisConnect(FSocketIOContext as TSocketIOContext)
|
||||
|
@ -401,34 +421,54 @@ end;
|
|||
|
||||
function TIdHTTPWebsocketClient.TryConnect: Boolean;
|
||||
begin
|
||||
Lock;
|
||||
try
|
||||
if Connected then Exit(True);
|
||||
try
|
||||
if Connected then Exit(True);
|
||||
|
||||
Connect;
|
||||
Result := Connected;
|
||||
if Result and SocketIOCompatible then
|
||||
Result := TryUpgradeToWebsocket;
|
||||
except
|
||||
Result := False;
|
||||
end
|
||||
Connect;
|
||||
Result := Connected;
|
||||
if Result then
|
||||
Result := TryUpgradeToWebsocket
|
||||
except
|
||||
Result := False;
|
||||
end
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
end;
|
||||
|
||||
function TIdHTTPWebsocketClient.TryUpgradeToWebsocket: Boolean;
|
||||
var
|
||||
sError: string;
|
||||
begin
|
||||
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
|
||||
Lock;
|
||||
try
|
||||
if (IOHandler <> nil) and IOHandler.IsWebsocket then Exit(True);
|
||||
|
||||
InternalUpgradeToWebsocket(False{no raise}, sError);
|
||||
Result := (sError = '');
|
||||
InternalUpgradeToWebsocket(False{no raise}, sError);
|
||||
Result := (sError = '');
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
end;
|
||||
|
||||
procedure TIdHTTPWebsocketClient.UnLock;
|
||||
begin
|
||||
System.TMonitor.Exit(Self);
|
||||
end;
|
||||
|
||||
procedure TIdHTTPWebsocketClient.UpgradeToWebsocket;
|
||||
var
|
||||
sError: string;
|
||||
begin
|
||||
if not IOHandler.IsWebsocket then
|
||||
InternalUpgradeToWebsocket(True{raise}, sError);
|
||||
Lock;
|
||||
try
|
||||
if not IOHandler.IsWebsocket then
|
||||
InternalUpgradeToWebsocket(True{raise}, sError);
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
end;
|
||||
|
||||
procedure TIdHTTPWebsocketClient.InternalUpgradeToWebsocket(aRaiseException: Boolean; out aFailedReason: string);
|
||||
|
@ -560,13 +600,6 @@ begin
|
|||
Response.Clear;
|
||||
if IOHandler.CheckForDataOnSource(ReadTimeout) then
|
||||
Response.ResponseText := IOHandler.InputBufferAsString();
|
||||
//for now: timer in mainthread?
|
||||
// TThread.Queue(nil,
|
||||
// procedure
|
||||
// begin
|
||||
// FHeartBeat.Interval := 2 * 1000;
|
||||
// FHeartBeat.Enabled := True;
|
||||
// end);
|
||||
end
|
||||
else
|
||||
begin
|
||||
|
@ -623,27 +656,27 @@ begin
|
|||
|
||||
if SocketIOCompatible then
|
||||
begin
|
||||
// if FSocketIOContext = nil then
|
||||
// begin
|
||||
FSocketIOContext := TSocketIOContext.Create(Self);
|
||||
// IInterface(FSocketIOContext)._AddRef;
|
||||
// end
|
||||
// else
|
||||
// FSocketIOContext.Create(Self); //update with new iohandler etc
|
||||
|
||||
FSocketIOContext := TSocketIOContext.Create(Self);
|
||||
(FSocketIOContext as TSocketIOContext).ConnectSend := True; //connect already send via url? GET /socket.io/1/websocket/9elrbEFqiimV29QAM6T-
|
||||
FSocketIO.WriteConnect(FSocketIOContext as TSocketIOContext);
|
||||
end;
|
||||
|
||||
//always read the data! (e.g. RO use override of AsyncDispatchEvent to process data)
|
||||
//if Assigned(OnBinData) or Assigned(OnTextData) then
|
||||
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
||||
finally
|
||||
Request.Clear;
|
||||
strmResponse.Free;
|
||||
|
||||
//add to thread for auto retry/reconnect
|
||||
TIdWebsocketMultiReadThread.Instance.AddClient(Self);
|
||||
end;
|
||||
end;
|
||||
|
||||
procedure TIdHTTPWebsocketClient.Lock;
|
||||
begin
|
||||
System.TMonitor.Enter(Self);
|
||||
end;
|
||||
|
||||
function TIdHTTPWebsocketClient.MakeImplicitClientHandler: TIdIOHandler;
|
||||
begin
|
||||
Result := TIdIOHandlerWebsocket.Create(nil);
|
||||
|
@ -681,7 +714,7 @@ procedure TIdHTTPWebsocketClient.ResetChannel;
|
|||
//var
|
||||
// ws: TIdIOHandlerWebsocket;
|
||||
begin
|
||||
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
|
||||
// TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); keep for reconnect
|
||||
|
||||
if IOHandler <> nil then
|
||||
begin
|
||||
|
@ -727,6 +760,7 @@ end;
|
|||
|
||||
{ TIdHTTPSocketIOClient }
|
||||
|
||||
(*
|
||||
procedure TIdHTTPSocketIOClient_old.AfterConstruction;
|
||||
begin
|
||||
inherited;
|
||||
|
@ -851,7 +885,7 @@ begin
|
|||
end;
|
||||
end;
|
||||
|
||||
(*)
|
||||
)
|
||||
procedure TIdHTTPSocketIOClient_old.ProcessSocketIORequest(
|
||||
const strmRequest: TStream);
|
||||
|
||||
|
@ -977,7 +1011,7 @@ procedure TIdWebsocketMultiReadThread.AddClient(
|
|||
aChannel: TIdHTTPWebsocketClient);
|
||||
var l: TList;
|
||||
begin
|
||||
Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
|
||||
//Assert( (aChannel.IOHandler as TIdIOHandlerWebsocket).IsWebsocket, 'Channel is not a websocket');
|
||||
|
||||
l := FChannels.LockList;
|
||||
try
|
||||
|
@ -1103,7 +1137,9 @@ begin
|
|||
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
||||
ws := chn.IOHandler as TIdIOHandlerWebsocket;
|
||||
//valid?
|
||||
if (chn.Socket.Binding.Handle > 0) and
|
||||
if (chn.Socket <> nil) and
|
||||
(chn.Socket.Binding <> nil) and
|
||||
(chn.Socket.Binding.Handle > 0) and
|
||||
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
|
||||
begin
|
||||
//more than 10s nothing done? then send ping
|
||||
|
@ -1118,7 +1154,8 @@ begin
|
|||
else if not chn.Connected then
|
||||
begin
|
||||
try
|
||||
ws.LastActivityTime := Now;
|
||||
if ws <> nil then
|
||||
ws.LastActivityTime := Now;
|
||||
chn.ConnectTimeout := 250; //250ms otherwise too much delay? todo: seperate ping/connnect thread
|
||||
chn.Connect;
|
||||
chn.TryUpgradeToWebsocket;
|
||||
|
@ -1155,6 +1192,8 @@ begin
|
|||
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
||||
//valid?
|
||||
if //not chn.Busy and also take busy channels (will be ignored later), otherwise we have to break/reset for each RO function execution
|
||||
(chn.Socket <> nil) and
|
||||
(chn.Socket.Binding <> nil) and
|
||||
(chn.Socket.Binding.Handle > 0) and
|
||||
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
|
||||
begin
|
||||
|
@ -1181,7 +1220,8 @@ begin
|
|||
Fexceptionset.fd_array[0] := FTempHandle;
|
||||
|
||||
//wait 15s till some data
|
||||
Finterval.tv_sec := 15; //15s
|
||||
Finterval.tv_sec := 5; //5s
|
||||
{$MESSAGE HINT 'make wait timeout configurable + less in case of reconnect '}
|
||||
Finterval.tv_usec := 0;
|
||||
|
||||
//nothing to wait for? then sleep some time to prevent 100% CPU
|
||||
|
@ -1216,24 +1256,29 @@ begin
|
|||
begin
|
||||
chn := TIdHTTPWebsocketClient(l.Items[i]);
|
||||
ws := chn.IOHandler as TIdIOHandlerWebsocket;
|
||||
if (ws = nil) then Continue;
|
||||
|
||||
if ws.TryLock then //IOHandler.Readable cannot be done during pending action!
|
||||
try
|
||||
try
|
||||
//try to process all events
|
||||
while chn.IOHandler.HasData or
|
||||
chn.IOHandler.Readable(0) do //has some data
|
||||
begin
|
||||
if strmEvent = nil then
|
||||
strmEvent := TMemoryStream.Create;
|
||||
strmEvent.Clear;
|
||||
try
|
||||
//try to process all events
|
||||
while chn.IOHandler.HasData or
|
||||
chn.IOHandler.Readable(0) do //has some data
|
||||
begin
|
||||
if strmEvent = nil then
|
||||
strmEvent := TMemoryStream.Create;
|
||||
strmEvent.Clear;
|
||||
|
||||
//first is the data type TWSDataType(text or bin), but is ignore/not needed
|
||||
wscode := TWSDataCode(chn.IOHandler.ReadLongWord);
|
||||
//next the size + data = stream
|
||||
chn.IOHandler.ReadStream(strmEvent);
|
||||
//first is the data type TWSDataType(text or bin), but is ignore/not needed
|
||||
wscode := TWSDataCode(chn.IOHandler.ReadLongWord);
|
||||
if not (wscode in [wdcText, wdcBinary, wdcPing, wdcPong]) then
|
||||
Continue;
|
||||
|
||||
//ignore ping/pong messages
|
||||
if wscode in [wdcPing, wdcPong] then Continue;
|
||||
//next the size + data = stream
|
||||
chn.IOHandler.ReadStream(strmEvent);
|
||||
|
||||
//ignore ping/pong messages
|
||||
if wscode in [wdcPing, wdcPong] then Continue;
|
||||
|
||||
//fire event
|
||||
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
|
||||
|
@ -1251,12 +1296,12 @@ begin
|
|||
chn.AsyncDispatchEvent(string(swstext));
|
||||
end;
|
||||
end;
|
||||
end;
|
||||
except
|
||||
l := nil;
|
||||
FChannels.UnlockList;
|
||||
chn.ResetChannel;
|
||||
raise;
|
||||
end;
|
||||
except
|
||||
l := nil;
|
||||
FChannels.UnlockList;
|
||||
chn.ResetChannel;
|
||||
raise;
|
||||
end;
|
||||
finally
|
||||
ws.Unlock;
|
||||
|
|
|
@ -149,6 +149,7 @@ type
|
|||
|
||||
procedure Lock;
|
||||
procedure UnLock;
|
||||
function ConnectionCount: Integer;
|
||||
|
||||
// procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil);
|
||||
function NewConnection(const AContext: TIdContext): TSocketIOContext;overload;
|
||||
|
@ -192,6 +193,30 @@ begin
|
|||
FSocketIOErrorRef := TDictionary<Integer,TSocketIOError>.Create;
|
||||
end;
|
||||
|
||||
function TIdBaseSocketIOHandling.ConnectionCount: Integer;
|
||||
var
|
||||
context: TSocketIOContext;
|
||||
begin
|
||||
Lock;
|
||||
try
|
||||
Result := 0;
|
||||
|
||||
//note: is single connection?
|
||||
for context in FConnections.Values do
|
||||
begin
|
||||
if context.IsDisconnected then Continue;
|
||||
Inc(Result);
|
||||
end;
|
||||
for context in FConnectionsGUID.Values do
|
||||
begin
|
||||
if context.IsDisconnected then Continue;
|
||||
Inc(Result);
|
||||
end;
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
end;
|
||||
|
||||
destructor TIdBaseSocketIOHandling.Destroy;
|
||||
var squid: string;
|
||||
idcontext: TIdContext;
|
||||
|
@ -1236,12 +1261,15 @@ var
|
|||
jsonarray: string;
|
||||
isendcount: Integer;
|
||||
begin
|
||||
if aData.IsType(stArray) then
|
||||
jsonarray := aData.AsString
|
||||
else if aData.IsType(stString) then
|
||||
jsonarray := '["' + aData.AsString + '"]'
|
||||
else
|
||||
jsonarray := '[' + aData.AsString + ']';
|
||||
if aData <> nil then
|
||||
begin
|
||||
if aData.IsType(stArray) then
|
||||
jsonarray := aData.AsString
|
||||
else if aData.IsType(stString) then
|
||||
jsonarray := '["' + aData.AsString + '"]'
|
||||
else
|
||||
jsonarray := '[' + aData.AsString + ']';
|
||||
end;
|
||||
|
||||
Lock;
|
||||
try
|
||||
|
@ -1278,7 +1306,7 @@ begin
|
|||
end;
|
||||
|
||||
if isendcount = 0 then
|
||||
raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!');
|
||||
raise EIdSocketIoUnhandledMessage.Create('Cannot emit: no socket.io connections!');
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
|
@ -1325,7 +1353,7 @@ begin
|
|||
end;
|
||||
|
||||
if isendcount = 0 then
|
||||
raise EIdSocketIoUnhandledMessage.Create('No socket.io connections!');
|
||||
raise EIdSocketIoUnhandledMessage.Create('Cannot send: no socket.io connections!');
|
||||
finally
|
||||
UnLock;
|
||||
end;
|
||||
|
|
Loading…
Reference in a new issue