diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index 1c061fc..ec94c68 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -140,23 +140,13 @@ type class procedure RemoveInstance; end; - //async post data - TIdWebsocketDispatchThread = class(TThread) + //async process data + TIdWebsocketDispatchThread = class(TIdWebsocketQueueThread) private class var FInstance: TIdWebsocketDispatchThread; - protected - FEvent: TEvent; - FEvents, FProcessing: TList; - procedure Execute; override; public - procedure AfterConstruction;override; - destructor Destroy; override; - - procedure Terminate; - - procedure QueueEvent(aEvent: TThreadProcedure); - - class function Instance: TIdWebsocketDispatchThread; + class function Instance: TIdWebsocketDispatchThread; + class procedure RemoveInstance; end; implementation @@ -248,6 +238,9 @@ end; procedure TIdHTTPWebsocketClient.Connect; begin + if IOHandler <> nil then + IOHandler.Clear; + FHeartBeat.Enabled := True; inherited Connect; end; @@ -289,7 +282,7 @@ begin inherited DisConnect(ANotifyPeer); //clear buffer, other still "connected" - IOHandler.InputBuffer.Clear; + IOHandler.Clear; //IOHandler.Free; //IOHandler := TIdIOHandlerWebsocket.Create(nil); @@ -320,20 +313,17 @@ begin try if (IOHandler <> nil) and not IOHandler.ClosedGracefully and - //IOHandler.Connected and + IOHandler.Connected and (FSocketIOContext <> nil) then begin - //not threadsafe because of background ReadAllThreads? - //FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message + FSocketIO.WritePing(FSocketIOContext as TSocketIOContext); //heartbeat socket.io message end //retry re-connect else try //clear inputbuffer, otherwise it can't connect :( - if (IOHandler <> nil) and - not IOHandler.InputBufferIsEmpty - then - IOHandler.DiscardAll; + if (IOHandler <> nil) then + IOHandler.Clear; Self.Connect; TryUpgradeToWebsocket; @@ -343,10 +333,8 @@ begin except on E:Exception do begin //clear inputbuffer, otherwise it stays connected :( - if (IOHandler <> nil) and - not IOHandler.InputBufferIsEmpty - then - IOHandler.DiscardAll; + if (IOHandler <> nil) then + IOHandler.Clear; Disconnect(False); if Assigned(OnDisConnected) then @@ -1048,22 +1036,15 @@ begin (chn.Socket.Binding.Handle > 0) and (chn.Socket.Binding.Handle <> INVALID_SOCKET) then begin -// if chn.IOHandler.TryLock then -// try + if chn.IOHandler.HasData then + begin + Inc(iResult); + Break; + end; - //todo: seperate read thread is needed because of threadsafety - if chn.IOHandler.Readable( 1000 div l.Count ) then - begin - Inc(iResult); - Break; - end; -// finally -// chn.IOHandler.Unlock; -// end; - -// Freadset.fd_count := iCount+1; -// Freadset.fd_array[iCount] := chn.Socket.Binding.Handle; -// Inc(iCount); + Freadset.fd_count := iCount+1; + Freadset.fd_array[iCount] := chn.Socket.Binding.Handle; + Inc(iCount); end; end; @@ -1081,23 +1062,24 @@ begin Finterval.tv_sec := 15; //15s Finterval.tv_usec := 0; - { //nothing to wait for? then sleep some time to prevent 100% CPU - if iCount = 0 then + if iResult = 0 then begin - iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval); + if iCount = 0 then + begin + iResult := IdWinsock2.select(0, nil, nil, @Fexceptionset, @Finterval); + if iResult = SOCKET_ERROR then + iResult := 1; //ignore errors + end + //wait till a socket has some data (or a signal via exceptionset is fired) + else + iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval); if iResult = SOCKET_ERROR then - iResult := 1; //ignore errors - end - //wait till a socket has some data (or a signal via exceptionset is fired) - else - iResult := IdWinsock2.select(0, @Freadset, nil, @Fexceptionset, @Finterval); + //raise EIdWinsockStubError.Build(WSAGetLastError, '', []); + //ignore error during wait: socket disconnected etc + Exit; + end; - if iResult = SOCKET_ERROR then - //raise EIdWinsockStubError.Build(WSAGetLastError, '', []); - //ignore error during wait: socket disconnected etc - Exit; - } if Terminated then Exit; //some data? @@ -1113,7 +1095,8 @@ begin chn := TIdHTTPWebsocketClient(l.Items[i]); try //try to process all events - while chn.IOHandler.Readable(0) do //has some data + while chn.IOHandler.HasData or + chn.IOHandler.Readable(0) do //has some data begin ws := chn.IOHandler as TIdIOHandlerWebsocket; //no pending dispatch active? (so actually we only read events here?) @@ -1172,7 +1155,6 @@ begin end; end else - Sleep(10); end; procedure TIdWebsocketMultiReadThread.RemoveClient( @@ -1186,7 +1168,11 @@ end; class procedure TIdWebsocketMultiReadThread.RemoveInstance; begin if FInstance <> nil then + begin + FInstance.Terminate; + FInstance.WaitFor; FreeAndNil(FInstance); + end; end; procedure TIdWebsocketMultiReadThread.ResetSpecialEventSocket; @@ -1214,63 +1200,6 @@ end; { TIdWebsocketDispatchThread } -procedure TIdWebsocketDispatchThread.AfterConstruction; -begin - inherited; - FEvents := TList.Create; - FProcessing := TList.Create; - FEvent := TEvent.Create; -end; - -destructor TIdWebsocketDispatchThread.Destroy; -begin - System.TMonitor.Enter(FEvents); - FEvents.Clear; - FEvents.Free; - FProcessing.Free; - - FEvent.Free; - inherited; -end; - -procedure TIdWebsocketDispatchThread.Execute; -var - proc: Classes.TThreadProcedure; -begin - TThread.NameThreadForDebugging(Self.ClassName); - - while not Terminated do - begin - try - if FEvent.WaitFor(3 * 1000) = wrSignaled then - begin - FEvent.ResetEvent; - System.TMonitor.Enter(FEvents); - try - //copy - while FEvents.Count > 0 do - begin - proc := FEvents.Items[0]; - FProcessing.Add(proc); - FEvents.Delete(0); - end; - finally - System.TMonitor.Exit(FEvents); - end; - end; - - while FProcessing.Count > 0 do - begin - proc := FProcessing.Items[0]; - FProcessing.Delete(0); - proc(); - end; - except - //continue - end; - end; -end; - class function TIdWebsocketDispatchThread.Instance: TIdWebsocketDispatchThread; begin if FInstance = nil then @@ -1279,7 +1208,7 @@ begin try if FInstance = nil then begin - FInstance := TIdWebsocketDispatchThread.Create(True); + FInstance := Self.Create(True); FInstance.Start; end; finally @@ -1289,31 +1218,19 @@ begin Result := FInstance; end; -procedure TIdWebsocketDispatchThread.QueueEvent(aEvent: TThreadProcedure); +class procedure TIdWebsocketDispatchThread.RemoveInstance; begin - System.TMonitor.Enter(FEvents); - try - FEvents.Add(aEvent); - finally - System.TMonitor.Exit(FEvents); + if FInstance <> nil then + begin + FInstance.Terminate; + FInstance.WaitFor; + FreeAndNil(FInstance); end; - FEvent.SetEvent; -end; - -procedure TIdWebsocketDispatchThread.Terminate; -begin - inherited Terminate; - FEvent.SetEvent; end; initialization finalization - if TIdWebsocketMultiReadThread.FInstance <> nil then - begin - TIdWebsocketMultiReadThread.Instance.Terminate; - TIdWebsocketMultiReadThread.Instance.WaitFor; -// TBaseNamedThread.WaitForThread(TIdWebsocketMultiReadThread.Instance, 5 * 1000); - TIdWebsocketMultiReadThread.RemoveInstance; - end; + TIdWebsocketMultiReadThread.RemoveInstance; + TIdWebsocketDispatchThread.RemoveInstance end. diff --git a/IdIOHandlerWebsocket.pas b/IdIOHandlerWebsocket.pas index ddfb968..defe9cb 100644 --- a/IdIOHandlerWebsocket.pas +++ b/IdIOHandlerWebsocket.pas @@ -31,10 +31,12 @@ type FCloseReason: string; FCloseCode: Integer; FClosing: Boolean; + class var FUseSingleWriteThread: Boolean; protected FMessageStream: TMemoryStream; FWriteTextToTarget: Boolean; FCloseCodeSend: Boolean; + FPendingWriteCount: Integer; function InternalReadDataFromSource(var VBuffer: TIdBytes; ARaiseExceptionOnTimeout: Boolean): Integer; function ReadDataFromSource(var VBuffer: TIdBytes): Integer; override; @@ -50,6 +52,7 @@ type property IsServerSide : Boolean read FIsServerSide write FIsServerSide; property ClientExtensionBits : TWSExtensionBits read FExtensionBits write FExtensionBits; public + class constructor Create; procedure AfterConstruction;override; destructor Destroy; override; @@ -57,6 +60,8 @@ type procedure Unlock; function TryLock: Boolean; + function HasData: Boolean; + procedure Clear; function Readable(AMSec: Integer = IdTimeoutDefault): Boolean; override; function Connected: Boolean; override; @@ -71,8 +76,47 @@ type procedure WriteLnRFC(const AOut: string = ''; AEncoding: TIdTextEncoding = nil); override; procedure Write(AValue: TStrings; AWriteLinesCount: Boolean = False; AEncoding: TIdTextEncoding = nil); overload; override; procedure Write(AStream: TStream; aType: TWSDataType); overload; + procedure WriteBufferFlush(AByteCount: Integer); override; + + class property UseSingleWriteThread: Boolean read FUseSingleWriteThread write FUseSingleWriteThread; end; + TIdWebsocketQueueThread = class(TThread) + private + function GetThreadID: TThreadID; + protected + FLock: TCriticalSection; + FTempThread: Integer; + FEvent: TEvent; + FEvents, FProcessing: TList; + public + procedure AfterConstruction;override; + destructor Destroy; override; + + procedure Lock; + procedure UnLock; + + procedure ProcessQueue; + procedure Execute; override; + + property ThreadID: TThreadID read GetThreadID; + + procedure Terminate; + procedure QueueEvent(aEvent: TThreadProcedure); + end; + + //http://tangentsoft.net/wskfaq/intermediate.html + //Winsock is not threadsafe, use a single/seperate write thread and seperate read thread + //(do not write in 2 different thread or read in 2 different threads) + TIdWebsocketWriteThread = class(TIdWebsocketQueueThread) + private + class var FInstance: TIdWebsocketWriteThread; + public + class function Instance: TIdWebsocketWriteThread; + class procedure RemoveInstance; + end; + + //close frame codes const C_FrameClose_Normal = 1000; //1000 indicates a normal closure, meaning that the purpose for @@ -125,7 +169,8 @@ const implementation uses - SysUtils, Math, IdStream, IdStack, IdWinsock2, IdExceptionCore, + SysUtils, Math, Windows, + IdStream, IdStack, IdWinsock2, IdExceptionCore, IdResourceStrings, IdResourceStringsCore; //frame codes @@ -139,6 +184,23 @@ const C_FrameCode_Pong = 10 {A}; //B-F are reserved for further control frames +function BytesToStringRaw(const AValue: TIdBytes): string; +var + i: Integer; +begin + //SetLength(Result, Length(aValue)); + for i := 0 to High(AValue) do + begin + if (AValue[i] < 33) or + ( (AValue[i] > 126) and + (AValue[i] < 161) ) + then + Result := Result + '#' + IntToStr(AValue[i]) + else + Result := Result + Char(AValue[i]) + end; +end; + { TIdIOHandlerStack_Websocket } procedure TIdIOHandlerWebsocket.AfterConstruction; @@ -150,6 +212,12 @@ begin FSelectLock := TCriticalSection.Create; end; +procedure TIdIOHandlerWebsocket.Clear; +begin + FWSInputBuffer.Clear; + InputBuffer.Clear; +end; + procedure TIdIOHandlerWebsocket.Close; var iaWriteBuffer: TIdBytes; @@ -224,11 +292,24 @@ end; function TIdIOHandlerWebsocket.Connected: Boolean; begin - Result := inherited Connected; + Lock; + try + Result := inherited Connected; + finally + Unlock; + end; +end; + +class constructor TIdIOHandlerWebsocket.Create; +begin + //UseSingleWriteThread := True; end; destructor TIdIOHandlerWebsocket.Destroy; begin + while FPendingWriteCount > 0 do + Sleep(1); + FLock.Enter; FSelectLock.Enter; FLock.Free; @@ -239,10 +320,15 @@ begin inherited; end; +function TIdIOHandlerWebsocket.HasData: Boolean; +begin + //buffered data available? (more data from previous read) + Result := (FWSInputBuffer.Size > 0); +end; + function TIdIOHandlerWebsocket.InternalReadDataFromSource( var VBuffer: TIdBytes; ARaiseExceptionOnTimeout: Boolean): Integer; begin - Result := -1; SetLength(VBuffer, 0); CheckForDisconnect; @@ -268,7 +354,8 @@ begin begin CheckForDisconnect; //disconnected in the mean time? GStack.CheckForSocketError(GStack.WSGetLastError); //check for socket error - EIdNoDataToRead.Toss(RSIdNoDataToRead); //nothing read? then connection is probably closed -> exit + if ARaiseExceptionOnTimeout then + EIdNoDataToRead.Toss(RSIdNoDataToRead); //nothing read? then connection is probably closed -> exit end; SetLength(VBuffer, Result); end; @@ -276,61 +363,163 @@ end; procedure TIdIOHandlerWebsocket.WriteLn(const AOut: string; AEncoding: TIdTextEncoding); begin - FWriteTextToTarget := True; - try - inherited WriteLn(AOut, TIdTextEncoding.UTF8); //must be UTF8! - finally - FWriteTextToTarget := False; + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + WriteLn(AOut, AEncoding); + end) + end + else + begin + Lock; + try + FWriteTextToTarget := True; + inherited WriteLn(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + Unlock; + end; end; end; procedure TIdIOHandlerWebsocket.WriteLnRFC(const AOut: string; AEncoding: TIdTextEncoding); begin - FWriteTextToTarget := True; - try - inherited WriteLnRFC(AOut, TIdTextEncoding.UTF8); //must be UTF8! - finally - FWriteTextToTarget := False; + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + WriteLnRFC(AOut, AEncoding); + end) + end + else + begin + Lock; + try + FWriteTextToTarget := True; + inherited WriteLnRFC(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + Unlock; + end; end; end; procedure TIdIOHandlerWebsocket.Write(const AOut: string; AEncoding: TIdTextEncoding); begin - FWriteTextToTarget := True; - try - inherited Write(AOut, TIdTextEncoding.UTF8); //must be UTF8! - finally - FWriteTextToTarget := False; + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + Write(AOut, AEncoding); + end) + end + else + begin + Lock; + try + FWriteTextToTarget := True; + inherited Write(AOut, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + Unlock; + end; end; end; procedure TIdIOHandlerWebsocket.Write(AValue: TStrings; AWriteLinesCount: Boolean; AEncoding: TIdTextEncoding); begin - FWriteTextToTarget := True; - try - inherited Write(AValue, AWriteLinesCount, TIdTextEncoding.UTF8); //must be UTF8! - finally - FWriteTextToTarget := False; + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + Write(AValue, AWriteLinesCount, AEncoding); + end) + end + else + begin + Lock; + try + FWriteTextToTarget := True; + inherited Write(AValue, AWriteLinesCount, TIdTextEncoding.UTF8); //must be UTF8! + finally + FWriteTextToTarget := False; + Unlock; + end; end; end; procedure TIdIOHandlerWebsocket.Write(AStream: TStream; aType: TWSDataType); begin - FWriteTextToTarget := (aType = wdtText); - try - inherited Write(AStream); - finally - FWriteTextToTarget := False; + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + Write(AStream, aType); + end) + end + else + begin + Lock; + try + FWriteTextToTarget := (aType = wdtText); + inherited Write(AStream); + finally + FWriteTextToTarget := False; + Unlock; + end; end; end; +procedure TIdIOHandlerWebsocket.WriteBufferFlush(AByteCount: Integer); +begin + if (FWriteBuffer = nil) or (FWriteBuffer.Size <= 0) then Exit; + + if UseSingleWriteThread and IsWebsocket and + (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + begin + InterlockedIncrement(FPendingWriteCount); + TIdWebsocketWriteThread.Instance.QueueEvent( + procedure + begin + InterlockedDecrement(FPendingWriteCount); + WriteBufferFlush(AByteCount); + end) + end + else + inherited WriteBufferFlush(AByteCount); +end; + function TIdIOHandlerWebsocket.WriteDataToTarget(const ABuffer: TIdBytes; const AOffset, ALength: Integer): Integer; begin + if UseSingleWriteThread and IsWebsocket and (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then + Assert(False, 'Write done in different thread than TIdWebsocketWriteThread!'); + if not IsWebsocket then Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength) else @@ -541,9 +730,15 @@ var var temp: TIdBytes; begin + if HasData then Exit(True); + Result := InternalReadDataFromSource(temp, ARaiseExceptionOnTimeout) > 0; if Result then + begin FWSInputBuffer.Write(temp); + //if debughook > 0 then + // OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); + end; end; function _GetByte: Byte; @@ -570,6 +765,8 @@ var begin InternalReadDataFromSource(temp, True); FWSInputBuffer.Write(temp); + //if debughook > 0 then + // OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); if FWSInputBuffer.Size < aCount then Sleep(1); end; @@ -691,7 +888,7 @@ function TIdIOHandlerWebsocket.WriteData(aData: TIdBytes; aType: TWSDataCode; aFIN, aRSV1, aRSV2, aRSV3: boolean): integer; var iByte: Byte; - i: NativeInt; + i, ioffset: NativeInt; iDataLength, iPos: Int64; rLength: Int64Rec; rMask: record @@ -784,14 +981,9 @@ begin //write header strmData.Position := 0; TIdStreamHelper.ReadBytes(strmData, bData); - Result := Binding.Send(bData); //Mask? Note: Only clients must apply a mask - if IsServerSide then - begin - Result := Binding.Send(aData); - end - else + if not IsServerSide then begin iPos := 0; iDataLength := Length(aData); @@ -802,13 +994,161 @@ begin aData[iPos] := iByte; inc(iPos); end; - - //send masked data - Result := Binding.Send(aData); end; + + AppendBytes(bData, aData); //important: send all at once! + ioffset := 0; + repeat + Result := Binding.Send(bData, ioffset); + Inc(ioffset, Result); + until ioffset >= Length(bData); + + //if debughook > 0 then + // OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData))); finally strmData.Free; end; end; +{ TIdWebsocketQueueThread } + +procedure TIdWebsocketQueueThread.AfterConstruction; +begin + inherited; + FLock := TCriticalSection.Create; + FEvents := TList.Create; + FProcessing := TList.Create; + FEvent := TEvent.Create; +end; + +destructor TIdWebsocketQueueThread.Destroy; +begin + Lock; + FEvents.Clear; + FProcessing.Free; + + FEvent.Free; + UnLock; + FEvents.Free; + FLock.Free; + inherited; +end; + +procedure TIdWebsocketQueueThread.Execute; +begin + TThread.NameThreadForDebugging(Self.ClassName); + + while not Terminated do + begin + try + if FEvent.WaitFor(3 * 1000) = wrSignaled then + begin + FEvent.ResetEvent; + ProcessQueue; + end; + + if FProcessing.Count > 0 then + ProcessQueue; + except + //continue + end; + end; +end; + +function TIdWebsocketQueueThread.GetThreadID: TThreadID; +begin + if FTempThread > 0 then + Result := FTempThread + else + Result := inherited ThreadID; +end; + +procedure TIdWebsocketQueueThread.Lock; +begin + //System.TMonitor.Enter(FEvents); + FLock.Enter; +end; + +procedure TIdWebsocketQueueThread.ProcessQueue; +var + proc: Classes.TThreadProcedure; +begin + FTempThread := GetCurrentThreadId; + + Lock; + try + //copy + while FEvents.Count > 0 do + begin + proc := FEvents.Items[0]; + FProcessing.Add(proc); + FEvents.Delete(0); + end; + finally + UnLock; + end; + + while FProcessing.Count > 0 do + begin + proc := FProcessing.Items[0]; + FProcessing.Delete(0); + proc(); + end; +end; + +procedure TIdWebsocketQueueThread.QueueEvent(aEvent: TThreadProcedure); +begin + if Terminated then Exit; + + Lock; + try + FEvents.Add(aEvent); + finally + UnLock; + end; + FEvent.SetEvent; +end; + +procedure TIdWebsocketQueueThread.Terminate; +begin + inherited Terminate; + FEvent.SetEvent; +end; + +procedure TIdWebsocketQueueThread.UnLock; +begin + //System.TMonitor.Exit(FEvents); + FLock.Leave; +end; + +{ TIdWebsocketWriteThread } + +class function TIdWebsocketWriteThread.Instance: TIdWebsocketWriteThread; +begin + if FInstance = nil then + begin + GlobalNameSpace.BeginWrite; + try + if FInstance = nil then + begin + FInstance := Self.Create(True); + FInstance.Start; + end; + finally + GlobalNameSpace.EndWrite; + end; + end; + Result := FInstance; +end; + +class procedure TIdWebsocketWriteThread.RemoveInstance; +begin + if FInstance <> nil then + begin + FInstance.Terminate; + FInstance.WaitFor; + FreeAndNil(FInstance); + end; +end; + end. diff --git a/IdServerWebsocketHandling.pas b/IdServerWebsocketHandling.pas index b4987e3..04bbe29 100644 --- a/IdServerWebsocketHandling.pas +++ b/IdServerWebsocketHandling.pas @@ -58,13 +58,15 @@ begin Assert(aSocketIOHandler <> nil); aSocketIOHandler.WriteConnect(context); end; - //AThread.Connection.Socket.UseNagle := False; + AThread.Connection.Socket.UseNagle := False; //no 200ms delay! tstart := Now; + context := AThread as TIdServerWSContext; while AThread.Connection.Connected do begin - if (AThread.Connection.IOHandler.InputBuffer.Size > 0) or + if context.IOHandler.HasData or + (AThread.Connection.IOHandler.InputBuffer.Size > 0) or AThread.Connection.IOHandler.Readable(1 * 1000) then //wait 5s, else ping the client(!) begin tstart := Now; @@ -72,7 +74,6 @@ begin strmResponse := TMemoryStream.Create; strmRequest := TMemoryStream.Create; try - context := AThread as TIdServerWSContext; strmRequest.Position := 0; //first is the type: text or bin @@ -278,9 +279,7 @@ begin context.WebSocketVersion := StrToIntDef(sValue, 0); if context.WebSocketVersion < 13 then - Abort; //must be at least 13 - end else Abort; //must exist diff --git a/IdSocketIOHandling.pas b/IdSocketIOHandling.pas index 9ef473b..bd3be74 100644 --- a/IdSocketIOHandling.pas +++ b/IdSocketIOHandling.pas @@ -40,6 +40,7 @@ type TSocketIOContext = class(TInterfacedObject, ISocketIOContext) private + FLock: TCriticalSection; FPingSend: Boolean; FConnectSend: Boolean; FGUID: string; @@ -59,6 +60,7 @@ type public constructor Create();overload; constructor Create(aClient: TIdHTTP);overload; + procedure AfterConstruction; override; destructor Destroy; override; procedure Lock; @@ -97,6 +99,7 @@ type TIdBaseSocketIOHandling = class(TIdServerBaseHandling) protected + FLock: TCriticalSection; FConnections: TObjectDictionary; FConnectionsGUID: TObjectDictionary; @@ -133,8 +136,8 @@ type procedure ProcessSocketIO_XHR(const aGUID: string; const aStrmRequest, aStrmResponse: TStream); - procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const strmRequest: TMemoryStream);overload; - procedure ProcessSocketIORequest(const ASocket: TSocketIOContext; const aData: string);overload; + procedure ProcessSocketIORequest(const ASocket: ISocketIOContext; const strmRequest: TMemoryStream);overload; + procedure ProcessSocketIORequest(const ASocket: ISocketIOContext; const aData: string);overload; procedure ProcessSocketIORequest(const AContext: TIdContext; const strmRequest: TMemoryStream);overload; procedure ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string);virtual; @@ -175,6 +178,8 @@ uses procedure TIdBaseSocketIOHandling.AfterConstruction; begin inherited; + FLock := TCriticalSection.Create; + FConnections := TObjectDictionary.Create([doOwnsValues]); FConnectionsGUID := TObjectDictionary.Create([doOwnsValues]); @@ -191,6 +196,7 @@ destructor TIdBaseSocketIOHandling.Destroy; var squid: string; idcontext: TIdContext; begin + Lock; FSocketIOEventCallback.Free; FSocketIOEventCallbackRef.Free; FSocketIOErrorRef.Free; @@ -212,8 +218,11 @@ begin FConnectionsGUID.ExtractPair(squid); end; FConnections.Free; + FConnections := nil; FConnectionsGUID.Free; + UnLock; + FLock.Free; inherited; end; @@ -223,29 +232,33 @@ var squid: string; idcontext: TIdContext; begin if ASocket = nil then Exit; + Lock; + try + ASocket.Context := nil; + ASocket.FIOHandler := nil; + ASocket.FClient := nil; + ASocket.FHandling := nil; + ASocket.FGUID := ''; + ASocket.FPeerIP := ''; - ASocket.Context := nil; - ASocket.FIOHandler := nil; - ASocket.FClient := nil; - ASocket.FHandling := nil; - ASocket.FGUID := ''; - ASocket.FPeerIP := ''; - - for idcontext in FConnections.Keys do - begin - if FConnections.Items[idcontext] = ASocket then + for idcontext in FConnections.Keys do begin - FConnections.ExtractPair(idcontext); - ASocket._Release; + if FConnections.Items[idcontext] = ASocket then + begin + FConnections.ExtractPair(idcontext); + ASocket._Release; + end; end; - end; - for squid in FConnectionsGUID.Keys do - begin - if FConnectionsGUID.Items[squid] = ASocket then + for squid in FConnectionsGUID.Keys do begin - FConnectionsGUID.ExtractPair(squid); - ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue + if FConnectionsGUID.Items[squid] = ASocket then + begin + FConnectionsGUID.ExtractPair(squid); + ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue + end; end; + finally + Unlock; end; end; @@ -264,7 +277,9 @@ end; procedure TIdBaseSocketIOHandling.Lock; begin - System.TMonitor.Enter(Self); +// Assert(FConnections <> nil); +// System.TMonitor.Enter(Self); + FLock.Enter; end; function TIdBaseSocketIOHandling.NewConnection( @@ -428,7 +443,7 @@ begin end; procedure TIdBaseSocketIOHandling.ProcessSocketIORequest( - const ASocket: TSocketIOContext; const strmRequest: TMemoryStream); + const ASocket: ISocketIOContext; const strmRequest: TMemoryStream); function __ReadToEnd: string; var @@ -543,11 +558,12 @@ end; procedure TIdBaseSocketIOHandling.UnLock; begin - System.TMonitor.Exit(Self); + //System.TMonitor.Exit(Self); + FLock.Leave; end; procedure TIdBaseSocketIOHandling.ProcessSocketIORequest( - const ASocket: TSocketIOContext; const aData: string); + const ASocket: ISocketIOContext; const aData: string); function __GetSocketIOPart(const aData: string; aIndex: Integer): string; var ipos: Integer; @@ -583,16 +599,18 @@ var callbackobj: TSocketIOCallbackObj; errorref: TSocketIOError; error: ISuperObject; + socket: TSocketIOContext; begin if ASocket = nil then Exit; + socket := ASocket as TSocketIOContext; - if not FConnections.ContainsValue(ASocket) and - not FConnectionsGUID.ContainsValue(ASocket) then + if not FConnections.ContainsValue(socket) and + not FConnectionsGUID.ContainsValue(socket) then begin Lock; try ASocket._AddRef; - FConnections.Add(nil, ASocket); //clients do not have a TIdContext? + FConnections.Add(nil, socket); //clients do not have a TIdContext? finally UnLock; end; @@ -621,21 +639,21 @@ begin if StartsStr('0:', str) then begin schannel := __GetSocketIOPart(str, 2); - ProcessCloseChannel(ASocket, schannel); + ProcessCloseChannel(socket, schannel); end //(1) Connect //'1::' [path] [query] else if StartsStr('1:', str) then begin //todo: add channel/room to authorized channel/room list - if not ASocket.ConnectSend then - WriteString(ASocket, str); //write same connect back, e.g. 1::/chat + if not socket.ConnectSend then + WriteString(socket, str); //write same connect back, e.g. 1::/chat end //(2) Heartbeat else if StartsStr('2:', str) then begin //todo: timer to disconnect client if no ping within time - ProcessHeatbeatRequest(ASocket, str); + ProcessHeatbeatRequest(socket, str); end //(3) Message (https://github.com/LearnBoost/socket.io-spec#3-message) //'3:' [message id ('+')] ':' [message endpoint] ':' [data] @@ -649,10 +667,10 @@ begin callbackobj := TSocketIOCallbackObj.Create; try callbackobj.FHandling := Self; - callbackobj.FSocket := ASocket; + callbackobj.FSocket := socket; callbackobj.FMsgNr := imsg; try - OnSocketIOMsg(ASocket, sdata, callbackobj); //, imsg, bCallback); + OnSocketIOMsg(socket, sdata, callbackobj); //, imsg, bCallback); except on E:Exception do begin @@ -682,10 +700,10 @@ begin callbackobj := TSocketIOCallbackObj.Create; try callbackobj.FHandling := Self; - callbackobj.FSocket := ASocket; + callbackobj.FSocket := socket; callbackobj.FMsgNr := imsg; try - OnSocketIOJson(ASocket, SO(sdata), callbackobj); //, imsg, bCallback); + OnSocketIOJson(socket, SO(sdata), callbackobj); //, imsg, bCallback); except on E:Exception do begin @@ -712,7 +730,7 @@ begin //if Assigned(OnSocketIOEvent) then // OnSocketIOEvent(AContext, sdata, imsg, bCallback); try - ProcessEvent(ASocket, sdata, imsg, bCallback); + ProcessEvent(socket, sdata, imsg, bCallback); except on e:exception do // @@ -989,6 +1007,13 @@ end; { TSocketIOContext } +procedure TSocketIOContext.AfterConstruction; +begin + inherited; + FLock := TCriticalSection.Create; + FQueue := TList.Create; +end; + constructor TSocketIOContext.Create(aClient: TIdHTTP); begin FClient := aClient; @@ -1003,7 +1028,9 @@ destructor TSocketIOContext.Destroy; begin Lock; FEvent.Free; - FQueue.Free; + FreeAndNil(FQueue); + UnLock; + FLock.Free; inherited; end; @@ -1029,7 +1056,9 @@ end; procedure TSocketIOContext.Lock; begin - System.TMonitor.Enter(Self); +// Assert(FQueue <> nil); +// System.TMonitor.Enter(Self); + FLock.Enter; end; constructor TSocketIOContext.Create; @@ -1059,8 +1088,6 @@ procedure TSocketIOContext.QueueData(const aData: string); begin if FEvent = nil then FEvent := TEvent.Create; - if FQueue = nil then - FQueue := TList.Create; FQueue.Add(aData); FEvent.SetEvent; @@ -1134,7 +1161,8 @@ end; procedure TSocketIOContext.UnLock; begin - System.TMonitor.Exit(Self); + //System.TMonitor.Exit(Self); + FLock.Leave; end; function TSocketIOContext.WaitForQueue(aTimeout_ms: Integer): string;