write timeout + error handling (try except for callbacks)

This commit is contained in:
André Mussche 2014-07-03 11:39:18 +02:00
parent a1b813b767
commit f79c3cdb8a
4 changed files with 65 additions and 17 deletions

View file

@ -38,10 +38,12 @@ type
FOnData: TWebsocketMsgBin; FOnData: TWebsocketMsgBin;
FOnTextData: TWebsocketMsgText; FOnTextData: TWebsocketMsgText;
FNoAsyncRead: Boolean; FNoAsyncRead: Boolean;
FWriteTimeout: Integer;
function GetIOHandlerWS: TIdIOHandlerWebsocket; function GetIOHandlerWS: TIdIOHandlerWebsocket;
procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket); procedure SetIOHandlerWS(const Value: TIdIOHandlerWebsocket);
procedure SetOnData(const Value: TWebsocketMsgBin); procedure SetOnData(const Value: TWebsocketMsgBin);
procedure SetOnTextData(const Value: TWebsocketMsgText); procedure SetOnTextData(const Value: TWebsocketMsgText);
procedure SetWriteTimeout(const Value: Integer);
protected protected
FSocketIOCompatible: Boolean; FSocketIOCompatible: Boolean;
FSocketIOHandshakeResponse: string; FSocketIOHandshakeResponse: string;
@ -94,6 +96,8 @@ type
property Host; property Host;
property Port; property Port;
property WSResourceName: string read FWSResourceName write FWSResourceName; property WSResourceName: string read FWSResourceName write FWSResourceName;
property WriteTimeout: Integer read FWriteTimeout write SetWriteTimeout default 2000;
end; end;
// on error // on error
@ -231,6 +235,8 @@ begin
// FHeartBeat := TTimer.Create(nil); // FHeartBeat := TTimer.Create(nil);
// FHeartBeat.Enabled := False; // FHeartBeat.Enabled := False;
// FHeartBeat.OnTimer := HeartBeatTimer; // FHeartBeat.OnTimer := HeartBeatTimer;
FWriteTimeout := 2 * 1000;
end; end;
procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream); procedure TIdHTTPWebsocketClient.AsyncDispatchEvent(const aEvent: TStream);
@ -752,6 +758,11 @@ begin
if not Self.NoAsyncRead then if not Self.NoAsyncRead then
TIdWebsocketMultiReadThread.Instance.AddClient(Self); TIdWebsocketMultiReadThread.Instance.AddClient(Self);
end; end;
//default 2s write timeout
//http://msdn.microsoft.com/en-us/library/windows/desktop/ms740532(v=vs.85).aspx
if Connected then
Self.IOHandler.Binding.SetSockOpt(SOL_SOCKET, SO_SNDTIMEO, Self.WriteTimeout);
end; end;
procedure TIdHTTPWebsocketClient.Lock; procedure TIdHTTPWebsocketClient.Lock;
@ -899,6 +910,13 @@ begin
// TIdWebsocketMultiReadThread.Instance.AddClient(Self); // TIdWebsocketMultiReadThread.Instance.AddClient(Self);
end; end;
procedure TIdHTTPWebsocketClient.SetWriteTimeout(const Value: Integer);
begin
FWriteTimeout := Value;
if Connected then
Self.IOHandler.Binding.SetSockOpt(SOL_SOCKET, SO_SNDTIMEO, Self.WriteTimeout);
end;
{ TIdHTTPSocketIOClient } { TIdHTTPSocketIOClient }
(* (*

View file

@ -88,28 +88,36 @@ begin
begin begin
if context.IsDisconnected then Continue; if context.IsDisconnected then Continue;
if not Assigned(aCallback) then try
WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil, nil) if not Assigned(aCallback) then
else WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil, nil)
WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, else
procedure(const aData: string) WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray,
begin procedure(const aData: string)
aCallback(context, SO(aData), nil); begin
end, aOnError); aCallback(context, SO(aData), nil);
end, aOnError);
except
//try to send to others
end;
Inc(Result); Inc(Result);
end; end;
for context in FConnectionsGUID.Values do for context in FConnectionsGUID.Values do
begin begin
if context.IsDisconnected then Continue; if context.IsDisconnected then Continue;
if not Assigned(aCallback) then try
WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil, nil) if not Assigned(aCallback) then
else WriteSocketIOEvent(context, ''{no room}, aEventName, jsonarray, nil, nil)
WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray, else
procedure(const aData: string) WriteSocketIOEventRef(context, ''{no room}, aEventName, jsonarray,
begin procedure(const aData: string)
aCallback(context, SO(aData), nil); begin
end, aOnError); aCallback(context, SO(aData), nil);
end, aOnError);
except
//try to send to others
end;
Inc(Result); Inc(Result);
end; end;
finally finally

