137 lines
4.1 KiB
ObjectPascal
137 lines
4.1 KiB
ObjectPascal
unit uROSimpleEventRepository;
|
|
|
|
interface
|
|
|
|
uses
|
|
uROEventRepository, uROClient, uROTypes, uROClientIntf,
|
|
uROHTTPWebsocketServer, uROSessions, Classes, SyncObjs;
|
|
|
|
type
|
|
TROSimpleWebsocketEventRepository = class(TInterfacedObject, IROEventRepository)
  { Event repository that keeps no per-session queue: StoreEventData writes
    each event straight to the websocket connections of ROServer. }
  private
    // Message instance handed to the event writers built by GetEventWriter.
    FMessage: TROMessage;
    // Server whose Indy contexts are iterated by StoreEventData.
    FROServer: TROIndyHTTPWebsocketServer;
    // Running counter; StoreEventData negates it to number outgoing events.
    FEventCount: Integer;
  protected
    { IROEventRepository }

    // Session registration is a no-op in this repository (see implementation).
    procedure AddSession(aSessionID: TGUID); overload;
    procedure AddSession(aSessionID: TGUID; aEventSinkId: AnsiString); overload;
    procedure RemoveSession(aSessionID: TGUID); overload;
    procedure RemoveSession(aSessionID: TGUID; aEventSinkId: AnsiString); overload;

    // Appends an event-number trailer to Data and writes it to the connections.
    procedure StoreEventData(SourceSessionID: TGUID; Data: Binary;
      const ExcludeSender, ExcludeSessionList: Boolean;
      const SessionList: String); overload;
    // Same as above; EventSinkId is accepted but not used.
    procedure StoreEventData(SourceSessionID: TGUID; Data: Binary;
      const ExcludeSender, ExcludeSessionList: Boolean;
      const SessionList: String;
      const EventSinkId: AnsiString); overload;

    // Not supported here: asserts and returns -1.
    function GetEventData(SessionID: TGUID; var TargetStream: Binary): integer;
  public
    // Returns an event writer for IID, built on Message and this repository.
    function GetEventWriter(const IID: TGUID): IROEventWriter;

    property Message: TROMessage read FMessage write FMessage;
    property ROServer: TROIndyHTTPWebsocketServer read FROServer write FROServer;
  end;
|
|
|
|
implementation
|
|
|
|
uses
|
|
IdContext, IdIOHandlerWebsocket, Windows;
|
|
|
|
{ TROSimpleWebsocketEventRepository }
|
|
|
|
{ IROEventRepository.AddSession (session only).
  Deliberately empty: this repository stores nothing per session. }
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID);
begin
  // intentionally left blank - no session bookkeeping
end;
|
|
|
|
{ IROEventRepository.AddSession (session + event sink).
  Deliberately empty: neither aSessionID nor aEventSinkId is recorded. }
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID;
  aEventSinkId: AnsiString);
begin
  // intentionally left blank - no session bookkeeping
end;
|
|
|
|
{ IROEventRepository.RemoveSession (session + event sink).
  Deliberately empty: nothing was stored for the session, so there is
  nothing to remove. }
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID;
  aEventSinkId: AnsiString);
begin
  // intentionally left blank - no session bookkeeping
end;
|
|
|
|
{ IROEventRepository.RemoveSession (session only).
  Deliberately empty: nothing was stored for the session, so there is
  nothing to remove. }
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID);
begin
  // intentionally left blank - no session bookkeeping
end;
|
|
|
|
{ Builds an event writer for the interface identified by IID.

  The writer class is obtained from FindEventWriterClass(IID) and is
  constructed with this repository's Message and the repository itself.

  Returns nil when FindEventWriterClass yields no class for IID. }
function TROSimpleWebsocketEventRepository.GetEventWriter(
  const IID: TGUID): IROEventWriter;
var
  lEventWriterClass: TROEventWriterClass;
begin
  // Result is an interface (managed) value; it is not guaranteed to be nil on
  // entry, so set it explicitly before the early exit below.
  Result := nil;

  lEventWriterClass := FindEventWriterClass(IID);
  if not Assigned(lEventWriterClass) then
    Exit;

  Result := lEventWriterClass.Create(fMessage, Self) as IROEventWriter;
end;
|
|
|
|
{ IROEventRepository.GetEventData - not supported by this repository.

  Trips an assertion when assertions are compiled in; otherwise returns -1.
  TargetStream is never touched. }
function TROSimpleWebsocketEventRepository.GetEventData(SessionID: TGUID;
  var TargetStream: Binary): integer;
begin
  // Reaching this method is treated as a programming error.
  Assert(False);
  Result := -1;
end;
|
|
|
|
{ IROEventRepository.StoreEventData overload that also carries an event sink id.

  EventSinkId is not used; the call is forwarded unchanged to the
  five-parameter overload. }
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
  Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
  const SessionList: String; const EventSinkId: AnsiString);
begin
  // Forward everything except EventSinkId.
  StoreEventData(SourceSessionID,
                 Data,
                 ExcludeSender,
                 ExcludeSessionList,
                 SessionList);
end;
|
|
|
|
{ Writes an event directly to every websocket connection of ROServer.

  Data is modified in place: the C_ROWSNR marker followed by a negative event
  number (raw Integer bytes) is appended to the end of the stream, and the
  stream is rewound to position 0 before it is written out.

  SourceSessionID, ExcludeSender, ExcludeSessionList and SessionList are not
  used by this implementation: no filtering is applied.

  Nothing is appended or sent when the server has no contexts at all.
  Connections whose IOHandler is not a TIdIOHandlerWebsocket, or that are not
  (yet) in websocket mode, are skipped.

  NOTE(review): an exception raised while writing to one connection still
  propagates and stops the loop, as before - confirm whether that is wanted. }
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
  Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
  const SessionList: String);
var
  i, iEventNr: Integer;
  LContext: TIdContext;
  l: TList;
  ws: TIdIOHandlerWebsocket;
begin
  // The context list stays locked for the whole broadcast.
  l := ROServer.IndyServer.Contexts.LockList;
  try
    if l.Count <= 0 then
      Exit;

    // negative = event, positive is normal RO message
    iEventNr := -1 * InterlockedIncrement(FEventCount);
    if iEventNr > 0 then
    begin
      // The counter wrapped into negative territory; restart numbering at 1.
      InterlockedExchange(FEventCount, 0);
      iEventNr := -1 * InterlockedIncrement(FEventCount);
    end;
    Assert(iEventNr < 0);

    // Append the trailer: marker + event number.
    Data.Position := Data.Size;
    Data.Write(C_ROWSNR, Length(C_ROWSNR));
    Data.Write(iEventNr, SizeOf(iEventNr));
    Data.Position := 0;

    // direct write to ALL connections
    for i := 0 to l.Count - 1 do
    begin
      LContext := TIdContext(l.Items[i]);

      // An unconditional "as" cast would raise (or leave ws nil) for any other
      // handler type and abort delivery to the remaining connections.
      if not (LContext.Connection.IOHandler is TIdIOHandlerWebsocket) then
        Continue;
      ws := TIdIOHandlerWebsocket(LContext.Connection.IOHandler);
      if not ws.IsWebsocket then
        Continue;

      ws.Lock;
      try
        // Rewind for every connection so each one receives the full payload,
        // regardless of where the previous Write left the stream position.
        Data.Position := 0;
        ws.Write(Data, wdtBinary);
      finally
        ws.Unlock;
      end;
    end;
  finally
    ROServer.IndyServer.Contexts.UnlockList;
  end;
end;
|
|
|
|
end.
|
|
|