diff --git a/ROdemoWS.zip b/ROdemoWS.zip new file mode 100644 index 0000000..9545d01 Binary files /dev/null and b/ROdemoWS.zip differ diff --git a/RemObjectsSDK_WS.js b/RemObjectsSDK_WS.js new file mode 100644 index 0000000..8734666 --- /dev/null +++ b/RemObjectsSDK_WS.js @@ -0,0 +1,218 @@ +//*************************************************************// +//some extra stuff for Websockets PoC +//by: André Mussche, andre.mussche@gmail.com + +function WebSocketsWrapper(url, aWebSocketClientChannel) { + this.updating = false; + this.urlCall = url; + this.WebSocketClientChannel = aWebSocketClientChannel; +}; + +WebSocketsWrapper.prototype.post = function post(passData, isBinary, onSuccessFunction, onErrorFunction) +{ + //if (this.updating) { + // return false; + //}; + this.WS = this.WS || null; + if (this.WS) { + if (this.updating) { + return false; + } + if (this.WS.readyState == 2 || //CLOSING + this.WS.readyState == 3) //CLOSED + this.WS = null; + }; + + var parser = new BinaryParser(); + + //check websocket support + if ("WebSocket" in window) + { + if (isBinary == true) + { + //add messagenr to end + var sMsgNr = 'WSNR' + parser.encodeInt(12345, 32, false); + passData = passData + sMsgNr; + + var len = passData.length; + var data = new Uint8Array(len); + for (var i=0; i C_ROWSNR) then Exit; + strmRequest.Read(iMsgNr, SizeOf(iMsgNr)); + strmRequest.Position := 0; + //trunc extra data + strmRequest.Size := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr); + transport := AThread.Data as TROTransportContext; + //no RO transport object already made? + if transport = nil then + begin + //create IROTransport object + transport := TROTransportContext.Create(Self, AThread as TIdServerWSContext); + (transport as IROTransport)._AddRef; + //attach RO transport to indy context + AThread.Data := transport; + //todo: enveloppes + //read client GUID the first time (needed to be able to send RO events) + msg := Self.Dispatchers.FindDispatcher(transport, strmRequest); + if msg = nil then + raise EROException.Create('No suiteable message dispatcher found!'); + imsg := (msg.MessageIntf as IROMessageCloneable).Clone; + imsg.InitializeRead(transport); + imsg.ReadFromStream(strmRequest); + transport.ClientId := imsg.ClientID; + imsg := nil; + Assert(not IsEqualGUID(transport.ClientID, EmptyGUID)); + end; + //EXECUTE FUNCTION + Self.DispatchMessage(transport, strmRequest, strmResponse); + //write number at end + strmResponse.Position := strmResponse.Size; + strmResponse.Write(C_ROWSNR, Length(C_ROWSNR)); + strmResponse.Write(iMsgNr, SizeOf(iMsgNr)); + strmResponse.Position := 0; +end; + +{ TROTransport } + +constructor TROTransportContext.Create(aROServer: TROIndyHTTPServer; + aIdContext: TIdServerWSContext); +begin + FROServer := aROServer; + FIdContext := aIdContext; +end; + +procedure TROTransportContext.EventsRegistered(aSender: TObject; aClient: TGUID); +begin + // +end; + +procedure TROTransportContext.DispatchEvent(anEventDataItem: TROEventData; + aSessionReference: TGUID; aSender: TObject); +var + i: Integer; + LContext: TIdContext; + transport: TROTransportContext; + l: TList; + ws: TIdIOHandlerWebsocket; + cWSNR: array[0..High(C_ROWSNR)] of AnsiChar; +begin + l := FROServer.IndyServer.Contexts.LockList; + try + if l.Count <= 0 then Exit; + + anEventDataItem.Data.Position := anEventDataItem.Data.Size - Length(C_ROWSNR) - SizeOf(FEventCount); + anEventDataItem.Data.Read(cWSNR[0], Length(cWSNR)); + //event number not written already? + if cWSNR <> C_ROWSNR then + begin + //new event nr + FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message + //overflow? then start again from 0 + if FEventCount > 0 then + begin + InterlockedExchange(FGlobalEventCount, 0); + FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message + end; + Assert(FEventCount < 0); + //write nr at end of message + anEventDataItem.Data.Position := anEventDataItem.Data.Size; + anEventDataItem.Data.Write(C_ROWSNR, Length(C_ROWSNR)); + anEventDataItem.Data.Write(FEventCount, SizeOf(FEventCount)); + anEventDataItem.Data.Position := 0; + end; + + //search specific client + for i := 0 to l.Count - 1 do + begin + LContext := TIdContext(l.Items[i]); + transport := LContext.Data as TROTransportContext; + if transport = nil then Continue; + if not IsEqualGUID(transport.ClientId, aSessionReference) then Continue; + + //direct write event data + ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket); + if not ws.IsWebsocket then Exit; + ws.Lock; + try + try ws.Write(anEventDataItem.Data, wdtBinary) except {continue with other connections} end; + finally + ws.Unlock; + end; + end; + finally + anEventDataItem.RemoveRef; + FROServer.IndyServer.Contexts.UnlockList; + end; +end; + +function TROTransportContext.GetClientAddress: string; +begin + Result := FIdContext.Binding.PeerIP; +end; + +function TROTransportContext.GetTransportObject: TObject; +begin + Result := FROServer; +end; + +{ TROHTTPMessageDispatchers_WebSocket } + +function TROHTTPMessageDispatchers_WebSocket.GetDispatcherClass: TROMessageDispatcherClass; +begin + result := TROHTTPDispatcher_Websocket; +end; + +{ TROHTTPDispatcher_Websocket } + +function TROHTTPDispatcher_Websocket.CanHandleMessage( + const aTransport: IROTransport; aRequeststream: TStream): boolean; +var + tcp: IROTCPTransport; + buf: array [0..5] of AnsiChar; +begin + if aRequeststream = nil then result := FALSE else // for preventing warning in FPC + result := FALSE; + + if not Enabled or + not Supports(aTransport, IROTCPTransport, tcp) + then + Exit; + if (tcp as TROTransportContext).FIdContext.IOHandler.IsWebsocket then + begin + //we can handle all kind of messages, independent on the path, so check which kind of message we have + Result := Self.Message.IsValidMessage((aRequeststream as TMemoryStream).Memory, aRequeststream.Size); + + //goes wrong with enveloppes! + //TROMessage.Envelopes_ProcessIncoming + if not Result and + (aRequeststream.Size > 6) then + begin + aRequeststream.Read(buf,6); + Result := (buf[0] = EnvelopeSignature[0]) and + (buf[1] = EnvelopeSignature[1]) and + (buf[2] = EnvelopeSignature[2]) and + (buf[3] = EnvelopeSignature[3]) and + (buf[4] = EnvelopeSignature[4]); + aRequeststream.Position := 0; + end; + end + else + Result := inherited CanHandleMessage(aTransport, aRequeststream); +end; + +end. diff --git a/uROIdServerWebsocketHandling.pas b/uROIdServerWebsocketHandling.pas new file mode 100644 index 0000000..ec28ed1 --- /dev/null +++ b/uROIdServerWebsocketHandling.pas @@ -0,0 +1,81 @@ +unit uROIdServerWebsocketHandling; + +interface + +uses + IdServerWebsocketHandling, IdServerWebsocketContext, + IdContext, + Classes, IdIOHandlerWebsocket; + +type + TOnRemObjectsRequest = procedure(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream) of object; + + TROIdServerWSContext = class(TIdServerWSContext) + private + FOnRemObjectsRequest: TOnRemObjectsRequest; + public + property OnRemObjectsRequest: TOnRemObjectsRequest read FOnRemObjectsRequest write FOnRemObjectsRequest; + end; + + TROIdServerWebsocketHandling = class(TIdServerWebsocketHandling) + protected + class procedure DoWSExecute(AThread: TIdContext; aSocketIOHandler: TIdServerSocketIOHandling_Ext); override; + class procedure HandleWSMessage(AContext: TIdServerWSContext; aType: TWSDataType; aRequestStrm, aResponseStrm: TMemoryStream; + aSocketIOHandler: TIdServerSocketIOHandling_Ext);override; + end; + +const + C_ROWSNR: array[0..5] of AnsiChar = 'ROWSNR'; + +implementation + +uses + uROHTTPWebsocketServer, uROClientIntf; + +{ TROIdServerWebsocketHandling } + +class procedure TROIdServerWebsocketHandling.DoWSExecute(AThread: TIdContext; + aSocketIOHandler: TIdServerSocketIOHandling_Ext); +var + transport: TROTransportContext; +begin + try + inherited DoWSExecute(AThread, aSocketIOHandler); + finally + transport := AThread.Data as TROTransportContext; + //detach RO transport + if transport <> nil then + (transport as IROTransport)._Release; + end; +end; + +class procedure TROIdServerWebsocketHandling.HandleWSMessage( + AContext: TIdServerWSContext; aType: TWSDataType; aRequestStrm, aResponseStrm: TMemoryStream; + aSocketIOHandler: TIdServerSocketIOHandling_Ext); +var + cWSNR: array[0..High(C_ROWSNR)] of AnsiChar; + rocontext: TROIdServerWSContext; +begin + if aRequestStrm.Size > Length(C_ROWSNR) + SizeOf(Integer) then + begin + aRequestStrm.Position := aRequestStrm.Size - Length(C_ROWSNR) - SizeOf(Integer); + aRequestStrm.Read(cWSNR[0], Length(cWSNR)); + end + else + cWSNR := ''; + + if cWSNR = C_ROWSNR then + begin + rocontext := AContext as TROIdServerWSContext; + if Assigned(rocontext.OnRemObjectsRequest) then + rocontext.OnRemObjectsRequest(AContext, aRequestStrm, aResponseStrm); + end +// else if SameText(context.path, '/RemObjects') then +// begin +// ProcessRemObjectsRequest(AThread, strmRequest, strmResponse, transport); +// end + else + inherited HandleWSMessage(AContext, aType, aRequestStrm, aResponseStrm, aSocketIOHandler); +end; + +end. diff --git a/uROIndyHTTPWebsocketChannel.pas b/uROIndyHTTPWebsocketChannel.pas new file mode 100644 index 0000000..59e6ac8 --- /dev/null +++ b/uROIndyHTTPWebsocketChannel.pas @@ -0,0 +1,436 @@ +unit uROIndyHTTPWebsocketChannel; + +interface + +uses + Classes, SyncObjs, + uROIndyHTTPChannel, uROClientIntf, + IdHTTPWebsocketClient, IdHTTP, IdWinsock2; + +const + C_RO_WS_NR: array[0..5] of AnsiChar = 'ROWSNR'; + +type + TROIndyHTTPWebsocketChannel = class; + + //TROIndyHTTPSocketIOClient = class(TIdHTTPSocketIOClient_old) + TROIndyHTTPSocketIOClient = class(TIdHTTPWebsocketClient) + protected + FParent: TROIndyHTTPWebsocketChannel; + public + procedure AsyncDispatchEvent(const aEvent: TStream); overload; override; + procedure AsyncDispatchEvent(const aEvent: string); overload; override; + end; + + TROIndyHTTPWebsocketChannel = class(TROIndyHTTPChannel, + IROActiveEventChannel) + private + function GetHost: string; + function GetPort: integer; + procedure SetHost(const Value: string); + procedure SetPort(const Value: integer); + function GetIndyClient: TIdHTTPWebsocketClient; + procedure SetWSResourceName(const Value: string); + function GetWSResourceName: string; + protected + FTriedUpgrade: Boolean; + FEventReceivers: TInterfaceList; + FMessageNr: Integer; + procedure IntDispatchEvent(aEvent: TStream); + procedure AsyncDispatchEvent(aEvent: TStream); + procedure SocketConnected(Sender: TObject); + procedure ResetChannel; + + function TryUpgradeToWebsocket: Boolean; + protected + procedure IntDispatch(aRequest, aResponse: TStream); override; + function CreateIndyClient: TIdHTTP; override; + protected + {IROActiveEventChannel} + procedure RegisterEventReceiver (aReceiver: IROEventReceiver); + procedure UnregisterEventReceiver(aReceiver: IROEventReceiver); + public + procedure AfterConstruction;override; + destructor Destroy; override; + published + property IndyClient: TIdHTTPWebsocketClient read GetIndyClient; + property Port: integer read GetPort write SetPort; + property Host: string read GetHost write SetHost; + property WSResourceName: string read GetWSResourceName write SetWSResourceName; + end; + + procedure Register; + +implementation + +uses + SysUtils, Windows, + IdStack, IdStackConsts, IdGlobal, IdStackBSDBase, + uRORes, uROIndySupport, mcFinalizationHelper, IdIOHandlerWebsocket, StrUtils; + +procedure Register; +begin + RegisterComponents('RBK', [TROIndyHTTPWebsocketChannel]); +end; + +type + TAnonymousThread = class(TThread) + protected + FThreadProc: TThreadProcedure; + procedure Execute; override; + public + constructor Create(AThreadProc: TThreadProcedure); + end; + +procedure CreateAnonymousThread(AThreadProc: TThreadProcedure); +begin + TAnonymousThread.Create(AThreadProc); +end; + +{ TROIndyHTTPChannel_Websocket } + +procedure TROIndyHTTPWebsocketChannel.AfterConstruction; +begin + inherited; + FEventReceivers := TInterfaceList.Create; + //not needed, is ignored at server now, but who knows later? :) e.g. support multiple sub protocols + WSResourceName := 'RemObjects'; +end; + +destructor TROIndyHTTPWebsocketChannel.Destroy; +begin + if TIdWebsocketMultiReadThread.Instance <> nil then + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient); + + FEventReceivers.Free; + inherited; +end; + +function TROIndyHTTPWebsocketChannel.GetIndyClient: TIdHTTPWebsocketClient; +begin + Result := inherited IndyClient as TIdHTTPWebsocketClient; +end; + +procedure TROIndyHTTPWebsocketChannel.SetHost(const Value: string); +begin + IndyClient.Host := Value; + TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]); +end; + +procedure TROIndyHTTPWebsocketChannel.SetPort(const Value: integer); +begin + IndyClient.Port := Value; + TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]); +end; + +procedure TROIndyHTTPWebsocketChannel.SetWSResourceName(const Value: string); +begin + IndyClient.WSResourceName := Value; + TargetURL := Format('ws://%s:%d/%s', [Host, Port, WSResourceName]); +end; + +function TROIndyHTTPWebsocketChannel.GetHost: string; +begin + Result := IndyClient.Host; +end; + +function TROIndyHTTPWebsocketChannel.GetPort: integer; +begin + Result := IndyClient.Port; +end; + +function TROIndyHTTPWebsocketChannel.GetWSResourceName: string; +begin + Result := IndyClient.WSResourceName; +end; + +procedure TROIndyHTTPWebsocketChannel.AsyncDispatchEvent(aEvent: TStream); +var + strmevent: TMemoryStream; +begin + strmevent := TMemoryStream.Create; + strmevent.CopyFrom(aEvent, aEvent.Size); + + //events during dispatch? channel is busy so offload event dispatching to different thread! + CreateAnonymousThread( + procedure + begin + IntDispatchEvent(strmevent); + strmevent.Free; + end); +end; + +function TROIndyHTTPWebsocketChannel.CreateIndyClient: TIdHTTP; +var + wsclient: TROIndyHTTPSocketIOClient; +begin + //Result := inherited CreateIndyClient; + wsclient := TROIndyHTTPSocketIOClient.Create(Self); +// wsclient := TIdHTTPWebsocketClient.Create(Self); + wsclient.FParent := Self; + wsclient.Port := 80; + wsclient.Host := '127.0.0.1'; + wsclient.Request.UserAgent := uRORes.str_ProductName; + wsclient.OnConnected := SocketConnected; + //TargetURL := ''; + + Result := wsclient; +end; + +procedure TROIndyHTTPWebsocketChannel.SocketConnected(Sender: TObject); +begin + if DisableNagle then + uROIndySupport.Indy_DisableNagle(IndyClient); +end; + +function TROIndyHTTPWebsocketChannel.TryUpgradeToWebsocket: Boolean; +begin + try + Result := (IndyClient as TIdHTTPWebsocketClient).TryUpgradeToWebsocket; + if Result then + begin + Self.IndyClient.IOHandler.InputBuffer.Clear; + //background wait for data in single thread + TIdWebsocketMultiReadThread.Instance.AddClient(Self.IndyClient); + end; + except + ResetChannel; + raise; + end; +end; + +procedure TROIndyHTTPWebsocketChannel.IntDispatch(aRequest, aResponse: TStream); +var + cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar; + iMsgNr, iMsgNr2: Integer; + ws: TIdIOHandlerWebsocket; + wscode: TWSDataCode; + swstext: utf8string; +begin + //http server supports websockets? + if not FTriedUpgrade then + begin + if not IndyClient.IOHandler.IsWebsocket then //not already upgraded? + TryUpgradeToWebsocket; + FTriedUpgrade := True; //one shot + end; + + ws := IndyClient.IOHandler as TIdIOHandlerWebsocket; + if not ws.IsWebsocket then + //normal http dispatch + inherited IntDispatch(aRequest, aResponse) + else + //websocket dispatch + begin + ws.Lock; + try + //write messagenr at end + aRequest.Position := aRequest.Size; + Inc(FMessageNr); + iMsgNr := FMessageNr; + aRequest.Write(C_RO_WS_NR, Length(C_RO_WS_NR)); + aRequest.Write(iMsgNr, SizeOf(iMsgNr)); + aRequest.Position := 0; + + //write + IndyClient.IOHandler.Write(aRequest); + + iMsgNr2 := 0; + while iMsgNr2 <= 0 do + begin + aResponse.Size := 0; //clear + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(IndyClient.IOHandler.ReadLongWord); + //next the size + data = stream + IndyClient.IOHandler.ReadStream(aResponse); + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; + if aResponse.Size >= Length(C_RO_WS_NR) + SizeOf(iMsgNr) then + begin + //get event or message nr + aResponse.Position := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); + aResponse.Read(cWSNR[0], Length(cWSNR)); + end; + + if (cWSNR = C_RO_WS_NR) then + begin + aResponse.Read(iMsgNr2, SizeOf(iMsgNr2)); + aResponse.Size := aResponse.Size - Length(C_RO_WS_NR) - SizeOf(iMsgNr2); //trunc + aResponse.Position := 0; + + //event? + if iMsgNr2 < 0 then + begin + //events during dispatch? channel is busy so offload event dispatching to different thread! + AsyncDispatchEvent(aResponse); + aResponse.Size := 0; + { + ws.Unlock; + try + IntDispatchEvent(aResponse); + aResponse.Size := 0; + finally + ws.Lock; + end; + } + end; + end + else + begin + aResponse.Position := 0; + if wscode = wdcBinary then + begin + Self.IndyClient.AsyncDispatchEvent(aResponse); + end + else if wscode = wdcText then + begin + SetLength(swstext, aResponse.Size); + aResponse.Read(swstext[1], aResponse.Size); + if swstext <> '' then + begin + Self.IndyClient.AsyncDispatchEvent(string(swstext)); + end; + end; + end; + end; + except + ws.Unlock; //always unlock + ResetChannel; + Raise; + end; + ws.Unlock; //normal unlock (no extra try finally needed) + + if iMsgNr2 <> iMsgNr then + Assert(iMsgNr2 = iMsgNr, 'Message number mismatch between send and received!'); + end; +end; + +procedure TROIndyHTTPWebsocketChannel.IntDispatchEvent(aEvent: TStream); +var + i: Integer; + eventrecv: IROEventReceiver; +begin + for i := 0 to FEventReceivers.Count - 1 do + begin + aEvent.Position := 0; + eventrecv := FEventReceivers.Items[i] as IROEventReceiver; + try + eventrecv.Dispatch(aEvent, TThread.CurrentThread); + except + //ignore errors within events, so normal communication is preserved + end; + end; +end; + +procedure TROIndyHTTPWebsocketChannel.RegisterEventReceiver( + aReceiver: IROEventReceiver); +begin + FEventReceivers.Add(aReceiver); +end; + +procedure TROIndyHTTPWebsocketChannel.ResetChannel; +//var +// ws: TIdIOHandlerWebsocket; +begin + FTriedUpgrade := False; //reset + TIdWebsocketMultiReadThread.Instance.RemoveClient(Self.IndyClient); + + if IndyClient.IOHandler <> nil then + begin + IndyClient.IOHandler.InputBuffer.Clear; + //close/disconnect internal socket + //ws := IndyClient.IOHandler as TIdIOHandlerWebsocket; + //ws.Close; done in disconnect below + end; + IndyClient.Disconnect(False); +end; + +procedure TROIndyHTTPWebsocketChannel.UnregisterEventReceiver( + aReceiver: IROEventReceiver); +begin + FEventReceivers.Remove(aReceiver); +end; + +{ TMultiChannelReadThread } + +(* +procedure TROIndyWSMultiChannelReadThread_old.ReadFromAllChannels; + if strmEvent = nil then + strmEvent := TMemoryStream.Create; + strmEvent.Clear; + + //first is the data type TWSDataType(text or bin), but is ignore/not needed + wscode := TWSDataCode(chn.IndyClient.IOHandler.ReadLongWord); + //next the size + data = stream + chn.IndyClient.IOHandler.ReadStream(strmEvent); + + //ignore ping/pong messages + if wscode in [wdcPing, wdcPong] then Continue; + if strmEvent.Size < Length(C_ROWSNR) + SizeOf(iEventNr) then Continue; + + //get event nr + strmEvent.Position := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr); + strmEvent.Read(cWSNR[0], Length(cWSNR)); + Assert(cWSNR = C_ROWSNR); + strmEvent.Read(iEventNr, SizeOf(iEventNr)); + Assert(iEventNr < 0); + //trunc + strmEvent.Size := strmEvent.Size - Length(C_ROWSNR) - SizeOf(iEventNr); + + //fire event + //chn.IntDispatchEvent(strmEvent); + //offload event dispatching to different thread! otherwise deadlocks possible? (do to synchronize) + strmEvent.Position := 0; + chn.AsyncDispatchEvent(strmEvent); +*) + +{ TAnonymousThread } + +constructor TAnonymousThread.Create(AThreadProc: TThreadProcedure); +begin + FThreadProc := AThreadProc; + FreeOnTerminate := True; + inherited Create(False {direct start}); +end; + +procedure TAnonymousThread.Execute; +begin + if Assigned(FThreadProc) then + FThreadProc(); +end; + +{ TROIndyHTTPSocketIOClient } + +procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: TStream); +var + iEventNr: Integer; + cWSNR: array[0..High(C_RO_WS_NR)] of AnsiChar; +begin + if aEvent.Size > Length(C_RO_WS_NR) + SizeOf(iEventNr) then + begin + //get event nr + aEvent.Position := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr); + aEvent.Read(cWSNR[0], Length(cWSNR)); + //has eventnr? + if cWSNR = C_RO_WS_NR then + begin + aEvent.Read(iEventNr, SizeOf(iEventNr)); + Assert(iEventNr < 0, 'must be negative number for RO events'); + //trunc + aEvent.Size := aEvent.Size - Length(C_RO_WS_NR) - SizeOf(iEventNr); + + aEvent.Position := 0; + FParent.AsyncDispatchEvent(aEvent); + Exit; + end; + end; + + inherited AsyncDispatchEvent(aEvent); +end; + +procedure TROIndyHTTPSocketIOClient.AsyncDispatchEvent(const aEvent: string); +begin + inherited AsyncDispatchEvent(aEvent); +end; + +end. diff --git a/uROSimpleEventRepository.pas b/uROSimpleEventRepository.pas new file mode 100644 index 0000000..9b8e3b4 --- /dev/null +++ b/uROSimpleEventRepository.pas @@ -0,0 +1,137 @@ +unit uROSimpleEventRepository; + +interface + +uses + uROEventRepository, uROClient, uROTypes, uROClientIntf, + uROHTTPWebsocketServer, uROSessions, Classes, SyncObjs; + +type + TROSimpleWebsocketEventRepository = class(TInterfacedObject, + IROEventRepository) + private + FMessage: TROMessage; + FROServer: TROIndyHTTPWebsocketServer; + FEventCount: Integer; + protected + {IROEventRepository} + procedure AddSession(aSessionID : TGUID); overload; + procedure AddSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload; + procedure RemoveSession(aSessionID : TGUID); overload; + procedure RemoveSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload; + + procedure StoreEventData(SourceSessionID : TGUID; Data : Binary; + const ExcludeSender: Boolean; + const ExcludeSessionList: Boolean; + const SessionList: String); overload; + procedure StoreEventData(SourceSessionID : TGUID; Data : Binary; + const ExcludeSender: Boolean; + const ExcludeSessionList: Boolean; + const SessionList: String; + const EventSinkId: AnsiString); overload; + function GetEventData(SessionID : TGUID; var TargetStream : Binary) : integer; + public + function GetEventWriter(const IID: TGUID): IROEventWriter; + + property Message : TROMessage read FMessage write FMessage; + property ROServer: TROIndyHTTPWebsocketServer read FROServer write FROServer; + end; + +implementation + +uses + IdContext, IdIOHandlerWebsocket, Windows; + +{ TSimpleEventRepository } + +procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID); +begin + //no session +end; + +procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID; + aEventSinkId: AnsiString); +begin + //no session +end; + +procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID; + aEventSinkId: AnsiString); +begin + //no session +end; + +procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID); +begin + //no session +end; + +function TROSimpleWebsocketEventRepository.GetEventWriter( + const IID: TGUID): IROEventWriter; +var + lEventWriterClass: TROEventWriterClass; +begin + lEventWriterClass := FindEventWriterClass(IID); + if not assigned(lEventWriterClass) then exit; + result := lEventWriterClass.Create(fMessage, Self) as IROEventWriter; +end; + +function TROSimpleWebsocketEventRepository.GetEventData(SessionID: TGUID; + var TargetStream: Binary): integer; +begin + Result := -1; + Assert(False); +end; + +procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID; + Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean; + const SessionList: String; const EventSinkId: AnsiString); +begin + StoreEventData(SourceSessionID, Data, ExcludeSender, ExcludeSessionList, SessionList); +end; + +procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID; + Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean; + const SessionList: String); +var + i, iEventNr: Integer; + LContext: TIdContext; + l: TList; + ws: TIdIOHandlerWebsocket; +begin + l := ROServer.IndyServer.Contexts.LockList; + try + if l.Count <= 0 then Exit; + + iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message + if iEventNr > 0 then + begin + InterlockedExchange(FEventCount, 0); + iEventNr := -1 * InterlockedIncrement(FEventCount); //negative = event, positive is normal RO message + end; + Assert(iEventNr < 0); + Data.Position := Data.Size; + Data.Write(C_ROWSNR, Length(C_ROWSNR)); + Data.Write(iEventNr, SizeOf(iEventNr)); + Data.Position := 0; + + //direct write to ALL connections + for i := 0 to l.Count - 1 do + begin + LContext := TIdContext(l.Items[i]); + ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket); + if not ws.IsWebsocket then Continue; + ws.Lock; + try + ws.Write(Data, wdtBinary); + finally + ws.Unlock; + end; + end; + finally + ROServer.IndyServer.Contexts.UnlockList; + end; +end; + +end. +