View file

@ -601,6 +601,8 @@ procedure TIdBaseSocketIOHandling.ProcessSocketIORequest(
begin begin
Result := ''; Result := '';
ilength := strmRequest.Size - strmRequest.Position; ilength := strmRequest.Size - strmRequest.Position;
if ilength <= 0 then
Exit;
SetLength(utf8, ilength); SetLength(utf8, ilength);
strmRequest.Read(utf8[0], ilength); strmRequest.Read(utf8[0], ilength);
Result := TEncoding.UTF8.GetString(utf8); Result := TEncoding.UTF8.GetString(utf8);
@ -1365,6 +1367,11 @@ begin
FEvent := TEvent.Create; FEvent := TEvent.Create;
FQueue.Add(aData); FQueue.Add(aData);
//max 1000 items in queue (otherwise infinite mem leak possible?)
while FQueue.Count > 1000 do
FQueue.Delete(0);
FEvent.SetEvent; FEvent.SetEvent;
end; end;

View file

@ -15,7 +15,9 @@ type
FSocketIO: TIdServerSocketIOHandling_Ext; FSocketIO: TIdServerSocketIOHandling_Ext;
FOnMessageText: TWebsocketMessageText; FOnMessageText: TWebsocketMessageText;
FOnMessageBin: TWebsocketMessageBin; FOnMessageBin: TWebsocketMessageBin;
FWriteTimeout: Integer;
function GetSocketIO: TIdServerSocketIOHandling; function GetSocketIO: TIdServerSocketIOHandling;
procedure SetWriteTimeout(const Value: Integer);
protected protected
procedure DoCommandGet(AContext: TIdContext; ARequestInfo: TIdHTTPRequestInfo; procedure DoCommandGet(AContext: TIdContext; ARequestInfo: TIdHTTPRequestInfo;
AResponseInfo: TIdHTTPResponseInfo); override; AResponseInfo: TIdHTTPResponseInfo); override;
@ -34,12 +36,14 @@ type
property OnMessageBin : TWebsocketMessageBin read FOnMessageBin write FOnMessageBin; property OnMessageBin : TWebsocketMessageBin read FOnMessageBin write FOnMessageBin;
property SocketIO: TIdServerSocketIOHandling read GetSocketIO; property SocketIO: TIdServerSocketIOHandling read GetSocketIO;
published
property WriteTimeout: Integer read FWriteTimeout write SetWriteTimeout default 2000;
end; end;
implementation implementation
uses uses
IdServerIOHandlerWebsocket, IdStreamVCL, IdGlobal, Windows; IdServerIOHandlerWebsocket, IdStreamVCL, IdGlobal, Windows, IdWinsock2;
{ TIdWebsocketServer } { TIdWebsocketServer }
@ -52,12 +56,18 @@ begin
ContextClass := TIdServerWSContext; ContextClass := TIdServerWSContext;
if IOHandler = nil then if IOHandler = nil then
IOHandler := TIdServerIOHandlerWebsocket.Create(Self); IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
FWriteTimeout := 2 * 1000; //2s
end; end;
procedure TIdWebsocketServer.ContextCreated(AContext: TIdContext); procedure TIdWebsocketServer.ContextCreated(AContext: TIdContext);
begin begin
inherited ContextCreated(AContext); inherited ContextCreated(AContext);
(AContext as TIdServerWSContext).OnCustomChannelExecute := Self.WebsocketChannelRequest; (AContext as TIdServerWSContext).OnCustomChannelExecute := Self.WebsocketChannelRequest;
//default 2s write timeout
//http://msdn.microsoft.com/en-us/library/windows/desktop/ms740532(v=vs.85).aspx
AContext.Connection.Socket.Binding.SetSockOpt(SOL_SOCKET, SO_SNDTIMEO, Self.WriteTimeout);
end; end;
procedure TIdWebsocketServer.ContextDisconnected(AContext: TIdContext); procedure TIdWebsocketServer.ContextDisconnected(AContext: TIdContext);
@ -109,6 +119,11 @@ begin
end; end;
end; end;
procedure TIdWebsocketServer.SetWriteTimeout(const Value: Integer);
begin
FWriteTimeout := Value;
end;
procedure TIdWebsocketServer.WebsocketChannelRequest( procedure TIdWebsocketServer.WebsocketChannelRequest(
const AContext: TIdServerWSContext; aType: TWSDataType; const aStrmRequest, const AContext: TIdServerWSContext; aType: TWSDataType; const aStrmRequest,
aStrmResponse: TMemoryStream); aStrmResponse: TMemoryStream);