From b2ffe540dfcac7fc71fd22852dcccf6a756176b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Mussche?= Date: Fri, 7 Mar 2014 13:40:04 +0100 Subject: [PATCH] better threadsafe connecting --- IdHTTPWebsocketClient.pas | 45 ++++++++----- IdIOHandlerWebsocket.pas | 135 +++++++++++++++++++++++++------------- 2 files changed, 118 insertions(+), 62 deletions(-) diff --git a/IdHTTPWebsocketClient.pas b/IdHTTPWebsocketClient.pas index 0b869e3..5def380 100644 --- a/IdHTTPWebsocketClient.pas +++ b/IdHTTPWebsocketClient.pas @@ -497,17 +497,25 @@ var i: Integer; sKey, sResponseKey: string; sSocketioextended: string; + bLocked: boolean; begin Assert((IOHandler = nil) or not IOHandler.IsWebsocket); //remove from thread during connection handling TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); - //reset pending data - if IOHandler <> nil then - IOHandler.Clear; - + bLocked := False; strmResponse := TMemoryStream.Create; + Self.Lock; try + //reset pending data + if IOHandler <> nil then + begin + IOHandler.Lock; + bLocked := True; + if IOHandler.IsWebsocket then Exit; + IOHandler.Clear; + end; + //special socket.io handling, see https://github.com/LearnBoost/socket.io-spec if SocketIOCompatible then begin @@ -693,6 +701,10 @@ begin Request.Clear; strmResponse.Free; + if bLocked and (IOHandler <> nil) then + IOHandler.Unlock; + Unlock; + //add to thread for auto retry/reconnect TIdWebsocketMultiReadThread.Instance.AddClient(Self); end; @@ -1204,7 +1216,7 @@ begin end; //reconnect needed? (in background) - if FReconnectlist.Count > 0 then + if (FReconnectlist <> nil) and (FReconnectlist.Count > 0) then begin if FReconnectThread = nil then FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start}); @@ -1217,7 +1229,7 @@ begin while FReconnectlist.Count > 0 do begin chn := nil; - try + try //get first one l := FReconnectlist.LockList; try @@ -1239,15 +1251,18 @@ begin if ( (ws = nil) or (SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then begin - try - if ws <> nil then - ws.LastActivityTime := Now; - chn.ConnectTimeout := 1000; - if (chn.Host <> '') and (chn.Port > 0) then - chn.TryUpgradeToWebsocket; - except - //just try - end; + try + if not chn.Connected then + begin + if ws <> nil then + ws.LastActivityTime := Now; + //chn.ConnectTimeout := 1000; + if (chn.Host <> '') and (chn.Port > 0) then + chn.TryUpgradeToWebsocket; + end; + except + //just try + end; end; //remove from todo list diff --git a/IdIOHandlerWebsocket.pas b/IdIOHandlerWebsocket.pas index 2404c2a..4ad9a64 100644 --- a/IdIOHandlerWebsocket.pas +++ b/IdIOHandlerWebsocket.pas @@ -1,5 +1,7 @@ unit IdIOHandlerWebsocket; +{$DEFINE DEBUG_WS} + //The WebSocket Protocol, RFC 6455 //http://datatracker.ietf.org/doc/rfc6455/?include_text=1 @@ -209,6 +211,8 @@ begin //SetLength(Result, Length(aValue)); for i := 0 to High(AValue) do begin + if AValue[i] = 0 then Exit; + if (AValue[i] < 33) or ( (AValue[i] > 126) and (AValue[i] < 161) ) @@ -550,24 +554,40 @@ begin if UseSingleWriteThread and IsWebsocket and (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then Assert(False, 'Write done in different thread than TIdWebsocketWriteThread!'); - if not IsWebsocket then - Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength) - else - begin - Lock; - try - if FWriteTextToTarget then - Result := WriteData(ABuffer, wdcText, True{send all at once}, - webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits) - else - Result := WriteData(ABuffer, wdcBinary, True{send all at once}, - webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits); - except - Unlock; //always unlock when socket exception - FClosedGracefully := True; - Raise; + Lock; + try + Result := -1; + if not IsWebsocket then + begin + {$IFDEF DEBUG_WS} + if Debughook > 0 then + OutputDebugString(PChar(Format('Send (non ws, TID:%d, P:%d): %s', + [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(ABuffer)]))); + {$ENDIF} + Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength) + end + else + begin + {$IFDEF DEBUG_WS} + if Debughook > 0 then + OutputDebugString(PChar(Format('Send (ws, TID:%d, P:%d): %s', + [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(ABuffer)]))); + {$ENDIF} + try + if FWriteTextToTarget then + Result := WriteData(ABuffer, wdcText, True{send all at once}, + webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits) + else + Result := WriteData(ABuffer, wdcBinary, True{send all at once}, + webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits); + except + FClosedGracefully := True; + Result := -1; + Raise; + end; end; - Unlock; //normal unlock (no double try finally) + finally + Unlock; end; end; @@ -596,27 +616,44 @@ begin IsWebsocket := True; end; - if not IsWebsocket then - Result := inherited ReadDataFromSource(VBuffer) - else - begin - Lock; - try - //we wait till we have a full message here (can be fragmented in several frames) - Result := ReadMessage(VBuffer, wscode); + Result := -1; + Lock; + try + if not IsWebsocket then + begin + Result := inherited ReadDataFromSource(VBuffer); + {$IFDEF DEBUG_WS} + if Debughook > 0 then + OutputDebugString(PChar(Format('Received (non ws, TID:%d, P:%d): %s', + [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(VBuffer)]))); + {$ENDIF} + end + else + begin + try + //we wait till we have a full message here (can be fragmented in several frames) + Result := ReadMessage(VBuffer, wscode); - //first write the data code (text or binary, ping, pong) - FInputBuffer.Write(LongWord(Ord(wscode))); - //we write message size here, vbuffer is written after this. This way we can use ReadStream to get 1 single message (in case multiple messages in FInputBuffer) - if LargeStream then - FInputBuffer.Write(Int64(Result)) - else - FInputBuffer.Write(LongWord(Result)) - except - Unlock; //always unlock when socket exception - FClosedGracefully := True; //closed (but not gracefully?) - Raise; + {$IFDEF DEBUG_WS} + if Debughook > 0 then + OutputDebugString(PChar(Format('Received (ws, TID:%d, P:%d): %s', + [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(VBuffer)]))); + {$ENDIF} + + //first write the data code (text or binary, ping, pong) + FInputBuffer.Write(LongWord(Ord(wscode))); + //we write message size here, vbuffer is written after this. This way we can use ReadStream to get 1 single message (in case multiple messages in FInputBuffer) + if LargeStream then + FInputBuffer.Write(Int64(Result)) + else + FInputBuffer.Write(LongWord(Result)) + except + Unlock; //always unlock when socket exception + FClosedGracefully := True; //closed (but not gracefully?) + Raise; + end; end; + finally Unlock; //normal unlock (no double try finally) end; end; @@ -662,7 +699,7 @@ begin wdcText, wdcBinary: begin if lFirstDataCode <> wdcNone then - raise EIdWebSocketHandleError.Create('Invalid frame: specified data code only allowed for the first frame'); + raise EIdWebSocketHandleError.Create('Invalid frame: specified data code only allowed for the first frame. Data = ' + BytesToStringRaw(iaReadBuffer)); lFirstDataCode := lDataCode; FMessageStream.Clear; @@ -671,7 +708,7 @@ begin wdcContinuation: begin if not (lFirstDataCode in [wdcText, wdcBinary]) then - raise EIdWebSocketHandleError.Create('Invalid frame continuation'); + raise EIdWebSocketHandleError.Create('Invalid frame continuation. Data = ' + BytesToStringRaw(iaReadBuffer)); TIdStreamHelper.Write(FMessageStream, iaReadBuffer); end; wdcClose: @@ -780,8 +817,10 @@ var if Result then begin FWSInputBuffer.Write(temp); - //if debughook > 0 then - // OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); + {$IFDEF DEBUG_WS} + if debughook > 0 then + OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); + {$ENDIF} end; end; @@ -809,8 +848,10 @@ var begin InternalReadDataFromSource(temp, True); FWSInputBuffer.Write(temp); - //if debughook > 0 then - // OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); + {$IFDEF DEBUG_WS} + if debughook > 0 then + OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); + {$ENDIF} if FWSInputBuffer.Size < aCount then Sleep(1); end; @@ -864,7 +905,7 @@ begin C_FrameCode_Ping: aDataCode := wdcPing; C_FrameCode_Pong: aDataCode := wdcPong; else - raise EIdException.CreateFmt('Unsupported data code: %d', [iCode]); + raise EIdException.CreateFmt('Unsupported data code: %d. Buffer = %s', [iCode, FWSInputBuffer.AsString]); end; //Mask: 1 bit @@ -896,9 +937,9 @@ begin //"All frames sent from client to server must have this bit set to 1" if IsServerSide and not bHasMask then - raise EIdWebSocketHandleError.Create('No mask supplied: mask is required for clients when sending data to server') + raise EIdWebSocketHandleError.Create('No mask supplied: mask is required for clients when sending data to server. Buffer = ' + FWSInputBuffer.AsString) else if not IsServerSide and bHasMask then - raise EIdWebSocketHandleError.Create('Mask supplied but mask is not allowed for servers when sending data to clients'); + raise EIdWebSocketHandleError.Create('Mask supplied but mask is not allowed for servers when sending data to clients. Buffer = ' + FWSInputBuffer.AsString); //Masking-key: 0 or 4 bytes if bHasMask then @@ -1050,8 +1091,8 @@ begin Inc(ioffset, Result); until ioffset >= Length(bData); - //if debughook > 0 then - // OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData))); +// if debughook > 0 then +// OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData))); finally Unlock; strmData.Free;