DelphiWebsockets/uROIndyHTTPWebsocketChannel.pas
2014-06-25 15:30:18 +02:00

485 lines
14 KiB
ObjectPascal

unit uROIndyHTTPWebsocketChannel;
interface
uses
Classes, SyncObjs,
uROIndyHTTPChannel, uROClientIntf,
IdHTTPWebsocketClient, IdHTTP, IdWinsock2;
const
C_RO_WS_NR: array[0..5] of AnsiChar = 'ROWSNR';
type
TROIndyHTTPWebsocketChannel = class;
//TROIndyHTTPSocketIOClient = class(TIdHTTPSocketIOClient_old)
TROIndyHTTPSocketIOClient = class(TIdHTTPWebsocketClient)
protected
FParent: TROIndyHTTPWebsocketChannel;
public
procedure AsyncDispatchEvent(const aEvent: TStream); overload; override;
procedure AsyncDispatchEvent(const aEvent: string); overload; override;
end;
TROIndyHTTPWebsocketChannel = class(TROIndyHTTPChannel,
IROActiveEventChannel)
private
function GetHost: string;
function GetPort: integer;
procedure SetHost(const Value: string);
procedure SetPort(const Value: integer);
function GetIndyClient: TIdHTTPWebsocketClient;
procedure SetWSResourceName(const Value: string);
function GetWSResourceName: string;
protected
FTriedUpgrade: Boolean;
FEventReceivers: TInterfaceList;
FMessageNr: Integer;
procedure IntDispatchEvent(aEvent: TStream);
procedure AsyncDispatchEvent(aEvent: TStream);
procedure SocketConnected(Sender: TObject);
procedure ResetChannel;
function TryUpgradeToWebsocket: Boolean;
procedure CheckConnection;
protected
procedure IntDispatch(aRequest, aResponse: TStream); override;
function CreateIndyClient: TIdHTTP; override;
protected
{IROActiveEventChannel}
procedure RegisterEventReceiver (aReceiver: IROEventReceiver);
procedure UnregisterEventReceiver(aReceiver: IROEventReceiver);
public
procedure AfterConstruction;override;
destructor Destroy; override;
published
property IndyClient: TIdHTTPWebsocketClient read GetIndyClient;
property Port: integer read GetPort write SetPort;
property Host: string read GetHost write SetHost;
property WSResourceName: string read GetWSResourceName write SetWSResourceName;
end;
procedure Register;
implementation
uses
SysUtils, Windows,
IdStack, IdStackConsts, IdGlobal, IdStackBSDBase,
uRORes, uROIndySupport, mcFinalizationHelper, IdIOHandlerWebsocket, StrUtils;
procedure Register;
begin
RegisterComponents('RBK', [TROIndyHTTPWebsocketChannel]);
end;
type
TAnonymousThread = class(TThread)
protected
FThreadProc: TThreadProcedure;
procedure Execute; override;
public
constructor Create(AThreadProc: TThreadProcedure);
end;
procedure CreateAnonymousThread(AThreadProc: TThreadProcedure);
begin
TAnonymousThread.Create(AThreadProc);
end;
{ TROIndyHTTPChannel_Websocket }
procedure TROIndyHTTPWebsocketChannel.AfterConstruction;
begin
inherited;
FEventReceivers := TInterfaceList.Create;
//not needed, is ignored at server now, but who knows later? :) e.g. support multiple sub protocols
WSResourceName := 'RemObjects';
end;
destructor TROIndyHTTPWebsocketChannel.Destroy;
begin
if TIdWebsocketMultiReadThread.Instance <> nil then
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient);
FEventReceivers.Free;
inherited;
end;
function TROIndyHTTPWebsocketChannel.GetIndyClient: TIdHTTPWebsocketClient;
begin
Result := inherited IndyClient as TIdHTTPWebsocketClient;
end;
procedure TROIndyHTTPWebsocketChannel.SetHost(const Value: string);
begin
IndyClient.Host := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;
procedure TROIndyHTTPWebsocketChannel.SetPort(const Value: integer);
begin
IndyClient.Port := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;
procedure TROIndyHTTPWebsocketChannel.SetWSResourceName(const Value: string);
begin
IndyClient.WSResourceName := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
FTriedUpgrade := False; //reset
end;
function TROIndyHTTPWebsocketChannel.GetHost: string;
begin
Result := IndyClient.Host;
end;
function TROIndyHTTPWebsocketChannel.GetPort: integer;
begin
Result := IndyClient.Port;
end;
function TROIndyHTTPWebsocketChannel.GetWSResourceName: string;
begin
Result := IndyClient.WSResourceName;
end;
procedure TROIndyHTTPWebsocketChannel.AsyncDispatchEvent(aEvent: TStream);
var
strmevent: TMemoryStream;
begin
strmevent := TMemoryStream.Create;
strmevent.CopyFrom(aEvent, aEvent.Size);
//events during dispatch? channel is busy so offload event dispatching to different thread!
TIdWebsocketDispatchThread.Instance.QueueEvent(
procedure
begin
IntDispatchEvent(strmevent);
strmevent.Free;
end);
//events during dispatch? channel is busy so offload event dispatching to different thread!
// CreateAnonymousThread(
// procedure
// begin
// IntDispatchEvent(strmevent);
// strmevent.Free;
// end);
end;
procedure TROIndyHTTPWebsocketChannel.CheckConnection;
begin
try
if IndyClient.Connected then
IndyClient.IOHandler.CheckForDisconnect(True, True)
except
IndyClient.Disconnect(False);
end;
if not IndyClient.Connected then
begin
if IndyClient.IOHandler <> nil then
IndyClient.IOHandler.Clear;
IndyClient.Connect;
if not IndyClient.IOHandler.IsWebsocket then //not already upgraded?
TryUpgradeToWebsocket;
FTriedUpgrade := True; //one shot
end;
end;
function TROIndyHTTPWebsocketChannel.CreateIndyClient: TIdHTTP;
var
wsclient: TROIndyHTTPSocketIOClient;
begin
//Result := inherited CreateIndyClient;
wsclient := TROIndyHTTPSocketIOClient.Create(Self);
// wsclient := TIdHTTPWebsocketClient.Create(Self);
wsclient.FParent := Self;
wsclient.Port := 80;
wsclient.Host := '127.0.0.1';
wsclient.Request.UserAgent := uRORes.str_ProductName;
wsclient.OnConnected := SocketConnected;
//TargetURL := '';
Result := wsclient;
end;
procedure TROIndyHTTPWebsocketChannel.SocketConnected(Sender: TObject);
begin
if DisableNagle then
uROIndySupport.Indy_DisableNagle(IndyClient);
end;
function TROIndyHTTPWebsocketChannel.TryUpgradeToWebsocket: Boolean;
begin
try
Result := (IndyClient as TIdHTTPWebsocketClient).TryUpgradeToWebsocket;
if Result then
begin
Self.IndyClient.IOHandler.InputBuffer.Clear;
Self.IndyClient.IOHandler.ReadTimeout := Self.IndyClient.ReadTimeout;
//background wait for data in single thread
TIdWebsocketMultiReadThread.Instance.AddClient(Self.IndyClient);
end;
except
ResetChannel;
raise;
end;
end;
procedure TROIndyHTTPWebsocketChannel.IntDispatch(aRequest, aResponse: TStream);
var
cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar;
iMsgNr, iMsgNr2: Integer;
ws: TIdIOHandlerWebsocket;
wscode: TWSDataCode;
swstext: utf8string;
begin
IndyClient.Lock;
try
//http server supports websockets?
if not FTriedUpgrade then
begin
if not IndyClient.IOHandler.IsWebsocket then //not already upgraded?
TryUpgradeToWebsocket;
FTriedUpgrade := True; //one shot
end;
ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
if not ws.IsWebsocket then
//normal http dispatch
inherited IntDispatch(aRequest, aResponse)
else
//websocket dispatch
begin
ws.Lock;
try
//write messagenr at end
aRequest.Position := aRequest.Size;
Inc(FMessageNr);
iMsgNr := FMessageNr;
aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR));
aRequest.Write(iMsgNr, SizeOf(iMsgNr));
aRequest.Position := 0;
CheckConnection;
//write
IndyClient.IOHandler.Write(aRequest);
iMsgNr2 := 0;
while iMsgNr2 <= 0 do
begin
aResponse.Size := 0; //clear
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
IndyClient.IOHandler.ReadStream(aResponse);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then
begin
//get event or message nr
aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2);
aResponse.Read(cWSNR[0], Length(cWSNR));
end;
if (cWSNR = C_RO_WS_NR) then
begin
aResponse.Read(iMsgNr2, SizeOf(iMsgNr2));
aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc
aResponse.Position := 0;
//event?
if iMsgNr2 < 0 then
begin
//events during dispatch? channel is busy so offload event dispatching to different thread!
AsyncDispatchEvent(aResponse);
aResponse.Size := 0;
{
ws.Unlock;
try
IntDispatchEvent(aResponse);
aResponse.Size := 0;
finally
ws.Lock;
end;
}
end;
end
else
begin
aResponse.Position := 0;
if wscode = wdcBinary then
begin
Self.IndyClient.AsyncDispatchEvent(aResponse);
end
else if wscode = wdcText then
begin
SetLength(swstext, aResponse.Size);
aResponse.Read(swstext[1], aResponse.Size);
if swstext <> '' then
begin
Self.IndyClient.AsyncDispatchEvent(string(swstext));
end;
end;
end;
end;
except
ws.Unlock; //always unlock
ResetChannel;
Raise;
end;
ws.Unlock; //normal unlock (no extra try finally needed)
if iMsgNr2 <> iMsgNr then
Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!');
end;
finally
IndyClient.UnLock;
end;
end;
procedure TROIndyHTTPWebsocketChannel.IntDispatchEvent(aEvent: TStream);
var
i: Integer;
eventrecv: IROEventReceiver;
begin
for i := 0 to FEventReceivers.Count - 1 do
begin
aEvent.Position := 0;
eventrecv := FEventReceivers.Items[i] as IROEventReceiver;
try
eventrecv.Dispatch(aEvent, TThread.CurrentThread);
except
//ignore errors within events, so normal communication is preserved
end;
end;
end;
procedure TROIndyHTTPWebsocketChannel.RegisterEventReceiver(
aReceiver: IROEventReceiver);
begin
FEventReceivers.Add(aReceiver);
end;
procedure TROIndyHTTPWebsocketChannel.ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
begin
FTriedUpgrade := False; //reset
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient);
if IndyClient.IOHandler <> nil then
begin
IndyClient.IOHandler.InputBuffer.Clear;
//close/disconnect internal socket
//ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
//ws.Close; done in disconnect below
end;
IndyClient.Disconnect(False);
end;
procedure TROIndyHTTPWebsocketChannel.UnregisterEventReceiver(
aReceiver: IROEventReceiver);
begin
FEventReceivers.Remove(aReceiver);
end;
{ TMultiChannelReadThread }
(*
procedure TROIndyWSMultiChannelReadThread_old.ReadFromAllChannels;
if strmEvent = nil then
strmEvent := TMemoryStream.Create;
strmEvent.Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(chn.IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
chn.IndyClient.IOHandler.ReadStream(strmEvent);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if strmEvent.Size < Length(C_ROWSNR) + SizeOf(iEventNr) then Continue;
//get event nr
strmEvent.Position := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr);
strmEvent.Read(cWSNR[0], Length(cWSNR));
Assert(cWSNR = C_ROWSNR);
strmEvent.Read(iEventNr, SizeOf(iEventNr));
Assert(iEventNr < 0);
//trunc
strmEvent.Size := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr);
//fire event
//chn.IntDispatchEvent(strmEvent);
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent.Position := 0;
chn.AsyncDispatchEvent(strmEvent);
*)
{ TAnonymousThread }
constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure);
begin
FThreadProc := AThreadProc;
FreeOnTerminate := True;
inherited Create(False {direct start});
end;
procedure TAnonymousThread.Execute;
begin
if Assigned(FThreadProc) then
FThreadProc();
end;
{ TROIndyHTTPSocketIOClient }
procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: TStream);
var
iEventNr: Integer;
cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar;
s: string;
begin
if aEvent.Size > Length(C_RO_WS_NR) + SizeOf(iEventNr) then
begin
//get event nr
aEvent.Position := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr);
aEvent.Read(cWSNR[0], Length(cWSNR));
//has eventnr?
if cWSNR = C_RO_WS_NR then
begin
aEvent.Read(iEventNr, SizeOf(iEventNr));
if iEventNr >= 0 then
begin
aEvent.Position := 0;
with TStreamReader.Create(aEvent) do
begin
s := ReadToEnd;
Free;
end;
Assert(iEventNr < 0, 'must be negative number for RO events: ' + s);
end;
//trunc
aEvent.Size := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr);
aEvent.Position := 0;
FParent.AsyncDispatchEvent(aEvent);
Exit;
end;
end;
inherited AsyncDispatchEvent(aEvent);
end;
procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: string);
begin
inherited AsyncDispatchEvent(aEvent);
end;
end.