Websocket support for RemObjectsSDK

This commit is contained in:
andremussche 2013-11-11 21:15:05 +01:00
parent 7d3b78b227
commit cb2855115f
6 changed files with 1186 additions and 0 deletions

BIN
ROdemoWS.zip Normal file

Binary file not shown.

218
RemObjectsSDK_WS.js Normal file
View File

@ -0,0 +1,218 @@
//*************************************************************//
//some extra stuff for Websockets PoC
//by: André Mussche, andre.mussche@gmail.com
function WebSocketsWrapper(url, aWebSocketClientChannel) {
this.updating = false;
this.urlCall = url;
this.WebSocketClientChannel = aWebSocketClientChannel;
};
WebSocketsWrapper.prototype.post = function post(passData, isBinary, onSuccessFunction, onErrorFunction)
{
//if (this.updating) {
// return false;
//};
this.WS = this.WS || null;
if (this.WS) {
if (this.updating) {
return false;
}
if (this.WS.readyState == 2 || //CLOSING
this.WS.readyState == 3) //CLOSED
this.WS = null;
};
var parser = new BinaryParser();
//check websocket support
if ("WebSocket" in window)
{
if (isBinary == true)
{
//add messagenr to end
var sMsgNr = 'WSNR' + parser.encodeInt(12345, 32, false);
passData = passData + sMsgNr;
var len = passData.length;
var data = new Uint8Array(len);
for (var i=0; i<len; i++) {
data[i] = passData.charCodeAt(i);
};
//var bbClass = undefined;
var bbClass = window.WebKitBlobBuilder || window.BlobBuilder || window.MozBlobBuilder;
//if (!bbClass)
// throw new Error("WebSocketsWrapper.post: this browser does not support Blobs");
if (bbClass)
{
var bb = new bbClass();
bb.append(data.buffer);
//add messagenr to end
//bb.append('WSNR');
//bb.append(parser.encodeInt(12345, 32, false));
var blob = bb.getBlob("application/octet-stream");
}
else
//use ArrayBuffer instead of Blobs in case (mobile) browser does not support it
{
var binarray = data.buffer;
var blob = undefined;
}
}
// Create new websocket connection
if (!this.WS)
{
this.WS = new WebSocket(this.urlCall); //e.g. "ws://localhost:7000/"
this.WS.WebSocketsWrapper = this;
// Called after connection is established
this.WS.onopen = function() {
if (isBinary == true)
{
if (blob)
this.send(blob)
else
this.send(binarray)
}
else
//data + messagenr
this.send(passData + 'WSNR' + parser.encodeInt(12345, 32, false));
};
// Called when a new message is received
this.WS.onmessage = function(msg) {
this.updating = false;
if (msg.data)
{
//Text
if (typeof msg.data === "string")
{
//var parser = new BinaryParser();
//message nr is last 4 bytes
var sWSNR = msg.data.substr(- 4 - 4, 4);
if (!sWSNR === 'WSNR')
throw new Error("Message read error: no WSNR at end of data!");
var iMsgNr = parser.decodeInt(msg.data.substr(-4), 32, true/*signed)*/);
//strip last 4 bytes and 4 chars
var adata = msg.data.substring(0, msg.data.length - 4 - 4);
onSuccessFunction(adata);
}
//Blob
else
{
var reader = new FileReader();
reader.WS = this;
reader.onloadend = function() {
var parser = new BinaryParser();
//message nr is last 4 bytes
var sWSNR = reader.result.substr(- 4 - 4, 4);
if (!sWSNR === 'WSNR')
throw new Error("Message read error: no WSNR at end of data!");
var msgNr = parser.decodeInt(reader.result.substr(-4), 32, true/*signed)*/);
//event? (negative msg number)
if (msgNr < 0)
{
//note: events are always binary?
var binmsg = new RemObjects.SDK.BinMessage();
binmsg.initialize("", "");
binmsg.setResponseStream(reader.result);
var sinkName = binmsg.read("", "AnsiString");
if (RemObjects.SDK.RTTI[sinkName] &&
RemObjects.SDK.RTTI[sinkName].prototype instanceof RemObjects.SDK.ROComplexType)
{
var sink = new RemObjects.SDK.RTTI[sinkName]();
var eventName = binmsg.read("", "AnsiString");
sink.readEvent(binmsg, eventName);
//get attached event receiver (other way around, normally reciever use polling :( )
var that = this.WS.WebSocketsWrapper.WebSocketClientChannel.FReceiver;
if (that.fHandlers[eventName]) {
that.fHandlers[eventName](RemObjects.SDK.ROStructType.prototype.toObject.call(sink[eventName]));
};
} else {
throw new Error("EventReceiver.intPollServer: unknown event sink: " + eventName);
}
}
else
{
//strip last 4 bytes and 4 chars
var adata = reader.result.substring(0, reader.result.length - 4 - 4);
onSuccessFunction(adata);
}
};
//convert blob
reader.readAsBinaryString(msg.data);
}
}
};
//error callback (only the first one?)
this.WS.onerror = onErrorFunction;
// Called when connection is closed
this.WS.onclose = function() {
this.updating = false;
}
}
//already made, direct send
else
{
//if (this.WS.readyState == 0) { //CONNECTING
// Called after connection is established
// this.WS.onopen += function() { does not work?
// this.send(passData);
// };
//}
//else
if (isBinary == true)
{
if (blob)
this.send(blob)
else
this.send(binarray)
}
else
this.WS.send(passData + 'WSNR' + parser.encodeInt(12345, 32, false));
}
} else {
alert('Browser doesn\'t support websockets!');
}
this.WS.updating = new Date();
};
RemObjects.SDK.WebSocketClientChannel = function WebSocketClientChannel(aUrl) {
RemObjects.SDK.ClientChannel.call(this, aUrl);
},
RemObjects.SDK.WebSocketClientChannel.prototype = new RemObjects.SDK.ClientChannel("");
RemObjects.SDK.WebSocketClientChannel.prototype.constructor = RemObjects.SDK.WebSocketClientChannel;
RemObjects.SDK.WebSocketClientChannel.prototype.post = function post(aMessage, isBinary, onSuccess, onError)
{
this.ajaxObject = this.ajaxObject || null;
if (!this.ajaxObject)
this.ajaxObject = new WebSocketsWrapper(this.url, this);
this.ajaxObject.post(aMessage, isBinary, onSuccess, onError);
};
RemObjects.SDK.WebSocketClientChannel.prototype.RegisterEventReceiver = function RegisterEventReceiver(aReceiver)
{
this.FReceiver = aReceiver;
}
//load data on server side (Node.js)
RemObjects.SDK.JSONMessage.prototype.setRequestStream = function setRequestStream(aRequest) {
try {
this.fRequestObject = RemObjects.UTIL.parseJSON(aRequest);
//RO for JS is only meant for the client side (yet :)), so we must copy the request
//into response, so the .read can read it from there
this.fResponseObject.result = this.fRequestObject.params;
} catch (e) {
throw new Error("JSONMessage.setRequestStream:\n JSON parsing error: " + e.message + "\nServer response:\n" + aResponse);
};
};

