local sync
This commit is contained in:
parent
86dc2bfc35
commit
7a59f2b272
|
@ -246,7 +246,7 @@ begin
|
|||
if ARaiseExceptionOnTimeout then
|
||||
EIdReadTimeout.Toss(RSIdNoDataToRead) //exit, no data can be received
|
||||
else
|
||||
Exit;
|
||||
Exit(0);
|
||||
end;
|
||||
|
||||
SetLength(VBuffer, RecvBufferSize);
|
||||
|
|
|
@ -584,6 +584,8 @@ var
|
|||
errorref: TSocketIOError;
|
||||
error: ISuperObject;
|
||||
begin
|
||||
if ASocket = nil then Exit;
|
||||
|
||||
if not FConnections.ContainsValue(ASocket) and
|
||||
not FConnectionsGUID.ContainsValue(ASocket) then
|
||||
begin
|
||||
|
@ -1258,8 +1260,6 @@ begin
|
|||
for context in FConnections.Values do
|
||||
begin
|
||||
if context.IsDisconnected then Continue;
|
||||
// if not context.IsSocketIO then
|
||||
// raise EIdSocketIoUnhandledMessage.Create('Not a socket.io connection!');
|
||||
|
||||
if not Assigned(aCallback) then
|
||||
WriteSocketIOMsg(context, ''{no room}, aMessage, nil)
|
||||
|
@ -1274,8 +1274,6 @@ begin
|
|||
for context in FConnectionsGUID.Values do
|
||||
begin
|
||||
if context.IsDisconnected then Continue;
|
||||
// if not context.IsSocketIO then
|
||||
// raise EIdSocketIoUnhandledMessage.Create('Not a socket.io connection!');
|
||||
|
||||
if not Assigned(aCallback) then
|
||||
WriteSocketIOMsg(context, ''{no room}, aMessage, nil)
|
||||
|
|
|
@ -1,314 +1,333 @@
|
|||
unit uROHTTPWebsocketServer;
|
||||
|
||||
interface
|
||||
|
||||
uses
|
||||
Classes, IdServerIOHandlerWebsocket, IdIOHandlerWebsocket,
|
||||
uROIndyHTTPServer, uROClientIntf, uROServer, uROHTTPDispatch,
|
||||
IdContext, IdCustomHTTPServer, IdCustomTCPServer, uROHash, uROServerIntf,
|
||||
IdServerWebsocketContext, IdServerSocketIOHandling,
|
||||
IdServerWebsocketHandling;
|
||||
|
||||
type
|
||||
TROTransportContext = class;
|
||||
|
||||
TROIndyHTTPWebsocketServer = class(TROIndyHTTPServer)
|
||||
private
|
||||
FOnCustomChannelExecute: TWebsocketChannelRequest;
|
||||
FSocketIO: TIdServerSocketIOHandling_Ext;
|
||||
function GetSocketIO: TIdServerSocketIOHandling;
|
||||
protected
|
||||
procedure InternalServerConnect(AThread: TIdContext); override;
|
||||
procedure InternalServerCommandGet(AThread: TIdThreadClass;
|
||||
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo); override;
|
||||
procedure ProcessRemObjectsRequest(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
|
||||
|
||||
function GetDispatchersClass: TROMessageDispatchersClass; override;
|
||||
public
|
||||
procedure AfterConstruction; override;
|
||||
destructor Destroy; override;
|
||||
procedure Loaded; override;
|
||||
|
||||
property SocketIO: TIdServerSocketIOHandling read GetSocketIO;
|
||||
property OnCustomChannelExecute: TWebsocketChannelRequest read FOnCustomChannelExecute write FOnCustomChannelExecute;
|
||||
end;
|
||||
|
||||
TROHTTPDispatcher_Websocket = class(TROHTTPDispatcher)
|
||||
public
|
||||
function CanHandleMessage(const aTransport: IROTransport; aRequeststream : TStream): boolean; override;
|
||||
end;
|
||||
|
||||
TROHTTPMessageDispatchers_WebSocket = class(TROHTTPMessageDispatchers)
|
||||
protected
|
||||
function GetDispatcherClass : TROMessageDispatcherClass; override;
|
||||
end;
|
||||
|
||||
TROTransportContext = class(TInterfacedObject,
|
||||
IROTransport, IROTCPTransport,
|
||||
IROActiveEventServer)
|
||||
private
|
||||
FROServer: TROIndyHTTPServer;
|
||||
FIdContext: TIdServerWSContext;
|
||||
FEventCount: Integer;
|
||||
FClientId: TGUID;
|
||||
private
|
||||
class var FGlobalEventCount: Integer;
|
||||
protected
|
||||
{IROTransport}
|
||||
function GetTransportObject: TObject;
|
||||
{IROTCPTransport}
|
||||
function GetClientAddress : string;
|
||||
{IROActiveEventServer}
|
||||
procedure EventsRegistered(aSender : TObject; aClient: TGUID);
|
||||
procedure DispatchEvent(anEventDataItem : TROEventData; aSessionReference : TGUID; aSender: TObject); // asender is TROEventRepository
|
||||
public
|
||||
//constructor Create(aROServer: TROIndyHTTPServer; aIOHandler: TIdIOHandlerWebsocket);
|
||||
constructor Create(aROServer: TROIndyHTTPServer; aIdContext: TIdServerWSContext);
|
||||
|
||||
property Context: TIdServerWSContext read FIdContext;
|
||||
property ClientId: TGUID read FClientId write FClientId;
|
||||
end;
|
||||
|
||||
procedure Register;
|
||||
|
||||
implementation
|
||||
|
||||
uses
|
||||
SysUtils, IdCoderMIME, Windows, uROEventRepository, uROSessions, uROClient,
|
||||
uROClasses, StrUtils, uROIdServerWebsocketHandling;
|
||||
|
||||
procedure Register;
|
||||
begin
|
||||
RegisterComponents('RBK', [TROIndyHTTPWebsocketServer]);
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.AfterConstruction;
|
||||
begin
|
||||
inherited;
|
||||
|
||||
FSocketIO := TIdServerSocketIOHandling_Ext.Create;
|
||||
|
||||
IndyServer.ContextClass := TROIdServerWSContext;
|
||||
if Self.IndyServer.IOHandler = nil then
|
||||
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
|
||||
end;
|
||||
|
||||
destructor TROIndyHTTPWebsocketServer.Destroy;
|
||||
begin
|
||||
inherited;
|
||||
FSocketIO.Free;
|
||||
end;
|
||||
|
||||
function TROIndyHTTPWebsocketServer.GetDispatchersClass: TROMessageDispatchersClass;
|
||||
begin
|
||||
Result := TROHTTPMessageDispatchers_Websocket;
|
||||
end;
|
||||
|
||||
function TROIndyHTTPWebsocketServer.GetSocketIO: TIdServerSocketIOHandling;
|
||||
begin
|
||||
Result := FSocketIO;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.InternalServerCommandGet(AThread: TIdThreadClass;
|
||||
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo);
|
||||
begin
|
||||
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
|
||||
(AThread as TIdServerWSContext).SocketIO := Self.FSocketIO;
|
||||
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
|
||||
|
||||
if not TROIdServerWebsocketHandling.ProcessServerCommandGet(AThread as TIdServerWSContext, ARequestInfo, AResponseInfo) then
|
||||
inherited InternalServerCommandGet(AThread, ARequestInfo, AResponseInfo)
|
||||
unit uROHTTPWebsocketServer;
|
||||
|
||||
interface
|
||||
|
||||
uses
|
||||
Classes, IdServerIOHandlerWebsocket, IdIOHandlerWebsocket,
|
||||
uROIndyHTTPServer, uROClientIntf, uROServer, uROHTTPDispatch,
|
||||
IdContext, IdCustomHTTPServer, IdCustomTCPServer, uROHash, uROServerIntf,
|
||||
IdServerWebsocketContext, IdServerSocketIOHandling,
|
||||
IdServerWebsocketHandling;
|
||||
|
||||
type
|
||||
TROTransportContext = class;
|
||||
|
||||
TROIndyHTTPWebsocketServer = class(TROIndyHTTPServer)
|
||||
private
|
||||
FOnCustomChannelExecute: TWebsocketChannelRequest;
|
||||
FSocketIO: TIdServerSocketIOHandling_Ext;
|
||||
function GetSocketIO: TIdServerSocketIOHandling;
|
||||
protected
|
||||
FROTransportContexts: TInterfaceList;
|
||||
procedure InternalServerConnect(AThread: TIdContext); override;
|
||||
procedure InternalServerDisConnect(AThread: TIdContext); virtual;
|
||||
procedure InternalServerCommandGet(AThread: TIdThreadClass;
|
||||
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo); override;
|
||||
procedure ProcessRemObjectsRequest(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
|
||||
|
||||
function GetDispatchersClass: TROMessageDispatchersClass; override;
|
||||
public
|
||||
procedure AfterConstruction; override;
|
||||
destructor Destroy; override;
|
||||
procedure Loaded; override;
|
||||
|
||||
property SocketIO: TIdServerSocketIOHandling read GetSocketIO;
|
||||
property OnCustomChannelExecute: TWebsocketChannelRequest read FOnCustomChannelExecute write FOnCustomChannelExecute;
|
||||
end;
|
||||
|
||||
TROHTTPDispatcher_Websocket = class(TROHTTPDispatcher)
|
||||
public
|
||||
function CanHandleMessage(const aTransport: IROTransport; aRequeststream : TStream): boolean; override;
|
||||
end;
|
||||
|
||||
TROHTTPMessageDispatchers_WebSocket = class(TROHTTPMessageDispatchers)
|
||||
protected
|
||||
function GetDispatcherClass : TROMessageDispatcherClass; override;
|
||||
end;
|
||||
|
||||
TROTransportContext = class(TInterfacedObject,
|
||||
IROTransport, IROTCPTransport,
|
||||
IROActiveEventServer)
|
||||
private
|
||||
FROServer: TROIndyHTTPServer;
|
||||
FIdContext: TIdServerWSContext;
|
||||
FEventCount: Integer;
|
||||
FClientId: TGUID;
|
||||
private
|
||||
class var FGlobalEventCount: Integer;
|
||||
protected
|
||||
{IROTransport}
|
||||
function GetTransportObject: TObject;
|
||||
{IROTCPTransport}
|
||||
function GetClientAddress : string;
|
||||
{IROActiveEventServer}
|
||||
procedure EventsRegistered(aSender : TObject; aClient: TGUID);
|
||||
procedure DispatchEvent(anEventDataItem : TROEventData; aSessionReference : TGUID; aSender: TObject); // asender is TROEventRepository
|
||||
public
|
||||
//constructor Create(aROServer: TROIndyHTTPServer; aIOHandler: TIdIOHandlerWebsocket);
|
||||
constructor Create(aROServer: TROIndyHTTPServer; aIdContext: TIdServerWSContext);
|
||||
|
||||
property Context: TIdServerWSContext read FIdContext;
|
||||
property ClientId: TGUID read FClientId write FClientId;
|
||||
end;
|
||||
|
||||
procedure Register;
|
||||
|
||||
implementation
|
||||
|
||||
uses
|
||||
SysUtils, IdCoderMIME, Windows, uROEventRepository, uROSessions, uROClient,
|
||||
uROClasses, StrUtils, uROIdServerWebsocketHandling;
|
||||
|
||||
procedure Register;
|
||||
begin
|
||||
RegisterComponents('RBK', [TROIndyHTTPWebsocketServer]);
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.InternalServerConnect(AThread: TIdContext);
|
||||
begin
|
||||
inherited;
|
||||
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
|
||||
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.Loaded;
|
||||
begin
|
||||
//do before inherited in case of designtime connection
|
||||
if Self.IndyServer.IOHandler = nil then
|
||||
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
|
||||
inherited;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.ProcessRemObjectsRequest(
|
||||
const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
|
||||
var
|
||||
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
|
||||
msg: TROMessageDispatcher;
|
||||
iMsgNr: Integer;
|
||||
imsg: IROMessage;
|
||||
transport: TROTransportContext;
|
||||
begin
|
||||
if strmRequest.Size < Length(C_ROWSNR) + SizeOf(iMsgNr) then Exit;
|
||||
//read messagenr from the end
|
||||
strmRequest.Position := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
|
||||
strmRequest.Read(cWSNR[0], Length(C_ROWSNR));
|
||||
if (cWSNR <> C_ROWSNR) then Exit;
|
||||
strmRequest.Read(iMsgNr, SizeOf(iMsgNr));
|
||||
strmRequest.Position := 0;
|
||||
//trunc extra data
|
||||
strmRequest.Size := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
|
||||
transport := AThread.Data as TROTransportContext;
|
||||
//no RO transport object already made?
|
||||
if transport = nil then
|
||||
begin
|
||||
//create IROTransport object
|
||||
transport := TROTransportContext.Create(Self, AThread as TIdServerWSContext);
|
||||
(transport as IROTransport)._AddRef;
|
||||
//attach RO transport to indy context
|
||||
AThread.Data := transport;
|
||||
//todo: enveloppes
|
||||
//read client GUID the first time (needed to be able to send RO events)
|
||||
msg := Self.Dispatchers.FindDispatcher(transport, strmRequest);
|
||||
if msg = nil then
|
||||
raise EROException.Create('No suiteable message dispatcher found!');
|
||||
imsg := (msg.MessageIntf as IROMessageCloneable).Clone;
|
||||
imsg.InitializeRead(transport);
|
||||
imsg.ReadFromStream(strmRequest);
|
||||
transport.ClientId := imsg.ClientID;
|
||||
imsg := nil;
|
||||
Assert(not IsEqualGUID(transport.ClientID, EmptyGUID));
|
||||
end;
|
||||
//EXECUTE FUNCTION
|
||||
Self.DispatchMessage(transport, strmRequest, strmResponse);
|
||||
//write number at end
|
||||
strmResponse.Position := strmResponse.Size;
|
||||
strmResponse.Write(C_ROWSNR, Length(C_ROWSNR));
|
||||
strmResponse.Write(iMsgNr, SizeOf(iMsgNr));
|
||||
strmResponse.Position := 0;
|
||||
end;
|
||||
|
||||
{ TROTransport }
|
||||
|
||||
constructor TROTransportContext.Create(aROServer: TROIndyHTTPServer;
|
||||
aIdContext: TIdServerWSContext);
|
||||
begin
|
||||
FROServer := aROServer;
|
||||
FIdContext := aIdContext;
|
||||
end;
|
||||
|
||||
procedure TROTransportContext.EventsRegistered(aSender: TObject; aClient: TGUID);
|
||||
begin
|
||||
//
|
||||
end;
|
||||
|
||||
procedure TROTransportContext.DispatchEvent(anEventDataItem: TROEventData;
|
||||
aSessionReference: TGUID; aSender: TObject);
|
||||
var
|
||||
i: Integer;
|
||||
LContext: TIdContext;
|
||||
transport: TROTransportContext;
|
||||
l: TList;
|
||||
ws: TIdIOHandlerWebsocket;
|
||||
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
|
||||
begin
|
||||
l := FROServer.IndyServer.Contexts.LockList;
|
||||
try
|
||||
if l.Count <= 0 then Exit;
|
||||
|
||||
anEventDataItem.Data.Position := anEventDataItem.Data.Size - Length(C_ROWSNR) - SizeOf(FEventCount);
|
||||
anEventDataItem.Data.Read(cWSNR[0], Length(cWSNR));
|
||||
//event number not written already?
|
||||
if cWSNR <> C_ROWSNR then
|
||||
begin
|
||||
//new event nr
|
||||
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
|
||||
//overflow? then start again from 0
|
||||
if FEventCount > 0 then
|
||||
begin
|
||||
InterlockedExchange(FGlobalEventCount, 0);
|
||||
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
|
||||
end;
|
||||
Assert(FEventCount < 0);
|
||||
//write nr at end of message
|
||||
anEventDataItem.Data.Position := anEventDataItem.Data.Size;
|
||||
anEventDataItem.Data.Write(C_ROWSNR, Length(C_ROWSNR));
|
||||
anEventDataItem.Data.Write(FEventCount, SizeOf(FEventCount));
|
||||
anEventDataItem.Data.Position := 0;
|
||||
end;
|
||||
|
||||
//search specific client
|
||||
for i := 0 to l.Count - 1 do
|
||||
begin
|
||||
LContext := TIdContext(l.Items[i]);
|
||||
transport := LContext.Data as TROTransportContext;
|
||||
if transport = nil then Continue;
|
||||
if not IsEqualGUID(transport.ClientId, aSessionReference) then Continue;
|
||||
|
||||
//direct write event data
|
||||
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
|
||||
if not ws.IsWebsocket then Exit;
|
||||
ws.Lock;
|
||||
try
|
||||
try ws.Write(anEventDataItem.Data, wdtBinary) except {continue with other connections} end;
|
||||
finally
|
||||
ws.Unlock;
|
||||
end;
|
||||
end;
|
||||
finally
|
||||
anEventDataItem.RemoveRef;
|
||||
FROServer.IndyServer.Contexts.UnlockList;
|
||||
end;
|
||||
end;
|
||||
|
||||
function TROTransportContext.GetClientAddress: string;
|
||||
begin
|
||||
Result := FIdContext.Binding.PeerIP;
|
||||
end;
|
||||
|
||||
function TROTransportContext.GetTransportObject: TObject;
|
||||
begin
|
||||
Result := FROServer;
|
||||
end;
|
||||
|
||||
{ TROHTTPMessageDispatchers_WebSocket }
|
||||
|
||||
function TROHTTPMessageDispatchers_WebSocket.GetDispatcherClass: TROMessageDispatcherClass;
|
||||
begin
|
||||
result := TROHTTPDispatcher_Websocket;
|
||||
end;
|
||||
|
||||
{ TROHTTPDispatcher_Websocket }
|
||||
|
||||
function TROHTTPDispatcher_Websocket.CanHandleMessage(
|
||||
const aTransport: IROTransport; aRequeststream: TStream): boolean;
|
||||
var
|
||||
tcp: IROTCPTransport;
|
||||
buf: array [0..5] of AnsiChar;
|
||||
begin
|
||||
if aRequeststream = nil then result := FALSE else // for preventing warning in FPC
|
||||
result := FALSE;
|
||||
|
||||
if not Enabled or
|
||||
not Supports(aTransport, IROTCPTransport, tcp)
|
||||
then
|
||||
Exit;
|
||||
if (tcp as TROTransportContext).FIdContext.IOHandler.IsWebsocket then
|
||||
begin
|
||||
//we can handle all kind of messages, independent on the path, so check which kind of message we have
|
||||
Result := Self.Message.IsValidMessage((aRequeststream as TMemoryStream).Memory, aRequeststream.Size);
|
||||
|
||||
//goes wrong with enveloppes!
|
||||
//TROMessage.Envelopes_ProcessIncoming
|
||||
if not Result and
|
||||
(aRequeststream.Size > 6) then
|
||||
begin
|
||||
aRequeststream.Read(buf,6);
|
||||
Result := (buf[0] = EnvelopeSignature[0]) and
|
||||
(buf[1] = EnvelopeSignature[1]) and
|
||||
(buf[2] = EnvelopeSignature[2]) and
|
||||
(buf[3] = EnvelopeSignature[3]) and
|
||||
(buf[4] = EnvelopeSignature[4]);
|
||||
aRequeststream.Position := 0;
|
||||
end;
|
||||
end
|
||||
else
|
||||
Result := inherited CanHandleMessage(aTransport, aRequeststream);
|
||||
end;
|
||||
|
||||
end.
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.AfterConstruction;
|
||||
begin
|
||||
inherited;
|
||||
|
||||
FSocketIO := TIdServerSocketIOHandling_Ext.Create;
|
||||
FROTransportContexts := TInterfaceList.Create;
|
||||
|
||||
IndyServer.ContextClass := TROIdServerWSContext;
|
||||
if Self.IndyServer.IOHandler = nil then
|
||||
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
|
||||
IndyServer.OnDisconnect := InternalServerDisConnect;
|
||||
end;
|
||||
|
||||
destructor TROIndyHTTPWebsocketServer.Destroy;
|
||||
begin
|
||||
inherited;
|
||||
FSocketIO.Free;
|
||||
FROTransportContexts.Free;
|
||||
end;
|
||||
|
||||
function TROIndyHTTPWebsocketServer.GetDispatchersClass: TROMessageDispatchersClass;
|
||||
begin
|
||||
Result := TROHTTPMessageDispatchers_Websocket;
|
||||
end;
|
||||
|
||||
function TROIndyHTTPWebsocketServer.GetSocketIO: TIdServerSocketIOHandling;
|
||||
begin
|
||||
Result := FSocketIO;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.InternalServerCommandGet(AThread: TIdThreadClass;
|
||||
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo);
|
||||
begin
|
||||
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
|
||||
(AThread as TIdServerWSContext).SocketIO := Self.FSocketIO;
|
||||
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
|
||||
|
||||
if not TROIdServerWebsocketHandling.ProcessServerCommandGet(AThread as TIdServerWSContext, ARequestInfo, AResponseInfo) then
|
||||
inherited InternalServerCommandGet(AThread, ARequestInfo, AResponseInfo)
|
||||
end;
|
||||
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.InternalServerConnect(AThread: TIdContext);
|
||||
begin
|
||||
inherited;
|
||||
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
|
||||
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.InternalServerDisConnect(
|
||||
AThread: TIdContext);
|
||||
var
|
||||
transport: TROTransportContext;
|
||||
begin
|
||||
transport := AThread.Data as TROTransportContext;
|
||||
if transport <> nil then
|
||||
FROTransportContexts.Remove(transport);
|
||||
//transport._Release;
|
||||
AThread.Data := nil;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.Loaded;
|
||||
begin
|
||||
//do before inherited in case of designtime connection
|
||||
if Self.IndyServer.IOHandler = nil then
|
||||
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
|
||||
inherited;
|
||||
end;
|
||||
|
||||
procedure TROIndyHTTPWebsocketServer.ProcessRemObjectsRequest(
|
||||
const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
|
||||
var
|
||||
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
|
||||
msg: TROMessageDispatcher;
|
||||
iMsgNr: Integer;
|
||||
imsg: IROMessage;
|
||||
transport: TROTransportContext;
|
||||
begin
|
||||
if strmRequest.Size < Length(C_ROWSNR) + SizeOf(iMsgNr) then Exit;
|
||||
//read messagenr from the end
|
||||
strmRequest.Position := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
|
||||
strmRequest.Read(cWSNR[0], Length(C_ROWSNR));
|
||||
if (cWSNR <> C_ROWSNR) then Exit;
|
||||
strmRequest.Read(iMsgNr, SizeOf(iMsgNr));
|
||||
strmRequest.Position := 0;
|
||||
//trunc extra data
|
||||
strmRequest.Size := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
|
||||
transport := AThread.Data as TROTransportContext;
|
||||
//no RO transport object already made?
|
||||
if transport = nil then
|
||||
begin
|
||||
//create IROTransport object
|
||||
transport := TROTransportContext.Create(Self, AThread as TIdServerWSContext);
|
||||
//(transport as IROTransport)._AddRef;
|
||||
FROTransportContexts.Add(transport);
|
||||
//attach RO transport to indy context
|
||||
AThread.Data := transport;
|
||||
//todo: enveloppes
|
||||
//read client GUID the first time (needed to be able to send RO events)
|
||||
msg := Self.Dispatchers.FindDispatcher(transport, strmRequest);
|
||||
if msg = nil then
|
||||
raise EROException.Create('No suiteable message dispatcher found!');
|
||||
imsg := (msg.MessageIntf as IROMessageCloneable).Clone;
|
||||
imsg.InitializeRead(transport);
|
||||
imsg.ReadFromStream(strmRequest);
|
||||
transport.ClientId := imsg.ClientID;
|
||||
imsg := nil;
|
||||
Assert(not IsEqualGUID(transport.ClientID, EmptyGUID));
|
||||
end;
|
||||
//EXECUTE FUNCTION
|
||||
Self.DispatchMessage(transport, strmRequest, strmResponse);
|
||||
//write number at end
|
||||
strmResponse.Position := strmResponse.Size;
|
||||
strmResponse.Write(C_ROWSNR, Length(C_ROWSNR));
|
||||
strmResponse.Write(iMsgNr, SizeOf(iMsgNr));
|
||||
strmResponse.Position := 0;
|
||||
end;
|
||||
|
||||
{ TROTransport }
|
||||
|
||||
constructor TROTransportContext.Create(aROServer: TROIndyHTTPServer;
|
||||
aIdContext: TIdServerWSContext);
|
||||
begin
|
||||
FROServer := aROServer;
|
||||
FIdContext := aIdContext;
|
||||
end;
|
||||
|
||||
procedure TROTransportContext.EventsRegistered(aSender: TObject; aClient: TGUID);
|
||||
begin
|
||||
//
|
||||
end;
|
||||
|
||||
procedure TROTransportContext.DispatchEvent(anEventDataItem: TROEventData;
|
||||
aSessionReference: TGUID; aSender: TObject);
|
||||
var
|
||||
i: Integer;
|
||||
LContext: TIdContext;
|
||||
transport: TROTransportContext;
|
||||
l: TList;
|
||||
ws: TIdIOHandlerWebsocket;
|
||||
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
|
||||
begin
|
||||
l := FROServer.IndyServer.Contexts.LockList;
|
||||
try
|
||||
if l.Count <= 0 then Exit;
|
||||
|
||||
anEventDataItem.Data.Position := anEventDataItem.Data.Size - Length(C_ROWSNR) - SizeOf(FEventCount);
|
||||
anEventDataItem.Data.Read(cWSNR[0], Length(cWSNR));
|
||||
//event number not written already?
|
||||
if cWSNR <> C_ROWSNR then
|
||||
begin
|
||||
//new event nr
|
||||
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
|
||||
//overflow? then start again from 0
|
||||
if FEventCount > 0 then
|
||||
begin
|
||||
InterlockedExchange(FGlobalEventCount, 0);
|
||||
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
|
||||
end;
|
||||
Assert(FEventCount < 0);
|
||||
//write nr at end of message
|
||||
anEventDataItem.Data.Position := anEventDataItem.Data.Size;
|
||||
anEventDataItem.Data.Write(C_ROWSNR, Length(C_ROWSNR));
|
||||
anEventDataItem.Data.Write(FEventCount, SizeOf(FEventCount));
|
||||
anEventDataItem.Data.Position := 0;
|
||||
end;
|
||||
|
||||
//search specific client
|
||||
for i := 0 to l.Count - 1 do
|
||||
begin
|
||||
LContext := TIdContext(l.Items[i]);
|
||||
transport := LContext.Data as TROTransportContext;
|
||||
if transport = nil then Continue;
|
||||
if not IsEqualGUID(transport.ClientId, aSessionReference) then Continue;
|
||||
|
||||
//direct write event data
|
||||
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
|
||||
if not ws.IsWebsocket then Exit;
|
||||
ws.Lock;
|
||||
try
|
||||
try ws.Write(anEventDataItem.Data, wdtBinary) except {continue with other connections} end;
|
||||
finally
|
||||
ws.Unlock;
|
||||
end;
|
||||
end;
|
||||
finally
|
||||
anEventDataItem.RemoveRef;
|
||||
FROServer.IndyServer.Contexts.UnlockList;
|
||||
end;
|
||||
end;
|
||||
|
||||
function TROTransportContext.GetClientAddress: string;
|
||||
begin
|
||||
Result := FIdContext.Binding.PeerIP;
|
||||
end;
|
||||
|
||||
function TROTransportContext.GetTransportObject: TObject;
|
||||
begin
|
||||
Result := FROServer;
|
||||
end;
|
||||
|
||||
{ TROHTTPMessageDispatchers_WebSocket }
|
||||
|
||||
function TROHTTPMessageDispatchers_WebSocket.GetDispatcherClass: TROMessageDispatcherClass;
|
||||
begin
|
||||
result := TROHTTPDispatcher_Websocket;
|
||||
end;
|
||||
|
||||
{ TROHTTPDispatcher_Websocket }
|
||||
|
||||
function TROHTTPDispatcher_Websocket.CanHandleMessage(
|
||||
const aTransport: IROTransport; aRequeststream: TStream): boolean;
|
||||
var
|
||||
tcp: IROTCPTransport;
|
||||
buf: array [0..5] of AnsiChar;
|
||||
begin
|
||||
if aRequeststream = nil then result := FALSE else // for preventing warning in FPC
|
||||
result := FALSE;
|
||||
|
||||
if not Enabled or
|
||||
not Supports(aTransport, IROTCPTransport, tcp)
|
||||
then
|
||||
Exit;
|
||||
if (tcp as TROTransportContext).FIdContext.IOHandler.IsWebsocket then
|
||||
begin
|
||||
//we can handle all kind of messages, independent on the path, so check which kind of message we have
|
||||
Result := Self.Message.IsValidMessage((aRequeststream as TMemoryStream).Memory, aRequeststream.Size);
|
||||
|
||||
//goes wrong with enveloppes!
|
||||
//TROMessage.Envelopes_ProcessIncoming
|
||||
if not Result and
|
||||
(aRequeststream.Size > 6) then
|
||||
begin
|
||||
aRequeststream.Read(buf,6);
|
||||
Result := (buf[0] = EnvelopeSignature[0]) and
|
||||
(buf[1] = EnvelopeSignature[1]) and
|
||||
(buf[2] = EnvelopeSignature[2]) and
|
||||
(buf[3] = EnvelopeSignature[3]) and
|
||||
(buf[4] = EnvelopeSignature[4]);
|
||||
aRequeststream.Position := 0;
|
||||
end;
|
||||
end
|
||||
else
|
||||
Result := inherited CanHandleMessage(aTransport, aRequeststream);
|
||||
end;
|
||||
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue