- sending all data at once (instead of seperate header + data -> receive order gets different!)

- including partial data send handling
- process data from internal frame buffer instead of only waiting on real data on the socket
- terminate deadlock due to tmonitor, using tcriticalsection instead
- singlewritethread option
This commit is contained in:
André Mussche 2014-01-10 15:05:04 +01:00
parent e6d3ad515a
commit ea489db31c
4 changed files with 503 additions and 219 deletions

View file

@ -140,23 +140,13 @@ type
class procedure RemoveInstance;
end;
//async post data
TIdWebsocketDispatchThread = class(TThread)
//async process data
TIdWebsocketDispatchThread = class(TIdWebsocketQueueThread)
private
class var FInstance: TIdWebsocketDispatchThread;
protected
FEvent: TEvent;
FEvents, FProcessing: TList<TThreadProcedure>;
procedure Execute; override;
public
procedure AfterConstruction;override;
destructor Destroy; override;
procedure Terminate;
procedure QueueEvent(aEvent: TThreadProcedure);
class function Instance: TIdWebsocketDispatchThread;
class function Instance: TIdWebsocketDispatchThread;
class procedure RemoveInstance;
end;
implementation
@ -248,6 +238,9 @@ end;
procedure TIdHTTPWebsocketClient.Connect;
begin
if IOHandler <> nil then
IOHandler.Clear;
FHeartBeat.Enabled := True;
inherited Connect;
end;
@ -289,7 +282,7 @@ begin
inherited DisConnect(ANotifyPeer);
//clear buffer, other still "connected"
IOHandler.InputBuffer.Clear;
IOHandler.Clear;
//IOHandler.Free;
//IOHandler := TIdIOHandlerWebsocket.Create(nil);
@ -320,20 +313,17 @@ begin
try
if (IOHandler <> nil) and
not IOHandler.ClosedGracefully and
//IOHandler.Connected and
IOHandler.Connected and
(FSocketIOContext <> nil) then
begin
//not threadsafe because of background ReadAllThreads?
//FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message
end
//retry re-connect
else
try
//clear inputbuffer, otherwise it can't connect :(
if (IOHandler <> nil) and
not IOHandler.InputBufferIsEmpty
then
IOHandler.DiscardAll;
if (IOHandler <> nil) then
IOHandler.Clear;
Self.Connect;
TryUpgradeToWebsocket;
@ -343,10 +333,8 @@ begin
except on E:Exception do
begin
//clear inputbuffer, otherwise it stays connected :(
if (IOHandler <> nil) and
not IOHandler.InputBufferIsEmpty
then
IOHandler.DiscardAll;
if (IOHandler <> nil) then
IOHandler.Clear;
Disconnect(False);
if Assigned(OnDisConnected) then
@ -1048,22 +1036,15 @@ begin
(chn.Socket.Binding.Handle > 0) and
(chn.Socket.Binding.Handle <> INVALID_SOCKET) then
begin
// if chn.IOHandler.TryLock then
// try
if chn.IOHandler.HasData then
begin
Inc(iResult);
Break;
end;
//todo: seperate read thread is needed because of threadsafety
if chn.IOHandler.Readable( 1000 div l.Count ) then
begin
Inc(iResult);
Break;
end;
// finally
// chn.IOHandler.Unlock;
// end;
// Freadset.fd_count := iCount+1;
// Freadset.fd_array[iCount] := chn.Socket.Binding.Handle;
// Inc(iCount);
Freadset.fd_count := iCount+1;
Freadset.fd_array[iCount] := chn.Socket.Binding.Handle;
Inc(iCount);
end;
end;
@ -1081,23 +1062,24 @@ begin
Finterval.tv_sec := 15; //15s
Finterval.tv_usec := 0;
{
//nothing to wait for? then sleep some time to prevent 100% CPU
if iCount = 0 then
if iResult = 0 then
begin
iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval);
if iCount = 0 then
begin
iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval);
if iResult = SOCKET_ERROR then
iResult := 1; //ignore errors
end
//wait till a socket has some data (or a signal via exceptionset is fired)
else
iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval);
if iResult = SOCKET_ERROR then
iResult := 1; //ignore errors
end
//wait till a socket has some data (or a signal via exceptionset is fired)
else
iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval);
//raise EIdWinsockStubError.Build(WSAGetLastError, '', []);
//ignore error during wait: socket disconnected etc
Exit;
end;
if iResult = SOCKET_ERROR then
//raise EIdWinsockStubError.Build(WSAGetLastError, '', []);
//ignore error during wait: socket disconnected etc
Exit;
}
if Terminated then Exit;
//some data?
@ -1113,7 +1095,8 @@ begin
chn := TIdHTTPWebsocketClient(l.Items[i]);
try
//try to process all events
while chn.IOHandler.Readable(0) do //has some data
while chn.IOHandler.HasData or
chn.IOHandler.Readable(0) do //has some data
begin
ws := chn.IOHandler as TIdIOHandlerWebsocket;
//no pending dispatch active? (so actually we only read events here?)
@ -1172,7 +1155,6 @@ begin
end;
end
else
Sleep(10);
end;
procedure TIdWebsocketMultiReadThread.RemoveClient(
@ -1186,7 +1168,11 @@ end;
class procedure TIdWebsocketMultiReadThread.RemoveInstance;
begin
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
FreeAndNil(FInstance);
end;
end;
procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket;
@ -1214,63 +1200,6 @@ end;
{ TIdWebsocketDispatchThread }
procedure TIdWebsocketDispatchThread.AfterConstruction;
begin
inherited;
FEvents := TList<TThreadProcedure>.Create;
FProcessing := TList<TThreadProcedure>.Create;
FEvent := TEvent.Create;
end;
destructor TIdWebsocketDispatchThread.Destroy;
begin
System.TMonitor.Enter(FEvents);
FEvents.Clear;
FEvents.Free;
FProcessing.Free;
FEvent.Free;
inherited;
end;
procedure TIdWebsocketDispatchThread.Execute;
var
proc: Classes.TThreadProcedure;
begin
TThread.NameThreadForDebugging(Self.ClassName);
while not Terminated do
begin
try
if FEvent.WaitFor(3 * 1000) = wrSignaled then
begin
FEvent.ResetEvent;
System.TMonitor.Enter(FEvents);
try
//copy
while FEvents.Count > 0 do
begin
proc := FEvents.Items[0];
FProcessing.Add(proc);
FEvents.Delete(0);
end;
finally
System.TMonitor.Exit(FEvents);
end;
end;
while FProcessing.Count > 0 do
begin
proc := FProcessing.Items[0];
FProcessing.Delete(0);
proc();
end;
except
//continue
end;
end;
end;
class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread;
begin
if FInstance = nil then
@ -1279,7 +1208,7 @@ begin
try
if FInstance = nil then
begin
FInstance := TIdWebsocketDispatchThread.Create(True);
FInstance := Self.Create(True);
FInstance.Start;
end;
finally
@ -1289,31 +1218,19 @@ begin
Result := FInstance;
end;
procedure TIdWebsocketDispatchThread.QueueEvent(aEvent: TThreadProcedure);
class procedure TIdWebsocketDispatchThread.RemoveInstance;
begin
System.TMonitor.Enter(FEvents);
try
FEvents.Add(aEvent);
finally
System.TMonitor.Exit(FEvents);
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
FreeAndNil(FInstance);
end;
FEvent.SetEvent;
end;
procedure TIdWebsocketDispatchThread.Terminate;
begin
inherited Terminate;
FEvent.SetEvent;
end;
initialization
finalization
if TIdWebsocketMultiReadThread.FInstance <> nil then
begin
TIdWebsocketMultiReadThread.Instance.Terminate;
TIdWebsocketMultiReadThread.Instance.WaitFor;
// TBaseNamedThread.WaitForThread(TIdWebsocketMultiReadThread.Instance, 5 * 1000);
TIdWebsocketMultiReadThread.RemoveInstance;
end;
TIdWebsocketMultiReadThread.RemoveInstance;
TIdWebsocketDispatchThread.RemoveInstance
end.

View file

@ -31,10 +31,12 @@ type
FCloseReason: string;
FCloseCode: Integer;
FClosing: Boolean;
class var FUseSingleWriteThread: Boolean;
protected
FMessageStream: TMemoryStream;
FWriteTextToTarget: Boolean;
FCloseCodeSend: Boolean;
FPendingWriteCount: Integer;
function InternalReadDataFromSource(var VBuffer: TIdBytes; ARaiseExceptionOnTimeout: Boolean): Integer;
function ReadDataFromSource(var VBuffer: TIdBytes): Integer; override;
@ -50,6 +52,7 @@ type
property IsServerSide : Boolean read FIsServerSide write FIsServerSide;
property ClientExtensionBits : TWSExtensionBits read FExtensionBits write FExtensionBits;
public
class constructor Create;
procedure AfterConstruction;override;
destructor Destroy; override;
@ -57,6 +60,8 @@ type
procedure Unlock;
function TryLock: Boolean;
function HasData: Boolean;
procedure Clear;
function Readable(AMSec: Integer = IdTimeoutDefault): Boolean; override;
function Connected: Boolean; override;
@ -71,8 +76,47 @@ type
procedure WriteLnRFC(const AOut: string = ''; AEncoding: TIdTextEncoding = nil); override;
procedure Write(AValue: TStrings; AWriteLinesCount: Boolean = False; AEncoding: TIdTextEncoding = nil); overload; override;
procedure Write(AStream: TStream; aType: TWSDataType); overload;
procedure WriteBufferFlush(AByteCount: Integer); override;
class property UseSingleWriteThread: Boolean read FUseSingleWriteThread write FUseSingleWriteThread;
end;
TIdWebsocketQueueThread = class(TThread)
private
function GetThreadID: TThreadID;
protected
FLock: TCriticalSection;
FTempThread: Integer;
FEvent: TEvent;
FEvents, FProcessing: TList<TThreadProcedure>;
public
procedure AfterConstruction;override;
destructor Destroy; override;
procedure Lock;
procedure UnLock;
procedure ProcessQueue;
procedure Execute; override;
property ThreadID: TThreadID read GetThreadID;
procedure Terminate;
procedure QueueEvent(aEvent: TThreadProcedure);
end;
//http://tangentsoft.net/wskfaq/intermediate.html
//Winsock is not threadsafe, use a single/seperate write thread and seperate read thread
//(do not write in 2 different thread or read in 2 different threads)
TIdWebsocketWriteThread = class(TIdWebsocketQueueThread)
private
class var FInstance: TIdWebsocketWriteThread;
public
class function Instance: TIdWebsocketWriteThread;
class procedure RemoveInstance;
end;
//close frame codes
const
C_FrameClose_Normal = 1000; //1000 indicates a normal closure, meaning that the purpose for
@ -125,7 +169,8 @@ const
implementation
uses
SysUtils, Math, IdStream, IdStack, IdWinsock2, IdExceptionCore,
SysUtils, Math, Windows,
IdStream, IdStack, IdWinsock2, IdExceptionCore,
IdResourceStrings, IdResourceStringsCore;
//frame codes
@ -139,6 +184,23 @@ const
C_FrameCode_Pong = 10 {A};
//B-F are reserved for further control frames
function BytesToStringRaw(const AValue: TIdBytes): string;
var
i: Integer;
begin
//SetLength(Result, Length(aValue));
for i := 0 to High(AValue) do
begin
if (AValue[i] < 33) or
( (AValue[i] > 126) and
(AValue[i] < 161) )
then
Result := Result + '#' + IntToStr(AValue[i])
else
Result := Result + Char(AValue[i])
end;
end;
{ TIdIOHandlerStack_Websocket }
procedure TIdIOHandlerWebsocket.AfterConstruction;
@ -150,6 +212,12 @@ begin
FSelectLock := TCriticalSection.Create;
end;
procedure TIdIOHandlerWebsocket.Clear;
begin
FWSInputBuffer.Clear;
InputBuffer.Clear;
end;
procedure TIdIOHandlerWebsocket.Close;
var
iaWriteBuffer: TIdBytes;
@ -224,11 +292,24 @@ end;
function TIdIOHandlerWebsocket.Connected: Boolean;
begin
Result := inherited Connected;
Lock;
try
Result := inherited Connected;
finally
Unlock;
end;
end;
class constructor TIdIOHandlerWebsocket.Create;
begin
//UseSingleWriteThread := True;
end;
destructor TIdIOHandlerWebsocket.Destroy;
begin
while FPendingWriteCount > 0 do
Sleep(1);
FLock.Enter;
FSelectLock.Enter;
FLock.Free;
@ -239,10 +320,15 @@ begin
inherited;
end;
function TIdIOHandlerWebsocket.HasData: Boolean;
begin
//buffered data available? (more data from previous read)
Result := (FWSInputBuffer.Size > 0);
end;
function TIdIOHandlerWebsocket.InternalReadDataFromSource(
var VBuffer: TIdBytes; ARaiseExceptionOnTimeout: Boolean): Integer;
begin
Result := -1;
SetLength(VBuffer, 0);
CheckForDisconnect;
@ -268,7 +354,8 @@ begin
begin
CheckForDisconnect; //disconnected in the mean time?
GStack.CheckForSocketError(GStack.WSGetLastError); //check for socket error
EIdNoDataToRead.Toss(RSIdNoDataToRead); //nothing read? then connection is probably closed -> exit
if ARaiseExceptionOnTimeout then
EIdNoDataToRead.Toss(RSIdNoDataToRead); //nothing read? then connection is probably closed -> exit
end;
SetLength(VBuffer, Result);
end;
@ -276,61 +363,163 @@ end;
procedure TIdIOHandlerWebsocket.WriteLn(const AOut: string;
AEncoding: TIdTextEncoding);
begin
FWriteTextToTarget := True;
try
inherited WriteLn(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
WriteLn(AOut, AEncoding);
end)
end
else
begin
Lock;
try
FWriteTextToTarget := True;
inherited WriteLn(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
Unlock;
end;
end;
end;
procedure TIdIOHandlerWebsocket.WriteLnRFC(const AOut: string;
AEncoding: TIdTextEncoding);
begin
FWriteTextToTarget := True;
try
inherited WriteLnRFC(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
WriteLnRFC(AOut, AEncoding);
end)
end
else
begin
Lock;
try
FWriteTextToTarget := True;
inherited WriteLnRFC(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
Unlock;
end;
end;
end;
procedure TIdIOHandlerWebsocket.Write(const AOut: string;
AEncoding: TIdTextEncoding);
begin
FWriteTextToTarget := True;
try
inherited Write(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
Write(AOut, AEncoding);
end)
end
else
begin
Lock;
try
FWriteTextToTarget := True;
inherited Write(AOut, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
Unlock;
end;
end;
end;
procedure TIdIOHandlerWebsocket.Write(AValue: TStrings;
AWriteLinesCount: Boolean; AEncoding: TIdTextEncoding);
begin
FWriteTextToTarget := True;
try
inherited Write(AValue, AWriteLinesCount, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
Write(AValue, AWriteLinesCount, AEncoding);
end)
end
else
begin
Lock;
try
FWriteTextToTarget := True;
inherited Write(AValue, AWriteLinesCount, TIdTextEncoding.UTF8); //must be UTF8!
finally
FWriteTextToTarget := False;
Unlock;
end;
end;
end;
procedure TIdIOHandlerWebsocket.Write(AStream: TStream;
aType: TWSDataType);
begin
FWriteTextToTarget := (aType = wdtText);
try
inherited Write(AStream);
finally
FWriteTextToTarget := False;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
Write(AStream, aType);
end)
end
else
begin
Lock;
try
FWriteTextToTarget := (aType = wdtText);
inherited Write(AStream);
finally
FWriteTextToTarget := False;
Unlock;
end;
end;
end;
procedure TIdIOHandlerWebsocket.WriteBufferFlush(AByteCount: Integer);
begin
if (FWriteBuffer = nil) or (FWriteBuffer.Size <= 0) then Exit;
if UseSingleWriteThread and IsWebsocket and
(GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
begin
InterlockedIncrement(FPendingWriteCount);
TIdWebsocketWriteThread.Instance.QueueEvent(
procedure
begin
InterlockedDecrement(FPendingWriteCount);
WriteBufferFlush(AByteCount);
end)
end
else
inherited WriteBufferFlush(AByteCount);
end;
function TIdIOHandlerWebsocket.WriteDataToTarget(const ABuffer: TIdBytes;
const AOffset, ALength: Integer): Integer;
begin
if UseSingleWriteThread and IsWebsocket and (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
Assert(False, 'Write done in different thread than TIdWebsocketWriteThread!');
if not IsWebsocket then
Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength)
else
@ -541,9 +730,15 @@ var
var
temp: TIdBytes;
begin
if HasData then Exit(True);
Result := InternalReadDataFromSource(temp, ARaiseExceptionOnTimeout) > 0;
if Result then
begin
FWSInputBuffer.Write(temp);
//if debughook > 0 then
// OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp)));
end;
end;
function _GetByte: Byte;
@ -570,6 +765,8 @@ var
begin
InternalReadDataFromSource(temp, True);
FWSInputBuffer.Write(temp);
//if debughook > 0 then
// OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp)));
if FWSInputBuffer.Size < aCount then
Sleep(1);
end;
@ -691,7 +888,7 @@ function TIdIOHandlerWebsocket.WriteData(aData: TIdBytes;
aType: TWSDataCode; aFIN, aRSV1, aRSV2, aRSV3: boolean): integer;
var
iByte: Byte;
i: NativeInt;
i, ioffset: NativeInt;
iDataLength, iPos: Int64;
rLength: Int64Rec;
rMask: record
@ -784,14 +981,9 @@ begin
//write header
strmData.Position := 0;
TIdStreamHelper.ReadBytes(strmData, bData);
Result := Binding.Send(bData);
//Mask? Note: Only clients must apply a mask
if IsServerSide then
begin
Result := Binding.Send(aData);
end
else
if not IsServerSide then
begin
iPos := 0;
iDataLength := Length(aData);
@ -802,13 +994,161 @@ begin
aData[iPos] := iByte;
inc(iPos);
end;
//send masked data
Result := Binding.Send(aData);
end;
AppendBytes(bData, aData); //important: send all at once!
ioffset := 0;
repeat
Result := Binding.Send(bData, ioffset);
Inc(ioffset, Result);
until ioffset >= Length(bData);
//if debughook > 0 then
// OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData)));
finally
strmData.Free;
end;
end;
{ TIdWebsocketQueueThread }
procedure TIdWebsocketQueueThread.AfterConstruction;
begin
inherited;
FLock := TCriticalSection.Create;
FEvents := TList<TThreadProcedure>.Create;
FProcessing := TList<TThreadProcedure>.Create;
FEvent := TEvent.Create;
end;
destructor TIdWebsocketQueueThread.Destroy;
begin
Lock;
FEvents.Clear;
FProcessing.Free;
FEvent.Free;
UnLock;
FEvents.Free;
FLock.Free;
inherited;
end;
procedure TIdWebsocketQueueThread.Execute;
begin
TThread.NameThreadForDebugging(Self.ClassName);
while not Terminated do
begin
try
if FEvent.WaitFor(3 * 1000) = wrSignaled then
begin
FEvent.ResetEvent;
ProcessQueue;
end;
if FProcessing.Count > 0 then
ProcessQueue;
except
//continue
end;
end;
end;
function TIdWebsocketQueueThread.GetThreadID: TThreadID;
begin
if FTempThread > 0 then
Result := FTempThread
else
Result := inherited ThreadID;
end;
procedure TIdWebsocketQueueThread.Lock;
begin
//System.TMonitor.Enter(FEvents);
FLock.Enter;
end;
procedure TIdWebsocketQueueThread.ProcessQueue;
var
proc: Classes.TThreadProcedure;
begin
FTempThread := GetCurrentThreadId;
Lock;
try
//copy
while FEvents.Count > 0 do
begin
proc := FEvents.Items[0];
FProcessing.Add(proc);
FEvents.Delete(0);
end;
finally
UnLock;
end;
while FProcessing.Count > 0 do
begin
proc := FProcessing.Items[0];
FProcessing.Delete(0);
proc();
end;
end;
procedure TIdWebsocketQueueThread.QueueEvent(aEvent: TThreadProcedure);
begin
if Terminated then Exit;
Lock;
try
FEvents.Add(aEvent);
finally
UnLock;
end;
FEvent.SetEvent;
end;
procedure TIdWebsocketQueueThread.Terminate;
begin
inherited Terminate;
FEvent.SetEvent;
end;
procedure TIdWebsocketQueueThread.UnLock;
begin
//System.TMonitor.Exit(FEvents);
FLock.Leave;
end;
{ TIdWebsocketWriteThread }
class function TIdWebsocketWriteThread.Instance: TIdWebsocketWriteThread;
begin
if FInstance = nil then
begin
GlobalNameSpace.BeginWrite;
try
if FInstance = nil then
begin
FInstance := Self.Create(True);
FInstance.Start;
end;
finally
GlobalNameSpace.EndWrite;
end;
end;
Result := FInstance;
end;
class procedure TIdWebsocketWriteThread.RemoveInstance;
begin
if FInstance <> nil then
begin
FInstance.Terminate;
FInstance.WaitFor;
FreeAndNil(FInstance);
end;
end;
end.

View file

@ -58,13 +58,15 @@ begin
Assert(aSocketIOHandler <> nil);
aSocketIOHandler.WriteConnect(context);
end;
//AThread.Connection.Socket.UseNagle := False;
AThread.Connection.Socket.UseNagle := False; //no 200ms delay!
tstart := Now;
context := AThread as TIdServerWSContext;
while AThread.Connection.Connected do
begin
if (AThread.Connection.IOHandler.InputBuffer.Size > 0) or
if context.IOHandler.HasData or
(AThread.Connection.IOHandler.InputBuffer.Size > 0) or
AThread.Connection.IOHandler.Readable(1 * 1000) then //wait 5s, else ping the client(!)
begin
tstart := Now;
@ -72,7 +74,6 @@ begin
strmResponse := TMemoryStream.Create;
strmRequest := TMemoryStream.Create;
try
context := AThread as TIdServerWSContext;
strmRequest.Position := 0;
//first is the type: text or bin
@ -278,9 +279,7 @@ begin
context.WebSocketVersion := StrToIntDef(sValue, 0);
if context.WebSocketVersion < 13 then
Abort; //must be at least 13
end
else
Abort; //must exist

View file

@ -40,6 +40,7 @@ type
TSocketIOContext = class(TInterfacedObject,
ISocketIOContext)
private
FLock: TCriticalSection;
FPingSend: Boolean;
FConnectSend: Boolean;
FGUID: string;
@ -59,6 +60,7 @@ type
public
constructor Create();overload;
constructor Create(aClient: TIdHTTP);overload;
procedure AfterConstruction; override;
destructor Destroy; override;
procedure Lock;
@ -97,6 +99,7 @@ type
TIdBaseSocketIOHandling = class(TIdServerBaseHandling)
protected
FLock: TCriticalSection;
FConnections: TObjectDictionary<TIdContext,TSocketIOContext>;
FConnectionsGUID: TObjectDictionary<string,TSocketIOContext>;
@ -133,8 +136,8 @@ type
procedure ProcessSocketIO_XHR(const aGUID: string; const aStrmRequest, aStrmResponse: TStream);
procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const strmRequest: TMemoryStream);overload;
procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const aData: string);overload;
procedure ProcessSocketIORequest(const ASocket: ISocketIOContext; const strmRequest: TMemoryStream);overload;
procedure ProcessSocketIORequest(const ASocket: ISocketIOContext; const aData: string);overload;
procedure ProcessSocketIORequest(const AContext: TIdContext; const strmRequest: TMemoryStream);overload;
procedure ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string);virtual;
@ -175,6 +178,8 @@ uses
procedure TIdBaseSocketIOHandling.AfterConstruction;
begin
inherited;
FLock := TCriticalSection.Create;
FConnections := TObjectDictionary<TIdContext,TSocketIOContext>.Create([doOwnsValues]);
FConnectionsGUID := TObjectDictionary<string,TSocketIOContext>.Create([doOwnsValues]);
@ -191,6 +196,7 @@ destructor TIdBaseSocketIOHandling.Destroy;
var squid: string;
idcontext: TIdContext;
begin
Lock;
FSocketIOEventCallback.Free;
FSocketIOEventCallbackRef.Free;
FSocketIOErrorRef.Free;
@ -212,8 +218,11 @@ begin
FConnectionsGUID.ExtractPair(squid);
end;
FConnections.Free;
FConnections := nil;
FConnectionsGUID.Free;
UnLock;
FLock.Free;
inherited;
end;
@ -223,29 +232,33 @@ var squid: string;
idcontext: TIdContext;
begin
if ASocket = nil then Exit;
Lock;
try
ASocket.Context := nil;
ASocket.FIOHandler := nil;
ASocket.FClient := nil;
ASocket.FHandling := nil;
ASocket.FGUID := '';
ASocket.FPeerIP := '';
ASocket.Context := nil;
ASocket.FIOHandler := nil;
ASocket.FClient := nil;
ASocket.FHandling := nil;
ASocket.FGUID := '';
ASocket.FPeerIP := '';
for idcontext in FConnections.Keys do
begin
if FConnections.Items[idcontext] = ASocket then
for idcontext in FConnections.Keys do
begin
FConnections.ExtractPair(idcontext);
ASocket._Release;
if FConnections.Items[idcontext] = ASocket then
begin
FConnections.ExtractPair(idcontext);
ASocket._Release;
end;
end;
end;
for squid in FConnectionsGUID.Keys do
begin
if FConnectionsGUID.Items[squid] = ASocket then
for squid in FConnectionsGUID.Keys do
begin
FConnectionsGUID.ExtractPair(squid);
ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue
if FConnectionsGUID.Items[squid] = ASocket then
begin
FConnectionsGUID.ExtractPair(squid);
ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue
end;
end;
finally
Unlock;
end;
end;
@ -264,7 +277,9 @@ end;
procedure TIdBaseSocketIOHandling.Lock;
begin
System.TMonitor.Enter(Self);
// Assert(FConnections <> nil);
// System.TMonitor.Enter(Self);
FLock.Enter;
end;
function TIdBaseSocketIOHandling.NewConnection(
@ -428,7 +443,7 @@ begin
end;
procedure TIdBaseSocketIOHandling.ProcessSocketIORequest(
const ASocket: TSocketIOContext; const strmRequest: TMemoryStream);
const ASocket: ISocketIOContext; const strmRequest: TMemoryStream);
function __ReadToEnd: string;
var
@ -543,11 +558,12 @@ end;
procedure TIdBaseSocketIOHandling.UnLock;
begin
System.TMonitor.Exit(Self);
//System.TMonitor.Exit(Self);
FLock.Leave;
end;
procedure TIdBaseSocketIOHandling.ProcessSocketIORequest(
const ASocket: TSocketIOContext; const aData: string);
const ASocket: ISocketIOContext; const aData: string);
function __GetSocketIOPart(const aData: string; aIndex: Integer): string;
var ipos: Integer;
@ -583,16 +599,18 @@ var
callbackobj: TSocketIOCallbackObj;
errorref: TSocketIOError;
error: ISuperObject;
socket: TSocketIOContext;
begin
if ASocket = nil then Exit;
socket := ASocket as TSocketIOContext;
if not FConnections.ContainsValue(ASocket) and
not FConnectionsGUID.ContainsValue(ASocket) then
if not FConnections.ContainsValue(socket) and
not FConnectionsGUID.ContainsValue(socket) then
begin
Lock;
try
ASocket._AddRef;
FConnections.Add(nil, ASocket); //clients do not have a TIdContext?
FConnections.Add(nil, socket); //clients do not have a TIdContext?
finally
UnLock;
end;
@ -621,21 +639,21 @@ begin
if StartsStr('0:', str) then
begin
schannel := __GetSocketIOPart(str, 2);
ProcessCloseChannel(ASocket, schannel);
ProcessCloseChannel(socket, schannel);
end
//(1) Connect
//'1::' [path] [query]
else if StartsStr('1:', str) then
begin
//todo: add channel/room to authorized channel/room list
if not ASocket.ConnectSend then
WriteString(ASocket, str); //write same connect back, e.g. 1::/chat
if not socket.ConnectSend then
WriteString(socket, str); //write same connect back, e.g. 1::/chat
end
//(2) Heartbeat
else if StartsStr('2:', str) then
begin
//todo: timer to disconnect client if no ping within time
ProcessHeatbeatRequest(ASocket, str);
ProcessHeatbeatRequest(socket, str);
end
//(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message)
//'3:' [message id ('+')] ':' [message endpoint] ':' [data]
@ -649,10 +667,10 @@ begin
callbackobj := TSocketIOCallbackObj.Create;
try
callbackobj.FHandling := Self;
callbackobj.FSocket := ASocket;
callbackobj.FSocket := socket;
callbackobj.FMsgNr := imsg;
try
OnSocketIOMsg(ASocket, sdata, callbackobj); //, imsg, bCallback);
OnSocketIOMsg(socket, sdata, callbackobj); //, imsg, bCallback);
except
on E:Exception do
begin
@ -682,10 +700,10 @@ begin
callbackobj := TSocketIOCallbackObj.Create;
try
callbackobj.FHandling := Self;
callbackobj.FSocket := ASocket;
callbackobj.FSocket := socket;
callbackobj.FMsgNr := imsg;
try
OnSocketIOJson(ASocket, SO(sdata), callbackobj); //, imsg, bCallback);
OnSocketIOJson(socket, SO(sdata), callbackobj); //, imsg, bCallback);
except
on E:Exception do
begin
@ -712,7 +730,7 @@ begin
//if Assigned(OnSocketIOEvent) then
// OnSocketIOEvent(AContext, sdata, imsg, bCallback);
try
ProcessEvent(ASocket, sdata, imsg, bCallback);
ProcessEvent(socket, sdata, imsg, bCallback);
except
on e:exception do
//
@ -989,6 +1007,13 @@ end;
{ TSocketIOContext }
procedure TSocketIOContext.AfterConstruction;
begin
inherited;
FLock := TCriticalSection.Create;
FQueue := TList<string>.Create;
end;
constructor TSocketIOContext.Create(aClient: TIdHTTP);
begin
FClient := aClient;
@ -1003,7 +1028,9 @@ destructor TSocketIOContext.Destroy;
begin
Lock;
FEvent.Free;
FQueue.Free;
FreeAndNil(FQueue);
UnLock;
FLock.Free;
inherited;
end;
@ -1029,7 +1056,9 @@ end;
procedure TSocketIOContext.Lock;
begin
System.TMonitor.Enter(Self);
// Assert(FQueue <> nil);
// System.TMonitor.Enter(Self);
FLock.Enter;
end;
constructor TSocketIOContext.Create;
@ -1059,8 +1088,6 @@ procedure TSocketIOContext.QueueData(const aData: string);
begin
if FEvent = nil then
FEvent := TEvent.Create;
if FQueue = nil then
FQueue := TList<string>.Create;
FQueue.Add(aData);
FEvent.SetEvent;
@ -1134,7 +1161,8 @@ end;
procedure TSocketIOContext.UnLock;
begin
System.TMonitor.Exit(Self);
//System.TMonitor.Exit(Self);
FLock.Leave;
end;
function TSocketIOContext.WaitForQueue(aTimeout_ms: Integer): string;