314
uROHTTPWebsocketServer.pas Normal file
View File

@ -0,0 +1,314 @@
unit uROHTTPWebsocketServer;
interface
uses
Classes, IdServerIOHandlerWebsocket, IdIOHandlerWebsocket,
uROIndyHTTPServer, uROClientIntf, uROServer, uROHTTPDispatch,
IdContext, IdCustomHTTPServer, IdCustomTCPServer, uROHash, uROServerIntf,
IdServerWebsocketContext, IdServerSocketIOHandling,
IdServerWebsocketHandling;
type
TROTransportContext = class;
TROIndyHTTPWebsocketServer = class(TROIndyHTTPServer)
private
FOnCustomChannelExecute: TWebsocketChannelRequest;
FSocketIO: TIdServerSocketIOHandling_Ext;
function GetSocketIO: TIdServerSocketIOHandling;
protected
procedure InternalServerConnect(AThread: TIdContext); override;
procedure InternalServerCommandGet(AThread: TIdThreadClass;
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo); override;
procedure ProcessRemObjectsRequest(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
function GetDispatchersClass: TROMessageDispatchersClass; override;
public
procedure AfterConstruction; override;
destructor Destroy; override;
procedure Loaded; override;
property SocketIO: TIdServerSocketIOHandling read GetSocketIO;
property OnCustomChannelExecute: TWebsocketChannelRequest read FOnCustomChannelExecute write FOnCustomChannelExecute;
end;
TROHTTPDispatcher_Websocket = class(TROHTTPDispatcher)
public
function CanHandleMessage(const aTransport: IROTransport; aRequeststream : TStream): boolean; override;
end;
TROHTTPMessageDispatchers_WebSocket = class(TROHTTPMessageDispatchers)
protected
function GetDispatcherClass : TROMessageDispatcherClass; override;
end;
TROTransportContext = class(TInterfacedObject,
IROTransport, IROTCPTransport,
IROActiveEventServer)
private
FROServer: TROIndyHTTPServer;
FIdContext: TIdServerWSContext;
FEventCount: Integer;
FClientId: TGUID;
private
class var FGlobalEventCount: Integer;
protected
{IROTransport}
function GetTransportObject: TObject;
{IROTCPTransport}
function GetClientAddress : string;
{IROActiveEventServer}
procedure EventsRegistered(aSender : TObject; aClient: TGUID);
procedure DispatchEvent(anEventDataItem : TROEventData; aSessionReference : TGUID; aSender: TObject); // asender is TROEventRepository
public
//constructor Create(aROServer: TROIndyHTTPServer; aIOHandler: TIdIOHandlerWebsocket);
constructor Create(aROServer: TROIndyHTTPServer; aIdContext: TIdServerWSContext);
property Context: TIdServerWSContext read FIdContext;
property ClientId: TGUID read FClientId write FClientId;
end;
procedure Register;
implementation
uses
SysUtils, IdCoderMIME, Windows, uROEventRepository, uROSessions, uROClient,
uROClasses, StrUtils, uROIdServerWebsocketHandling;
procedure Register;
begin
RegisterComponents('RBK', [TROIndyHTTPWebsocketServer]);
end;
procedure TROIndyHTTPWebsocketServer.AfterConstruction;
begin
inherited;
FSocketIO := TIdServerSocketIOHandling_Ext.Create;
IndyServer.ContextClass := TROIdServerWSContext;
if Self.IndyServer.IOHandler = nil then
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
end;
destructor TROIndyHTTPWebsocketServer.Destroy;
begin
inherited;
FSocketIO.Free;
end;
function TROIndyHTTPWebsocketServer.GetDispatchersClass: TROMessageDispatchersClass;
begin
Result := TROHTTPMessageDispatchers_Websocket;
end;
function TROIndyHTTPWebsocketServer.GetSocketIO: TIdServerSocketIOHandling;
begin
Result := FSocketIO;
end;
procedure TROIndyHTTPWebsocketServer.InternalServerCommandGet(AThread: TIdThreadClass;
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo);
begin
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
(AThread as TIdServerWSContext).SocketIO := Self.FSocketIO;
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
if not TROIdServerWebsocketHandling.ProcessServerCommandGet(AThread as TIdServerWSContext, ARequestInfo, AResponseInfo) then
inherited InternalServerCommandGet(AThread, ARequestInfo, AResponseInfo)
end;
procedure TROIndyHTTPWebsocketServer.InternalServerConnect(AThread: TIdContext);
begin
inherited;
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
end;
procedure TROIndyHTTPWebsocketServer.Loaded;
begin
//do before inherited in case of designtime connection
if Self.IndyServer.IOHandler = nil then
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
inherited;
end;
procedure TROIndyHTTPWebsocketServer.ProcessRemObjectsRequest(
const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
var
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
msg: TROMessageDispatcher;
iMsgNr: Integer;
imsg: IROMessage;
transport: TROTransportContext;
begin
if strmRequest.Size < Length(C_ROWSNR) + SizeOf(iMsgNr) then Exit;
//read messagenr from the end
strmRequest.Position := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
strmRequest.Read(cWSNR[0], Length(C_ROWSNR));
if (cWSNR <> C_ROWSNR) then Exit;
strmRequest.Read(iMsgNr, SizeOf(iMsgNr));
strmRequest.Position := 0;
//trunc extra data
strmRequest.Size := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
transport := AThread.Data as TROTransportContext;
//no RO transport object already made?
if transport = nil then
begin
//create IROTransport object
transport := TROTransportContext.Create(Self, AThread as TIdServerWSContext);
(transport as IROTransport)._AddRef;
//attach RO transport to indy context
AThread.Data := transport;
//todo: enveloppes
//read client GUID the first time (needed to be able to send RO events)
msg := Self.Dispatchers.FindDispatcher(transport, strmRequest);
if msg = nil then
raise EROException.Create('No suiteable message dispatcher found!');
imsg := (msg.MessageIntf as IROMessageCloneable).Clone;
imsg.InitializeRead(transport);
imsg.ReadFromStream(strmRequest);
transport.ClientId := imsg.ClientID;
imsg := nil;
Assert(not IsEqualGUID(transport.ClientID, EmptyGUID));
end;
//EXECUTE FUNCTION
Self.DispatchMessage(transport, strmRequest, strmResponse);
//write number at end
strmResponse.Position := strmResponse.Size;
strmResponse.Write(C_ROWSNR, Length(C_ROWSNR));
strmResponse.Write(iMsgNr, SizeOf(iMsgNr));
strmResponse.Position := 0;
end;
{ TROTransport }
constructor TROTransportContext.Create(aROServer: TROIndyHTTPServer;
aIdContext: TIdServerWSContext);
begin
FROServer := aROServer;
FIdContext := aIdContext;
end;
procedure TROTransportContext.EventsRegistered(aSender: TObject; aClient: TGUID);
begin
//
end;
procedure TROTransportContext.DispatchEvent(anEventDataItem: TROEventData;
aSessionReference: TGUID; aSender: TObject);
var
i: Integer;
LContext: TIdContext;
transport: TROTransportContext;
l: TList;
ws: TIdIOHandlerWebsocket;
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
begin
l := FROServer.IndyServer.Contexts.LockList;
try
if l.Count <= 0 then Exit;
anEventDataItem.Data.Position := anEventDataItem.Data.Size - Length(C_ROWSNR) - SizeOf(FEventCount);
anEventDataItem.Data.Read(cWSNR[0], Length(cWSNR));
//event number not written already?
if cWSNR <> C_ROWSNR then
begin
//new event nr
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
//overflow? then start again from 0
if FEventCount > 0 then
begin
InterlockedExchange(FGlobalEventCount, 0);
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
end;
Assert(FEventCount < 0);
//write nr at end of message
anEventDataItem.Data.Position := anEventDataItem.Data.Size;
anEventDataItem.Data.Write(C_ROWSNR, Length(C_ROWSNR));
anEventDataItem.Data.Write(FEventCount, SizeOf(FEventCount));
anEventDataItem.Data.Position := 0;
end;
//search specific client
for i := 0 to l.Count - 1 do
begin
LContext := TIdContext(l.Items[i]);
transport := LContext.Data as TROTransportContext;
if transport = nil then Continue;
if not IsEqualGUID(transport.ClientId, aSessionReference) then Continue;
//direct write event data
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
if not ws.IsWebsocket then Exit;
ws.Lock;
try
try ws.Write(anEventDataItem.Data, wdtBinary) except {continue with other connections} end;
finally
ws.Unlock;
end;
end;
finally
anEventDataItem.RemoveRef;
FROServer.IndyServer.Contexts.UnlockList;
end;
end;
function TROTransportContext.GetClientAddress: string;
begin
Result := FIdContext.Binding.PeerIP;
end;
function TROTransportContext.GetTransportObject: TObject;
begin
Result := FROServer;
end;
{ TROHTTPMessageDispatchers_WebSocket }
function TROHTTPMessageDispatchers_WebSocket.GetDispatcherClass: TROMessageDispatcherClass;
begin
result := TROHTTPDispatcher_Websocket;
end;
{ TROHTTPDispatcher_Websocket }
function TROHTTPDispatcher_Websocket.CanHandleMessage(
const aTransport: IROTransport; aRequeststream: TStream): boolean;
var
tcp: IROTCPTransport;
buf: array [0..5] of AnsiChar;
begin
if aRequeststream = nil then result := FALSE else // for preventing warning in FPC
result := FALSE;
if not Enabled or
not Supports(aTransport, IROTCPTransport, tcp)
then
Exit;
if (tcp as TROTransportContext).FIdContext.IOHandler.IsWebsocket then
begin
//we can handle all kind of messages, independent on the path, so check which kind of message we have
Result := Self.Message.IsValidMessage((aRequeststream as TMemoryStream).Memory, aRequeststream.Size);
//goes wrong with enveloppes!
//TROMessage.Envelopes_ProcessIncoming
if not Result and
(aRequeststream.Size > 6) then
begin
aRequeststream.Read(buf,6);
Result := (buf[0] = EnvelopeSignature[0]) and
(buf[1] = EnvelopeSignature[1]) and
(buf[2] = EnvelopeSignature[2]) and
(buf[3] = EnvelopeSignature[3]) and
(buf[4] = EnvelopeSignature[4]);
aRequeststream.Position := 0;
end;
end
else
Result := inherited CanHandleMessage(aTransport, aRequeststream);
end;
end.

View File

@ -0,0 +1,81 @@
unit uROIdServerWebsocketHandling;
interface
uses
IdServerWebsocketHandling, IdServerWebsocketContext,
IdContext,
Classes, IdIOHandlerWebsocket;
type
TOnRemObjectsRequest = procedure(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream) of object;
TROIdServerWSContext = class(TIdServerWSContext)
private
FOnRemObjectsRequest: TOnRemObjectsRequest;
public
property OnRemObjectsRequest: TOnRemObjectsRequest read FOnRemObjectsRequest write FOnRemObjectsRequest;
end;
TROIdServerWebsocketHandling = class(TIdServerWebsocketHandling)
protected
class procedure DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext); override;
class procedure HandleWSMessage(AContext: TIdServerWSContext; aType: TWSDataType; aRequestStrm, aResponseStrm: TMemoryStream;
aSocketIOHandler: TIdServerSocketIOHandling_Ext);override;
end;
const
C_ROWSNR: array[0..5] of AnsiChar = 'ROWSNR';
implementation
uses
uROHTTPWebsocketServer, uROClientIntf;
{ TROIdServerWebsocketHandling }
class procedure TROIdServerWebsocketHandling.DoWSExecute(AThread: TIdContext;
aSocketIOHandler: TIdServerSocketIOHandling_Ext);
var
transport: TROTransportContext;
begin
try
inherited DoWSExecute(AThread, aSocketIOHandler);
finally
transport := AThread.Data as TROTransportContext;
//detach RO transport
if transport <> nil then
(transport as IROTransport)._Release;
end;
end;
class procedure TROIdServerWebsocketHandling.HandleWSMessage(
AContext: TIdServerWSContext; aType: TWSDataType; aRequestStrm, aResponseStrm: TMemoryStream;
aSocketIOHandler: TIdServerSocketIOHandling_Ext);
var
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
rocontext: TROIdServerWSContext;
begin
if aRequestStrm.Size > Length(C_ROWSNR) + SizeOf(Integer) then
begin
aRequestStrm.Position := aRequestStrm.Size - Length(C_ROWSNR) - SizeOf(Integer);
aRequestStrm.Read(cWSNR[0], Length(cWSNR));
end
else
cWSNR := '';
if cWSNR = C_ROWSNR then
begin
rocontext := AContext as TROIdServerWSContext;
if Assigned(rocontext.OnRemObjectsRequest) then
rocontext.OnRemObjectsRequest(AContext, aRequestStrm, aResponseStrm);
end
// else if SameText(context.path, '/RemObjects') then
// begin
// ProcessRemObjectsRequest(AThread, strmRequest, strmResponse, transport);
// end
else
inherited HandleWSMessage(AContext, aType, aRequestStrm, aResponseStrm, aSocketIOHandler);
end;
end.

View File

@ -0,0 +1,436 @@
unit uROIndyHTTPWebsocketChannel;
interface
uses
Classes, SyncObjs,
uROIndyHTTPChannel, uROClientIntf,
IdHTTPWebsocketClient, IdHTTP, IdWinsock2;
const
C_RO_WS_NR: array[0..5] of AnsiChar = 'ROWSNR';
type
TROIndyHTTPWebsocketChannel = class;
//TROIndyHTTPSocketIOClient = class(TIdHTTPSocketIOClient_old)
TROIndyHTTPSocketIOClient = class(TIdHTTPWebsocketClient)
protected
FParent: TROIndyHTTPWebsocketChannel;
public
procedure AsyncDispatchEvent(const aEvent: TStream); overload; override;
procedure AsyncDispatchEvent(const aEvent: string); overload; override;
end;
TROIndyHTTPWebsocketChannel = class(TROIndyHTTPChannel,
IROActiveEventChannel)
private
function GetHost: string;
function GetPort: integer;
procedure SetHost(const Value: string);
procedure SetPort(const Value: integer);
function GetIndyClient: TIdHTTPWebsocketClient;
procedure SetWSResourceName(const Value: string);
function GetWSResourceName: string;
protected
FTriedUpgrade: Boolean;
FEventReceivers: TInterfaceList;
FMessageNr: Integer;
procedure IntDispatchEvent(aEvent: TStream);
procedure AsyncDispatchEvent(aEvent: TStream);
procedure SocketConnected(Sender: TObject);
procedure ResetChannel;
function TryUpgradeToWebsocket: Boolean;
protected
procedure IntDispatch(aRequest, aResponse: TStream); override;
function CreateIndyClient: TIdHTTP; override;
protected
{IROActiveEventChannel}
procedure RegisterEventReceiver (aReceiver: IROEventReceiver);
procedure UnregisterEventReceiver(aReceiver: IROEventReceiver);
public
procedure AfterConstruction;override;
destructor Destroy; override;
published
property IndyClient: TIdHTTPWebsocketClient read GetIndyClient;
property Port: integer read GetPort write SetPort;
property Host: string read GetHost write SetHost;
property WSResourceName: string read GetWSResourceName write SetWSResourceName;
end;
procedure Register;
implementation
uses
SysUtils, Windows,
IdStack, IdStackConsts, IdGlobal, IdStackBSDBase,
uRORes, uROIndySupport, mcFinalizationHelper, IdIOHandlerWebsocket, StrUtils;
procedure Register;
begin
RegisterComponents('RBK', [TROIndyHTTPWebsocketChannel]);
end;
type
TAnonymousThread = class(TThread)
protected
FThreadProc: TThreadProcedure;
procedure Execute; override;
public
constructor Create(AThreadProc: TThreadProcedure);
end;
procedure CreateAnonymousThread(AThreadProc: TThreadProcedure);
begin
TAnonymousThread.Create(AThreadProc);
end;
{ TROIndyHTTPChannel_Websocket }
procedure TROIndyHTTPWebsocketChannel.AfterConstruction;
begin
inherited;
FEventReceivers := TInterfaceList.Create;
//not needed, is ignored at server now, but who knows later? :) e.g. support multiple sub protocols
WSResourceName := 'RemObjects';
end;
destructor TROIndyHTTPWebsocketChannel.Destroy;
begin
if TIdWebsocketMultiReadThread.Instance <> nil then
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient);
FEventReceivers.Free;
inherited;
end;
function TROIndyHTTPWebsocketChannel.GetIndyClient: TIdHTTPWebsocketClient;
begin
Result := inherited IndyClient as TIdHTTPWebsocketClient;
end;
procedure TROIndyHTTPWebsocketChannel.SetHost(const Value: string);
begin
IndyClient.Host := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
end;
procedure TROIndyHTTPWebsocketChannel.SetPort(const Value: integer);
begin
IndyClient.Port := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
end;
procedure TROIndyHTTPWebsocketChannel.SetWSResourceName(const Value: string);
begin
IndyClient.WSResourceName := Value;
TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]);
end;
function TROIndyHTTPWebsocketChannel.GetHost: string;
begin
Result := IndyClient.Host;
end;
function TROIndyHTTPWebsocketChannel.GetPort: integer;
begin
Result := IndyClient.Port;
end;
function TROIndyHTTPWebsocketChannel.GetWSResourceName: string;
begin
Result := IndyClient.WSResourceName;
end;
procedure TROIndyHTTPWebsocketChannel.AsyncDispatchEvent(aEvent: TStream);
var
strmevent: TMemoryStream;
begin
strmevent := TMemoryStream.Create;
strmevent.CopyFrom(aEvent, aEvent.Size);
//events during dispatch? channel is busy so offload event dispatching to different thread!
CreateAnonymousThread(
procedure
begin
IntDispatchEvent(strmevent);
strmevent.Free;
end);
end;
function TROIndyHTTPWebsocketChannel.CreateIndyClient: TIdHTTP;
var
wsclient: TROIndyHTTPSocketIOClient;
begin
//Result := inherited CreateIndyClient;
wsclient := TROIndyHTTPSocketIOClient.Create(Self);
// wsclient := TIdHTTPWebsocketClient.Create(Self);
wsclient.FParent := Self;
wsclient.Port := 80;
wsclient.Host := '127.0.0.1';
wsclient.Request.UserAgent := uRORes.str_ProductName;
wsclient.OnConnected := SocketConnected;
//TargetURL := '';
Result := wsclient;
end;
procedure TROIndyHTTPWebsocketChannel.SocketConnected(Sender: TObject);
begin
if DisableNagle then
uROIndySupport.Indy_DisableNagle(IndyClient);
end;
function TROIndyHTTPWebsocketChannel.TryUpgradeToWebsocket: Boolean;
begin
try
Result := (IndyClient as TIdHTTPWebsocketClient).TryUpgradeToWebsocket;
if Result then
begin
Self.IndyClient.IOHandler.InputBuffer.Clear;
//background wait for data in single thread
TIdWebsocketMultiReadThread.Instance.AddClient(Self.IndyClient);
end;
except
ResetChannel;
raise;
end;
end;
procedure TROIndyHTTPWebsocketChannel.IntDispatch(aRequest, aResponse: TStream);
var
cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar;
iMsgNr, iMsgNr2: Integer;
ws: TIdIOHandlerWebsocket;
wscode: TWSDataCode;
swstext: utf8string;
begin
//http server supports websockets?
if not FTriedUpgrade then
begin
if not IndyClient.IOHandler.IsWebsocket then //not already upgraded?
TryUpgradeToWebsocket;
FTriedUpgrade := True; //one shot
end;
ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
if not ws.IsWebsocket then
//normal http dispatch
inherited IntDispatch(aRequest, aResponse)
else
//websocket dispatch
begin
ws.Lock;
try
//write messagenr at end
aRequest.Position := aRequest.Size;
Inc(FMessageNr);
iMsgNr := FMessageNr;
aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR));
aRequest.Write(iMsgNr, SizeOf(iMsgNr));
aRequest.Position := 0;
//write
IndyClient.IOHandler.Write(aRequest);
iMsgNr2 := 0;
while iMsgNr2 <= 0 do
begin
aResponse.Size := 0; //clear
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
IndyClient.IOHandler.ReadStream(aResponse);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then
begin
//get event or message nr
aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2);
aResponse.Read(cWSNR[0], Length(cWSNR));
end;
if (cWSNR = C_RO_WS_NR) then
begin
aResponse.Read(iMsgNr2, SizeOf(iMsgNr2));
aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc
aResponse.Position := 0;
//event?
if iMsgNr2 < 0 then
begin
//events during dispatch? channel is busy so offload event dispatching to different thread!
AsyncDispatchEvent(aResponse);
aResponse.Size := 0;
{
ws.Unlock;
try
IntDispatchEvent(aResponse);
aResponse.Size := 0;
finally
ws.Lock;
end;
}
end;
end
else
begin
aResponse.Position := 0;
if wscode = wdcBinary then
begin
Self.IndyClient.AsyncDispatchEvent(aResponse);
end
else if wscode = wdcText then
begin
SetLength(swstext, aResponse.Size);
aResponse.Read(swstext[1], aResponse.Size);
if swstext <> '' then
begin
Self.IndyClient.AsyncDispatchEvent(string(swstext));
end;
end;
end;
end;
except
ws.Unlock; //always unlock
ResetChannel;
Raise;
end;
ws.Unlock; //normal unlock (no extra try finally needed)
if iMsgNr2 <> iMsgNr then
Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!');
end;
end;
procedure TROIndyHTTPWebsocketChannel.IntDispatchEvent(aEvent: TStream);
var
i: Integer;
eventrecv: IROEventReceiver;
begin
for i := 0 to FEventReceivers.Count - 1 do
begin
aEvent.Position := 0;
eventrecv := FEventReceivers.Items[i] as IROEventReceiver;
try
eventrecv.Dispatch(aEvent, TThread.CurrentThread);
except
//ignore errors within events, so normal communication is preserved
end;
end;
end;
procedure TROIndyHTTPWebsocketChannel.RegisterEventReceiver(
aReceiver: IROEventReceiver);
begin
FEventReceivers.Add(aReceiver);
end;
procedure TROIndyHTTPWebsocketChannel.ResetChannel;
//var
// ws: TIdIOHandlerWebsocket;
begin
FTriedUpgrade := False; //reset
TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient);
if IndyClient.IOHandler <> nil then
begin
IndyClient.IOHandler.InputBuffer.Clear;
//close/disconnect internal socket
//ws := IndyClient.IOHandler as TIdIOHandlerWebsocket;
//ws.Close; done in disconnect below
end;
IndyClient.Disconnect(False);
end;
procedure TROIndyHTTPWebsocketChannel.UnregisterEventReceiver(
aReceiver: IROEventReceiver);
begin
FEventReceivers.Remove(aReceiver);
end;
{ TMultiChannelReadThread }
(*
procedure TROIndyWSMultiChannelReadThread_old.ReadFromAllChannels;
if strmEvent = nil then
strmEvent := TMemoryStream.Create;
strmEvent.Clear;
//first is the data type TWSDataType(text or bin), but is ignore/not needed
wscode := TWSDataCode(chn.IndyClient.IOHandler.ReadLongWord);
//next the size + data = stream
chn.IndyClient.IOHandler.ReadStream(strmEvent);
//ignore ping/pong messages
if wscode in [wdcPing, wdcPong] then Continue;
if strmEvent.Size < Length(C_ROWSNR) + SizeOf(iEventNr) then Continue;
//get event nr
strmEvent.Position := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr);
strmEvent.Read(cWSNR[0], Length(cWSNR));
Assert(cWSNR = C_ROWSNR);
strmEvent.Read(iEventNr, SizeOf(iEventNr));
Assert(iEventNr < 0);
//trunc
strmEvent.Size := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr);
//fire event
//chn.IntDispatchEvent(strmEvent);
//offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize)
strmEvent.Position := 0;
chn.AsyncDispatchEvent(strmEvent);
*)
{ TAnonymousThread }
constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure);
begin
FThreadProc := AThreadProc;
FreeOnTerminate := True;
inherited Create(False {direct start});
end;
procedure TAnonymousThread.Execute;
begin
if Assigned(FThreadProc) then
FThreadProc();
end;
{ TROIndyHTTPSocketIOClient }
procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: TStream);
var
iEventNr: Integer;
cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar;
begin
if aEvent.Size > Length(C_RO_WS_NR) + SizeOf(iEventNr) then
begin
//get event nr
aEvent.Position := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr);
aEvent.Read(cWSNR[0], Length(cWSNR));
//has eventnr?
if cWSNR = C_RO_WS_NR then
begin
aEvent.Read(iEventNr, SizeOf(iEventNr));
Assert(iEventNr < 0, 'must be negative number for RO events');
//trunc
aEvent.Size := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr);
aEvent.Position := 0;
FParent.AsyncDispatchEvent(aEvent);
Exit;
end;
end;
inherited AsyncDispatchEvent(aEvent);
end;
procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: string);
begin
inherited AsyncDispatchEvent(aEvent);
end;
end.

