better threadsafe connecting

This commit is contained in:
André Mussche 2014-03-07 13:40:04 +01:00
parent d28d5b1457
commit b2ffe540df
2 changed files with 118 additions and 62 deletions

View file

@ -497,17 +497,25 @@ var
i: Integer; i: Integer;
sKey, sResponseKey: string; sKey, sResponseKey: string;
sSocketioextended: string; sSocketioextended: string;
bLocked: boolean;
begin begin
Assert((IOHandler = nil) or not IOHandler.IsWebsocket); Assert((IOHandler = nil) or not IOHandler.IsWebsocket);
//remove from thread during connection handling //remove from thread during connection handling
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self); TIdWebsocketMultiReadThread.Instance.RemoveClient(Self);
//reset pending data bLocked := False;
if IOHandler <> nil then
IOHandler.Clear;
strmResponse := TMemoryStream.Create; strmResponse := TMemoryStream.Create;
Self.Lock;
try try
//reset pending data
if IOHandler <> nil then
begin
IOHandler.Lock;
bLocked := True;
if IOHandler.IsWebsocket then Exit;
IOHandler.Clear;
end;
//special socket.io handling, see https://github.com/LearnBoost/socket.io-spec //special socket.io handling, see https://github.com/LearnBoost/socket.io-spec
if SocketIOCompatible then if SocketIOCompatible then
begin begin
@ -693,6 +701,10 @@ begin
Request.Clear; Request.Clear;
strmResponse.Free; strmResponse.Free;
if bLocked and (IOHandler <> nil) then
IOHandler.Unlock;
Unlock;
//add to thread for auto retry/reconnect //add to thread for auto retry/reconnect
TIdWebsocketMultiReadThread.Instance.AddClient(Self); TIdWebsocketMultiReadThread.Instance.AddClient(Self);
end; end;
@ -1204,7 +1216,7 @@ begin
end; end;
//reconnect needed? (in background) //reconnect needed? (in background)
if FReconnectlist.Count > 0 then if (FReconnectlist <> nil) and (FReconnectlist.Count > 0) then
begin begin
if FReconnectThread = nil then if FReconnectThread = nil then
FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start}); FReconnectThread := TIdWebsocketQueueThread.Create(False{direct start});
@ -1217,7 +1229,7 @@ begin
while FReconnectlist.Count > 0 do while FReconnectlist.Count > 0 do
begin begin
chn := nil; chn := nil;
try try
//get first one //get first one
l := FReconnectlist.LockList; l := FReconnectlist.LockList;
try try
@ -1239,15 +1251,18 @@ begin
if ( (ws = nil) or if ( (ws = nil) or
(SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then (SecondsBetween(Now, ws.LastActivityTime) >= 5) ) then
begin begin
try try
if ws <> nil then if not chn.Connected then
ws.LastActivityTime := Now; begin
chn.ConnectTimeout := 1000; if ws <> nil then
if (chn.Host <> '') and (chn.Port > 0) then ws.LastActivityTime := Now;
chn.TryUpgradeToWebsocket; //chn.ConnectTimeout := 1000;
except if (chn.Host <> '') and (chn.Port > 0) then
//just try chn.TryUpgradeToWebsocket;
end; end;
except
//just try
end;
end; end;
//remove from todo list //remove from todo list

View file

@ -1,5 +1,7 @@
unit IdIOHandlerWebsocket; unit IdIOHandlerWebsocket;
{$DEFINE DEBUG_WS}
//The WebSocket Protocol, RFC 6455 //The WebSocket Protocol, RFC 6455
//http://datatracker.ietf.org/doc/rfc6455/?include_text=1 //http://datatracker.ietf.org/doc/rfc6455/?include_text=1
@ -209,6 +211,8 @@ begin
//SetLength(Result, Length(aValue)); //SetLength(Result, Length(aValue));
for i := 0 to High(AValue) do for i := 0 to High(AValue) do
begin begin
if AValue[i] = 0 then Exit;
if (AValue[i] < 33) or if (AValue[i] < 33) or
( (AValue[i] > 126) and ( (AValue[i] > 126) and
(AValue[i] < 161) ) (AValue[i] < 161) )
@ -550,24 +554,40 @@ begin
if UseSingleWriteThread and IsWebsocket and (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then if UseSingleWriteThread and IsWebsocket and (GetCurrentThreadId <> TIdWebsocketWriteThread.Instance.ThreadID) then
Assert(False, 'Write done in different thread than TIdWebsocketWriteThread!'); Assert(False, 'Write done in different thread than TIdWebsocketWriteThread!');
if not IsWebsocket then Lock;
Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength) try
else Result := -1;
begin if not IsWebsocket then
Lock; begin
try {$IFDEF DEBUG_WS}
if FWriteTextToTarget then if Debughook > 0 then
Result := WriteData(ABuffer, wdcText, True{send all at once}, OutputDebugString(PChar(Format('Send (non ws, TID:%d, P:%d): %s',
webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits) [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(ABuffer)])));
else {$ENDIF}
Result := WriteData(ABuffer, wdcBinary, True{send all at once}, Result := inherited WriteDataToTarget(ABuffer, AOffset, ALength)
webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits); end
except else
Unlock; //always unlock when socket exception begin
FClosedGracefully := True; {$IFDEF DEBUG_WS}
Raise; if Debughook > 0 then
OutputDebugString(PChar(Format('Send (ws, TID:%d, P:%d): %s',
[getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(ABuffer)])));
{$ENDIF}
try
if FWriteTextToTarget then
Result := WriteData(ABuffer, wdcText, True{send all at once},
webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits)
else
Result := WriteData(ABuffer, wdcBinary, True{send all at once},
webBit1 in ClientExtensionBits, webBit2 in ClientExtensionBits, webBit3 in ClientExtensionBits);
except
FClosedGracefully := True;
Result := -1;
Raise;
end;
end; end;
Unlock; //normal unlock (no double try finally) finally
Unlock;
end; end;
end; end;
@ -596,27 +616,44 @@ begin
IsWebsocket := True; IsWebsocket := True;
end; end;
if not IsWebsocket then Result := -1;
Result := inherited ReadDataFromSource(VBuffer) Lock;
else try
begin if not IsWebsocket then
Lock; begin
try Result := inherited ReadDataFromSource(VBuffer);
//we wait till we have a full message here (can be fragmented in several frames) {$IFDEF DEBUG_WS}
Result := ReadMessage(VBuffer, wscode); if Debughook > 0 then
OutputDebugString(PChar(Format('Received (non ws, TID:%d, P:%d): %s',
[getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(VBuffer)])));
{$ENDIF}
end
else
begin
try
//we wait till we have a full message here (can be fragmented in several frames)
Result := ReadMessage(VBuffer, wscode);
//first write the data code (text or binary, ping, pong) {$IFDEF DEBUG_WS}
FInputBuffer.Write(LongWord(Ord(wscode))); if Debughook > 0 then
//we write message size here, vbuffer is written after this. This way we can use ReadStream to get 1 single message (in case multiple messages in FInputBuffer) OutputDebugString(PChar(Format('Received (ws, TID:%d, P:%d): %s',
if LargeStream then [getcurrentthreadid, Self.Binding.PeerPort, BytesToStringRaw(VBuffer)])));
FInputBuffer.Write(Int64(Result)) {$ENDIF}
else
FInputBuffer.Write(LongWord(Result)) //first write the data code (text or binary, ping, pong)
except FInputBuffer.Write(LongWord(Ord(wscode)));
Unlock; //always unlock when socket exception //we write message size here, vbuffer is written after this. This way we can use ReadStream to get 1 single message (in case multiple messages in FInputBuffer)
FClosedGracefully := True; //closed (but not gracefully?) if LargeStream then
Raise; FInputBuffer.Write(Int64(Result))
else
FInputBuffer.Write(LongWord(Result))
except
Unlock; //always unlock when socket exception
FClosedGracefully := True; //closed (but not gracefully?)
Raise;
end;
end; end;
finally
Unlock; //normal unlock (no double try finally) Unlock; //normal unlock (no double try finally)
end; end;
end; end;
@ -662,7 +699,7 @@ begin
wdcText, wdcBinary: wdcText, wdcBinary:
begin begin
if lFirstDataCode <> wdcNone then if lFirstDataCode <> wdcNone then
raise EIdWebSocketHandleError.Create('Invalid frame: specified data code only allowed for the first frame'); raise EIdWebSocketHandleError.Create('Invalid frame: specified data code only allowed for the first frame. Data = ' + BytesToStringRaw(iaReadBuffer));
lFirstDataCode := lDataCode; lFirstDataCode := lDataCode;
FMessageStream.Clear; FMessageStream.Clear;
@ -671,7 +708,7 @@ begin
wdcContinuation: wdcContinuation:
begin begin
if not (lFirstDataCode in [wdcText, wdcBinary]) then if not (lFirstDataCode in [wdcText, wdcBinary]) then
raise EIdWebSocketHandleError.Create('Invalid frame continuation'); raise EIdWebSocketHandleError.Create('Invalid frame continuation. Data = ' + BytesToStringRaw(iaReadBuffer));
TIdStreamHelper.Write(FMessageStream, iaReadBuffer); TIdStreamHelper.Write(FMessageStream, iaReadBuffer);
end; end;
wdcClose: wdcClose:
@ -780,8 +817,10 @@ var
if Result then if Result then
begin begin
FWSInputBuffer.Write(temp); FWSInputBuffer.Write(temp);
//if debughook > 0 then {$IFDEF DEBUG_WS}
// OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); if debughook > 0 then
OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp)));
{$ENDIF}
end; end;
end; end;
@ -809,8 +848,10 @@ var
begin begin
InternalReadDataFromSource(temp, True); InternalReadDataFromSource(temp, True);
FWSInputBuffer.Write(temp); FWSInputBuffer.Write(temp);
//if debughook > 0 then {$IFDEF DEBUG_WS}
// OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp))); if debughook > 0 then
OutputDebugString(PChar('Received: ' + BytesToStringRaw(temp)));
{$ENDIF}
if FWSInputBuffer.Size < aCount then if FWSInputBuffer.Size < aCount then
Sleep(1); Sleep(1);
end; end;
@ -864,7 +905,7 @@ begin
C_FrameCode_Ping: aDataCode := wdcPing; C_FrameCode_Ping: aDataCode := wdcPing;
C_FrameCode_Pong: aDataCode := wdcPong; C_FrameCode_Pong: aDataCode := wdcPong;
else else
raise EIdException.CreateFmt('Unsupported data code: %d', [iCode]); raise EIdException.CreateFmt('Unsupported data code: %d. Buffer = %s', [iCode, FWSInputBuffer.AsString]);
end; end;
//Mask: 1 bit //Mask: 1 bit
@ -896,9 +937,9 @@ begin
//"All frames sent from client to server must have this bit set to 1" //"All frames sent from client to server must have this bit set to 1"
if IsServerSide and not bHasMask then if IsServerSide and not bHasMask then
raise EIdWebSocketHandleError.Create('No mask supplied: mask is required for clients when sending data to server') raise EIdWebSocketHandleError.Create('No mask supplied: mask is required for clients when sending data to server. Buffer = ' + FWSInputBuffer.AsString)
else if not IsServerSide and bHasMask then else if not IsServerSide and bHasMask then
raise EIdWebSocketHandleError.Create('Mask supplied but mask is not allowed for servers when sending data to clients'); raise EIdWebSocketHandleError.Create('Mask supplied but mask is not allowed for servers when sending data to clients. Buffer = ' + FWSInputBuffer.AsString);
//Masking-key: 0 or 4 bytes //Masking-key: 0 or 4 bytes
if bHasMask then if bHasMask then
@ -1050,8 +1091,8 @@ begin
Inc(ioffset, Result); Inc(ioffset, Result);
until ioffset >= Length(bData); until ioffset >= Length(bData);
//if debughook > 0 then // if debughook > 0 then
// OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData))); // OutputDebugString(PChar('Written: ' + BytesToStringRaw(bData)));
finally finally
Unlock; Unlock;
strmData.Free; strmData.Free;