View File

@ -0,0 +1,137 @@
unit uROSimpleEventRepository;
interface
uses
uROEventRepository, uROClient, uROTypes, uROClientIntf,
uROHTTPWebsocketServer, uROSessions, Classes, SyncObjs;
type
TROSimpleWebsocketEventRepository = class(TInterfacedObject,
IROEventRepository)
private
FMessage: TROMessage;
FROServer: TROIndyHTTPWebsocketServer;
FEventCount: Integer;
protected
{IROEventRepository}
procedure AddSession(aSessionID : TGUID); overload;
procedure AddSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
procedure RemoveSession(aSessionID : TGUID); overload;
procedure RemoveSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String); overload;
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String;
const EventSinkId: AnsiString); overload;
function GetEventData(SessionID : TGUID; var TargetStream : Binary) : integer;
public
function GetEventWriter(const IID: TGUID): IROEventWriter;
property Message : TROMessage read FMessage write FMessage;
property ROServer: TROIndyHTTPWebsocketServer read FROServer write FROServer;
end;
implementation
uses
IdContext, IdIOHandlerWebsocket, Windows;
{ TSimpleEventRepository }
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID;
aEventSinkId: AnsiString);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID;
aEventSinkId: AnsiString);
begin
//no session
end;
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID);
begin
//no session
end;
function TROSimpleWebsocketEventRepository.GetEventWriter(
const IID: TGUID): IROEventWriter;
var
lEventWriterClass: TROEventWriterClass;
begin
lEventWriterClass := FindEventWriterClass(IID);
if not assigned(lEventWriterClass) then exit;
result := lEventWriterClass.Create(fMessage, Self) as IROEventWriter;
end;
function TROSimpleWebsocketEventRepository.GetEventData(SessionID: TGUID;
var TargetStream: Binary): integer;
begin
Result := -1;
Assert(False);
end;
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
const SessionList: String; const EventSinkId: AnsiString);
begin
StoreEventData(SourceSessionID, Data, ExcludeSender, ExcludeSessionList, SessionList);
end;
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
const SessionList: String);
var
i, iEventNr: Integer;
LContext: TIdContext;
l: TList;
ws: TIdIOHandlerWebsocket;
begin
l := ROServer.IndyServer.Contexts.LockList;
try
if l.Count <= 0 then Exit;
iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message
if iEventNr > 0 then
begin
InterlockedExchange(FEventCount, 0);
iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message
end;
Assert(iEventNr < 0);
Data.Position := Data.Size;
Data.Write(C_ROWSNR, Length(C_ROWSNR));
Data.Write(iEventNr, SizeOf(iEventNr));
Data.Position := 0;
//direct write to ALL connections
for i := 0 to l.Count - 1 do
begin
LContext := TIdContext(l.Items[i]);
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
if not ws.IsWebsocket then Continue;
ws.Lock;
try
ws.Write(Data, wdtBinary);
finally
ws.Unlock;
end;
end;
finally
ROServer.IndyServer.Contexts.UnlockList;
end;
end;
end.