- safe callbacks, ISocketContext interface is used everywhere internally

- sync emit
- error + cleanup on disconnect
This commit is contained in:
André Mussche 2014-07-03 11:26:41 +02:00
parent b85079a7cb
commit a1b813b767
3 changed files with 326 additions and 116 deletions

View file

@ -504,7 +504,9 @@ var
begin
Lock;
try
if not IOHandler.IsWebsocket then
if IOHandler = nil then
Connect
else if not IOHandler.IsWebsocket then
InternalUpgradeToWebsocket(True{raise}, sError);
finally
UnLock;

View file

@ -11,14 +11,14 @@ uses
type
TIdServerSocketIOHandling = class(TIdBaseSocketIOHandling)
protected
procedure ProcessHeatbeatRequest(const AContext: TSocketIOContext; const aText: string); override;
procedure ProcessHeatbeatRequest(const AContext: ISocketIOContext; const aText: string); override;
public
function SendToAll(const aMessage: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil): Integer;
procedure SendTo (const aContext: TIdServerContext; const aMessage: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);
function EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil): Integer;overload;
function EmitEventToAll(const aEventName: string; const aData: string ; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil): Integer;overload;
procedure EmitEventTo (const aContext: TSocketIOContext;
procedure EmitEventTo (const aContext: ISocketIOContext;
const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
procedure EmitEventTo (const aContext: TIdServerContext;
const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
@ -32,7 +32,7 @@ uses
{ TIdServerSocketIOHandling }
procedure TIdServerSocketIOHandling.EmitEventTo(
const aContext: TSocketIOContext; const aEventName: string;
const aContext: ISocketIOContext; const aEventName: string;
const aData: ISuperObject; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
var
jsonarray: string;
@ -61,7 +61,7 @@ procedure TIdServerSocketIOHandling.EmitEventTo(
const aContext: TIdServerContext; const aEventName: string;
const aData: ISuperObject; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
var
context: TSocketIOContext;
context: ISocketIOContext;
begin
Lock;
try
@ -76,7 +76,7 @@ function TIdServerSocketIOHandling.EmitEventToAll(const aEventName,
aData: string; const aCallback: TSocketIOMsgJSON;
const aOnError: TSocketIOError): Integer;
var
context: TSocketIOContext;
context: ISocketIOContext;
jsonarray: string;
begin
Result := 0;
@ -127,7 +127,7 @@ begin
end;
procedure TIdServerSocketIOHandling.ProcessHeatbeatRequest(
const AContext: TSocketIOContext; const aText: string);
const AContext: ISocketIOContext; const aText: string);
begin
inherited ProcessHeatbeatRequest(AContext, aText);
end;
@ -135,7 +135,7 @@ end;
procedure TIdServerSocketIOHandling.SendTo(const aContext: TIdServerContext;
const aMessage: string; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
var
context: TSocketIOContext;
context: ISocketIOContext;
begin
Lock;
try
@ -159,7 +159,7 @@ end;
function TIdServerSocketIOHandling.SendToAll(const aMessage: string;
const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError): Integer;
var
context: TSocketIOContext;
context: ISocketIOContext;
begin
Result := 0;
Lock;

View file

@ -31,8 +31,8 @@ type
ISocketIOContext = interface
['{ACCAC678-054C-4D75-8BAD-5922F55623AB}']
function GetCustomData: TObject;
function GetOwnsCustomData: Boolean;
function GetCustomData: TObject;
function GetOwnsCustomData: Boolean;
procedure SetCustomData(const Value: TObject);
procedure SetOwnsCustomData(const Value: Boolean);
@ -43,6 +43,11 @@ type
function PeerIP: string;
function PeerPort: Integer;
procedure Lock;
procedure UnLock;
function IsDisconnected: Boolean;
procedure EmitEvent(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
procedure EmitEvent(const aEventName: string; const aData: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
procedure Send(const aData: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);
@ -73,6 +78,7 @@ type
FClient: TIdHTTP;
FEvent: TEvent;
FQueue: TList<string>;
FPendingMessages: TList<Int64>;
procedure QueueData(const aData: string);
procedure ServerContextDestroy(AContext: TIdContext);
public
@ -118,20 +124,35 @@ type
ISocketIOCallback)
protected
FHandling: TIdBaseSocketIOHandling;
FSocket: TSocketIOContext;
FSocket: ISocketIOContext;
FMsgNr: Integer;
{ISocketIOCallback}
procedure SendResponse(const aResponse: string);
function IsResponseSend: Boolean;
public
constructor Create(aHandling: TIdBaseSocketIOHandling; aSocket: TSocketIOContext; aMsgNr: Integer);
constructor Create(aHandling: TIdBaseSocketIOHandling; aSocket: ISocketIOContext; aMsgNr: Integer);
end;
TSocketIOPromise = class
protected
Done, Success: Boolean;
Error: Exception;
Data : ISuperObject;
Event: TEvent;
public
procedure AfterConstruction; override;
destructor Destroy; override;
end;
ESocketIOException = class(Exception);
ESocketIOTimeout = class(ESocketIOException);
ESocketIODisconnect = class(ESocketIOException);
TIdBaseSocketIOHandling = class(TIdServerBaseHandling)
protected
FLock: TCriticalSection;
FConnections: TObjectDictionary<TIdContext,TSocketIOContext>;
FConnectionsGUID: TObjectDictionary<string,TSocketIOContext>;
FConnections: TObjectDictionary<TIdContext,ISocketIOContext>;
FConnectionsGUID: TObjectDictionary<string,ISocketIOContext>;
FOnConnectionList,
FOnDisconnectList: TSocketIONotifyList;
@ -139,7 +160,7 @@ type
FOnSocketIOMsg: TSocketIOMsg;
FOnSocketIOJson: TSocketIOMsgJSON;
procedure ProcessEvent(const AContext: TSocketIOContext; const aText: string; aMsgNr: Integer; aHasCallback: Boolean);
procedure ProcessEvent(const AContext: ISocketIOContext; const aText: string; aMsgNr: Integer; aHasCallback: Boolean);
private
FOnEventError: TSocketIOEventError;
protected
@ -150,21 +171,23 @@ type
FSocketIOMsgNr: Integer;
FSocketIOEventCallback: TDictionary<Integer,TSocketIOCallback>;
FSocketIOEventCallbackRef: TDictionary<Integer,TSocketIOCallbackRef>;
//FSocketIOEventPromises: TDictionary<Integer,TSocketIOPromise>;
FSocketIOErrorRef: TDictionary<Integer,TSocketIOError>;
function WriteConnect(const ASocket: TSocketIOContext): string; overload;
procedure WriteDisConnect(const ASocket: TSocketIOContext);overload;
procedure WritePing(const ASocket: TSocketIOContext);overload;
function WriteConnect(const ASocket: ISocketIOContext): string; overload;
procedure WriteDisConnect(const ASocket: ISocketIOContext);overload;
procedure WritePing(const ASocket: ISocketIOContext);overload;
//
function WriteConnect(const AContext: TIdContext): string; overload;
procedure WriteDisConnect(const AContext: TIdContext);overload;
procedure WritePing(const AContext: TIdContext);overload;
procedure WriteSocketIOMsg(const ASocket: TSocketIOContext; const aRoom, aData: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
procedure WriteSocketIOJSON(const ASocket: TSocketIOContext; const aRoom, aJSON: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
procedure WriteSocketIOEvent(const ASocket: TSocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallback; const aOnError: TSocketIOError);
procedure WriteSocketIOEventRef(const ASocket: TSocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallbackRef; const aOnError: TSocketIOError);
procedure WriteSocketIOResult(const ASocket: TSocketIOContext; aRequestMsgNr: Integer; const aRoom, aData: string);
procedure WriteSocketIOMsg(const ASocket: ISocketIOContext; const aRoom, aData: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
procedure WriteSocketIOJSON(const ASocket: ISocketIOContext; const aRoom, aJSON: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
procedure WriteSocketIOEvent(const ASocket: ISocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallback; const aOnError: TSocketIOError);
procedure WriteSocketIOEventRef(const ASocket: ISocketIOContext; const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallbackRef; const aOnError: TSocketIOError);
function WriteSocketIOEventSync(const ASocket: ISocketIOContext; const aRoom, aEventName, aJSONArray: string; aMaxwait_ms: Integer = INFINITE): ISuperObject;
procedure WriteSocketIOResult(const ASocket: ISocketIOContext; aRequestMsgNr: Integer; const aRoom, aData: string);
procedure ProcessSocketIO_XHR(const aGUID: string; const aStrmRequest, aStrmResponse: TStream);
@ -172,9 +195,9 @@ type
procedure ProcessSocketIORequest(const ASocket: ISocketIOContext; const aData: string);overload;
procedure ProcessSocketIORequest(const AContext: TIdContext; const strmRequest: TMemoryStream);overload;
procedure ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string);virtual;
procedure ProcessCloseChannel(const ASocket: TSocketIOContext; const aChannel: string);virtual;
function WriteString(const ASocket: TSocketIOContext; const aText: string): string; virtual;
procedure ProcessHeatbeatRequest(const ASocket: ISocketIOContext; const aText: string);virtual;
procedure ProcessCloseChannel(const ASocket: ISocketIOContext; const aChannel: string);virtual;
function WriteString(const ASocket: ISocketIOContext; const aText: string): string; virtual;
public
procedure AfterConstruction; override;
destructor Destroy; override;
@ -186,10 +209,10 @@ type
function GetSocketIOContext(const AContext: TIdContext): ISocketIOContext;
// procedure EmitEventToAll(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil);
function NewConnection(const AContext: TIdContext): TSocketIOContext;overload;
function NewConnection(const aGUID, aPeerIP: string): TSocketIOContext;overload;
function NewConnection(const AContext: TIdContext): ISocketIOContext;overload;
function NewConnection(const aGUID, aPeerIP: string): ISocketIOContext;overload;
procedure FreeConnection(const AContext: TIdContext);overload;
procedure FreeConnection(const ASocket: TSocketIOContext);overload;
procedure FreeConnection(const ASocket: ISocketIOContext);overload;
property OnSocketIOMsg : TSocketIOMsg read FOnSocketIOMsg write FOnSocketIOMsg;
property OnSocketIOJson : TSocketIOMsgJSON read FOnSocketIOJson write FOnSocketIOJson;
@ -206,6 +229,7 @@ type
public
procedure Send(const aMessage: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);
procedure Emit(const aEventName: string; const aData: ISuperObject; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
function EmitSync(const aEventName: string; const aData: ISuperObject; aMaxwait_ms: Integer = INFINITE): ISuperobject;
//procedure Emit(const aEventName: string; const aData: string; const aCallback: TSocketIOMsgJSON = nil; const aOnError: TSocketIOError = nil);overload;
end;
@ -219,8 +243,8 @@ begin
inherited;
FLock := TCriticalSection.Create;
FConnections := TObjectDictionary<TIdContext,TSocketIOContext>.Create([doOwnsValues]);
FConnectionsGUID := TObjectDictionary<string,TSocketIOContext>.Create([doOwnsValues]);
FConnections := TObjectDictionary<TIdContext,ISocketIOContext>.Create([]);
FConnectionsGUID := TObjectDictionary<string,ISocketIOContext>.Create([]);
FOnConnectionList := TSocketIONotifyList.Create;
FOnDisconnectList := TSocketIONotifyList.Create;
@ -229,11 +253,12 @@ begin
FSocketIOEventCallback := TDictionary<Integer,TSocketIOCallback>.Create;
FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
FSocketIOErrorRef := TDictionary<Integer,TSocketIOError>.Create;
//FSocketIOEventPromises := TDictionary<Integer,TSocketIOPromise>.Create;
end;
function TIdBaseSocketIOHandling.ConnectionCount: Integer;
var
context: TSocketIOContext;
context: ISocketIOContext;
begin
Lock;
try
@ -263,6 +288,7 @@ begin
FSocketIOEventCallback.Free;
FSocketIOEventCallbackRef.Free;
FSocketIOErrorRef.Free;
//FSocketIOEventPromises.Free;
FOnEventList.Free;
FOnConnectionList.Free;
@ -271,13 +297,13 @@ begin
while FConnections.Count > 0 do
for idcontext in FConnections.Keys do
begin
FConnections.Items[idcontext]._Release;
//FConnections.Items[idcontext]._Release;
FConnections.ExtractPair(idcontext);
end;
while FConnectionsGUID.Count > 0 do
for squid in FConnectionsGUID.Keys do
begin
FConnectionsGUID.Items[squid]._Release;
//FConnectionsGUID.Items[squid]._Release;
FConnectionsGUID.ExtractPair(squid);
end;
FConnections.Free;
@ -291,7 +317,7 @@ end;
procedure TIdBaseSocketIOHandling.EnumerateSockets(
const aEachSocketCallback: TSocketIONotify);
var socket: TSocketIOContext;
var socket: ISocketIOContext;
begin
Assert(Assigned(aEachSocketCallback));
Lock;
@ -314,26 +340,31 @@ begin
end;
procedure TIdBaseSocketIOHandling.FreeConnection(
const ASocket: TSocketIOContext);
const ASocket: ISocketIOContext);
var squid: string;
idcontext: TIdContext;
errorref: TSocketIOError;
i: Integer;
begin
if ASocket = nil then Exit;
Lock;
try
ASocket.Context := nil;
ASocket.FIOHandler := nil;
ASocket.FClient := nil;
ASocket.FHandling := nil;
ASocket.FGUID := '';
ASocket.FPeerIP := '';
with TSocketIOContext(ASocket) do
begin
Context := nil;
FIOHandler := nil;
FClient := nil;
FHandling := nil;
FGUID := '';
FPeerIP := '';
end;
for idcontext in FConnections.Keys do
begin
if FConnections.Items[idcontext] = ASocket then
begin
FConnections.ExtractPair(idcontext);
ASocket._Release;
//ASocket._Release;
end;
end;
for squid in FConnectionsGUID.Keys do
@ -341,7 +372,22 @@ begin
if FConnectionsGUID.Items[squid] = ASocket then
begin
FConnectionsGUID.ExtractPair(squid);
ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue
//ASocket._Release; //use reference count? otherwise AV when used in TThread.Queue
end;
end;
//pending callbacks? then exceute error messages
for i in TSocketIOContext(ASocket).FPendingMessages do
begin
FSocketIOEventCallback.Remove(i);
FSocketIOEventCallbackRef.Remove(i);
if FSocketIOErrorRef.TryGetValue(i, errorref) then
begin
FSocketIOErrorRef.Remove(i);
try
errorref(ASocket, ESocketIODisconnect.ClassName, 'Connection disconnected');
except
end;
end;
end;
finally
@ -351,7 +397,7 @@ end;
function TIdBaseSocketIOHandling.GetSocketIOContext(const AContext: TIdContext): ISocketIOContext;
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
Result := nil;
Lock;
@ -365,7 +411,7 @@ end;
procedure TIdBaseSocketIOHandling.FreeConnection(const AContext: TIdContext);
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
Lock;
try
@ -384,18 +430,20 @@ begin
end;
function TIdBaseSocketIOHandling.NewConnection(
const AGUID, aPeerIP: string): TSocketIOContext;
const AGUID, aPeerIP: string): ISocketIOContext;
var
socket: TSocketIOContext;
begin
Lock;
try
if not FConnectionsGUID.TryGetValue(AGUID, socket) then
if not FConnectionsGUID.TryGetValue(AGUID, Result) then
begin
socket := TSocketIOContext.Create;
socket._AddRef;
FConnectionsGUID.Add(AGUID, socket);
end;
end
else
socket := TSocketIOContext(Result);
//socket.Context := AContext;
socket.FGUID := AGUID;
if aPeerIP <> '' then
@ -409,18 +457,20 @@ begin
end;
end;
function TIdBaseSocketIOHandling.NewConnection(const AContext: TIdContext): TSocketIOContext;
function TIdBaseSocketIOHandling.NewConnection(const AContext: TIdContext): ISocketIOContext;
var
socket: TSocketIOContext;
begin
Lock;
try
if not FConnections.TryGetValue(AContext, socket) then
if not FConnections.TryGetValue(AContext, Result) then
begin
socket := TSocketIOContext.Create;
socket._AddRef;
FConnections.Add(AContext, socket);
end;
end
else
socket := TSocketIOContext(Result);
socket.Context := AContext;
socket.FHandling := Self;
socket.FConnectSend := False;
@ -432,7 +482,7 @@ begin
end;
procedure TIdBaseSocketIOHandling.OnConnection(const aCallback: TSocketIONotify);
var context: TSocketIOContext;
var context: ISocketIOContext;
begin
FOnConnectionList.Add(aCallback);
@ -465,16 +515,16 @@ begin
end;
procedure TIdBaseSocketIOHandling.ProcessCloseChannel(
const ASocket: TSocketIOContext; const aChannel: string);
const ASocket: ISocketIOContext; const aChannel: string);
begin
if aChannel <> '' then
//todo: close channel
else if (ASocket.FContext <> nil) then
ASocket.FContext.Connection.Disconnect;
else if (TSocketIOContext(ASocket).FContext <> nil) then
TSocketIOContext(ASocket).FContext.Connection.Disconnect;
end;
procedure TIdBaseSocketIOHandling.ProcessEvent(
const AContext: TSocketIOContext; const aText: string; aMsgNr: Integer;
const AContext: ISocketIOContext; const aText: string; aMsgNr: Integer;
aHasCallback: Boolean);
var
json: ISuperObject;
@ -527,14 +577,17 @@ begin
end;
end;
procedure TIdBaseSocketIOHandling.ProcessHeatbeatRequest(const ASocket: TSocketIOContext; const aText: string);
procedure TIdBaseSocketIOHandling.ProcessHeatbeatRequest(const ASocket: ISocketIOContext; const aText: string);
begin
if ASocket.PingSend then
ASocket.PingSend := False //reset, client responded with 2:: heartbeat too
else
with TSocketIOContext(ASocket) do
begin
ASocket.PingSend := True; //stop infinite ping response loops
WriteString(ASocket, aText); //write same connect back, e.g. 2::
if PingSend then
PingSend := False //reset, client responded with 2:: heartbeat too
else
begin
PingSend := True; //stop infinite ping response loops
WriteString(ASocket, aText); //write same connect back, e.g. 2::
end;
end;
end;
@ -562,7 +615,7 @@ end;
procedure TIdBaseSocketIOHandling.ProcessSocketIORequest(const AContext: TIdContext;
const strmRequest: TMemoryStream);
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
if not FConnections.TryGetValue(AContext, socket) then
begin
@ -574,7 +627,7 @@ end;
procedure TIdBaseSocketIOHandling.ProcessSocketIO_XHR(const aGUID: string; // const AContext: TIdContext;
const aStrmRequest, aStrmResponse: TStream);
var
socket: TSocketIOContext;
socket: ISocketIOContext;
sdata: string;
i, ilength: Integer;
bytes, singlemsg: TBytes;
@ -584,7 +637,7 @@ begin
then
socket := NewConnection(aGUID, '');
if not socket.FConnectSend then
if not TSocketIOContext(socket).FConnectSend then
WriteConnect(socket);
if (aStrmRequest <> nil) and
@ -633,12 +686,12 @@ begin
if aStrmResponse = nil then Exit;
//wait till some response data to be send (long polling)
sdata := socket.WaitForQueue(5 * 1000);
sdata := TSocketIOContext(socket).WaitForQueue(5 * 1000);
if sdata = '' then
begin
//no data? then send ping
WritePing(socket);
sdata := socket.WaitForQueue(0);
sdata := TSocketIOContext(socket).WaitForQueue(0);
end;
//send response back
if sdata <> '' then
@ -835,6 +888,7 @@ begin
imsg := StrToIntDef(smsg, 0);
sData := Copy(sdata, Pos('+', sData)+1, Length(sData));
TSocketIOContext(ASocket).FPendingMessages.Remove(imsg);
if FSocketIOErrorRef.TryGetValue(imsg, errorref) then
begin
FSocketIOErrorRef.Remove(imsg);
@ -849,6 +903,9 @@ begin
errorref(ASocket, error.S['Type'], error.S['Message'])
else
errorref(ASocket, 'Unknown', sdata);
FSocketIOEventCallback.Remove(imsg);
FSocketIOEventCallbackRef.Remove(imsg);
Exit;
end;
end;
@ -881,7 +938,7 @@ begin
end;
function TIdBaseSocketIOHandling.WriteConnect(
const ASocket: TSocketIOContext): string;
const ASocket: ISocketIOContext): string;
var
notify: TSocketIONotify;
begin
@ -894,9 +951,9 @@ begin
FConnections.Add(nil, ASocket); //clients do not have a TIdContext?
end;
if not ASocket.ConnectSend then
if not TSocketIOContext(ASocket).ConnectSend then
begin
ASocket.ConnectSend := True;
TSocketIOContext(ASocket).ConnectSend := True;
Result := WriteString(ASocket, '1::');
end;
finally
@ -908,7 +965,7 @@ begin
end;
procedure TIdBaseSocketIOHandling.WriteDisConnect(
const ASocket: TSocketIOContext);
const ASocket: ISocketIOContext);
var
notify: TSocketIONotify;
begin
@ -930,13 +987,13 @@ begin
end;
procedure TIdBaseSocketIOHandling.WritePing(
const ASocket: TSocketIOContext);
const ASocket: ISocketIOContext);
begin
ASocket.PingSend := True;
TSocketIOContext(ASocket).PingSend := True;
WriteString(ASocket, '2::') //heartbeat: https://github.com/LearnBoost/socket.io-spec
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOEvent(const ASocket: TSocketIOContext; const aRoom, aEventName,
procedure TIdBaseSocketIOHandling.WriteSocketIOEvent(const ASocket: ISocketIOContext; const aRoom, aEventName,
aJSONArray: string; aCallback: TSocketIOCallback; const aOnError: TSocketIOError);
var
sresult: string;
@ -950,11 +1007,12 @@ begin
[inr, aRoom, aEventName, aJSONArray])
else
begin
if FSocketIOEventCallback = nil then
FSocketIOEventCallback := TDictionary<Integer,TSocketIOCallback>.Create;
//if FSocketIOEventCallback = nil then
// FSocketIOEventCallback := TDictionary<Integer,TSocketIOCallback>.Create;
sresult := Format('5:%d+:%s:{"name":"%s", "args":%s}',
[inr, aRoom, aEventName, aJSONArray]);
FSocketIOEventCallback.Add(inr, aCallback);
TSocketIOContext(ASocket).FPendingMessages.Add(inr);
if Assigned(aOnError) then
FSocketIOErrorRef.Add(inr, aOnError);
@ -962,7 +1020,7 @@ begin
WriteString(ASocket, sresult);
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOEventRef(const ASocket: TSocketIOContext;
procedure TIdBaseSocketIOHandling.WriteSocketIOEventRef(const ASocket: ISocketIOContext;
const aRoom, aEventName, aJSONArray: string; aCallback: TSocketIOCallbackRef; const aOnError: TSocketIOError);
var
sresult: string;
@ -976,11 +1034,12 @@ begin
[inr, aRoom, aEventName, aJSONArray])
else
begin
if FSocketIOEventCallbackRef = nil then
FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
//if FSocketIOEventCallbackRef = nil then
// FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
sresult := Format('5:%d+:%s:{"name":"%s", "args":%s}',
[inr, aRoom, aEventName, aJSONArray]);
FSocketIOEventCallbackRef.Add(inr, aCallback);
TSocketIOContext(ASocket).FPendingMessages.Add(inr);
if Assigned(aOnError) then
FSocketIOErrorRef.Add(inr, aOnError);
@ -988,7 +1047,92 @@ begin
WriteString(ASocket, sresult);
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOJSON(const ASocket: TSocketIOContext;
function TIdBaseSocketIOHandling.WriteSocketIOEventSync(const ASocket: ISocketIOContext; const aRoom, aEventName,
aJSONArray: string; aMaxwait_ms: Integer = INFINITE): ISuperObject;
var
sresult: string;
inr: Integer;
promise: TSocketIOPromise;
begin
Result := nil;
if (ASocket = nil) or (ASocket.IsDisconnected) then
raise ESocketIOException.CreateFmt('Socket is not connected, cannot send "%s" request', [aEventName]);
//see TROIndyHTTPWebsocketServer.ProcessSocketIORequest too
//5:1+:/chat:{"name":"GetLocations","args":[""]}
inr := InterlockedIncrement(FSocketIOMsgNr);
// if FSocketIOEventPromises = nil then
// FSocketIOEventPromises := TDictionary<Integer,TSocketIOPromise>.Create;
promise := TSocketIOPromise.Create;
// FSocketIOEventPromises.Add(inr, promise);
sresult := Format('5:%d+:%s:{"name":"%s", "args":%s}',
[inr, aRoom, aEventName, aJSONArray]);
Lock;
try
FSocketIOEventCallbackRef.Add(inr,
procedure(const aData: string)
begin
promise.Success := True;
promise.Error := nil;
promise.Data := SO(aData);
promise.Done := True;
promise.Event.SetEvent;
end);
FSocketIOErrorRef.Add(inr,
procedure(const ASocket: ISocketIOContext; const aErrorClass, aErrorMessage: string)
begin
promise.Success := False;
promise.Error := ESocketIOException.Create(aErrorClass + ': ' + aErrorMessage);
promise.Data := nil;
promise.Done := True;
promise.Event.SetEvent;
end);
TSocketIOContext(ASocket).FPendingMessages.Add(inr);
finally
Unlock;
end;
try
try
if ASocket.IsDisconnected then
raise ESocketIOException.CreateFmt('Socket is disconnected, cannot send "%s" request', [aEventName])
else
WriteString(ASocket, sresult);
//wait for callback
if promise.Event.WaitFor(aMaxwait_ms) = wrSignaled then
Assert(promise.Done)
else
//timeout
raise ESocketIOTimeout.CreateFmt('No response received for "%s" request', [aEventName]);
except
Lock;
try
FSocketIOEventCallbackRef.Remove(inr);
FSocketIOErrorRef.Remove(inr);
TSocketIOContext(ASocket).FPendingMessages.Remove(inr);
finally
UnLock;
end;
raise;
end;
Result := promise.Data;
if promise.Error <> nil then
begin
Assert(not promise.Success);
raise promise.Error;
end
else
Assert(promise.Success);
finally
promise.Free;
end;
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOJSON(const ASocket: ISocketIOContext;
const aRoom, aJSON: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
var
sresult: string;
@ -1002,11 +1146,12 @@ begin
sresult := Format('4:%d:%s:%s', [inr, aRoom, aJSON])
else
begin
if FSocketIOEventCallbackRef = nil then
FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
//if FSocketIOEventCallbackRef = nil then
// FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
sresult := Format('4:%d+:%s:%s',
[inr, aRoom, aJSON]);
FSocketIOEventCallbackRef.Add(inr, aCallback);
TSocketIOContext(ASocket).FPendingMessages.Add(inr);
if Assigned(aOnError) then
FSocketIOErrorRef.Add(inr, aOnError);
@ -1015,7 +1160,7 @@ begin
WriteString(ASocket, sresult);
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOMsg(const ASocket: TSocketIOContext;
procedure TIdBaseSocketIOHandling.WriteSocketIOMsg(const ASocket: ISocketIOContext;
const aRoom, aData: string; aCallback: TSocketIOCallbackRef = nil; const aOnError: TSocketIOError = nil);
var
sresult: string;
@ -1029,11 +1174,12 @@ begin
sresult := Format('3:%d:%s:%s', [inr, aRoom, aData])
else
begin
if FSocketIOEventCallbackRef = nil then
FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
//if FSocketIOEventCallbackRef = nil then
// FSocketIOEventCallbackRef := TDictionary<Integer,TSocketIOCallbackRef>.Create;
sresult := Format('3:%d+:%s:%s',
[inr, aRoom, aData]);
FSocketIOEventCallbackRef.Add(inr, aCallback);
TSocketIOContext(ASocket).FPendingMessages.Add(inr);
if Assigned(aOnError) then
FSocketIOErrorRef.Add(inr, aOnError);
@ -1042,7 +1188,7 @@ begin
WriteString(ASocket, sresult);
end;
procedure TIdBaseSocketIOHandling.WriteSocketIOResult(const ASocket: TSocketIOContext;
procedure TIdBaseSocketIOHandling.WriteSocketIOResult(const ASocket: ISocketIOContext;
aRequestMsgNr: Integer; const aRoom, aData: string);
var
sresult: string;
@ -1052,33 +1198,36 @@ begin
WriteString(ASocket, sresult);
end;
function TIdBaseSocketIOHandling.WriteString(const ASocket: TSocketIOContext;
function TIdBaseSocketIOHandling.WriteString(const ASocket: ISocketIOContext;
const aText: string): string;
begin
if ASocket = nil then Exit;
ASocket.Lock;
try
if ASocket.FIOHandler = nil then
with TSocketIOContext(ASocket) do
begin
if ASocket.FContext <> nil then
ASocket.FIOHandler := (ASocket.FContext as TIdServerWSContext).IOHandler;
end;
if FIOHandler = nil then
begin
if FContext <> nil then
FIOHandler := (FContext as TIdServerWSContext).IOHandler;
end;
if (ASocket.FIOHandler <> nil) then
begin
//Assert(ASocket.FIOHandler.IsWebsocket);
// if DebugHook <> 0 then
// Windows.OutputDebugString(PChar('Send: ' + aText));
ASocket.FIOHandler.Write(aText);
end
else if ASocket.GUID <> '' then
begin
ASocket.QueueData(aText);
Result := aText; //for xhr-polling the data must be send using responseinfo instead of direct write!
end
else //disconnected
Assert(False, 'disconnected');
if (FIOHandler <> nil) then
begin
//Assert(ASocket.FIOHandler.IsWebsocket);
// if DebugHook <> 0 then
// Windows.OutputDebugString(PChar('Send: ' + aText));
FIOHandler.Write(aText);
end
else if GUID <> '' then
begin
QueueData(aText);
Result := aText; //for xhr-polling the data must be send using responseinfo instead of direct write!
end;
//else //disconnected
// Assert(False, 'disconnected');
end;
finally
ASocket.UnLock;
end;
@ -1087,7 +1236,7 @@ end;
{ TSocketIOCallbackObj }
constructor TSocketIOCallbackObj.Create(aHandling: TIdBaseSocketIOHandling;
aSocket: TSocketIOContext; aMsgNr: Integer);
aSocket: ISocketIOContext; aMsgNr: Integer);
begin
FHandling := aHandling;
FSocket := aSocket;
@ -1113,6 +1262,7 @@ begin
inherited;
FLock := TCriticalSection.Create;
FQueue := TList<string>.Create;
FPendingMessages := TList<Int64>.Create;
end;
constructor TSocketIOContext.Create(aClient: TIdHTTP);
@ -1134,6 +1284,7 @@ begin
FLock.Free;
if OwnsCustomData then
FCustomData.Free;
FPendingMessages.Free;
inherited;
end;
@ -1336,7 +1487,7 @@ end;
function TIdBaseSocketIOHandling.WriteConnect(const AContext: TIdContext): string;
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
//if not FConnections.TryGetValue(AContext, socket) then
socket := NewConnection(AContext);
@ -1345,7 +1496,7 @@ end;
procedure TIdBaseSocketIOHandling.WriteDisConnect(const AContext: TIdContext);
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
if not FConnections.TryGetValue(AContext, socket) then
socket := NewConnection(AContext);
@ -1354,7 +1505,7 @@ end;
procedure TIdBaseSocketIOHandling.WritePing(const AContext: TIdContext);
var
socket: TSocketIOContext;
socket: ISocketIOContext;
begin
if not FConnections.TryGetValue(AContext, socket) then
socket := NewConnection(AContext);
@ -1366,7 +1517,7 @@ end;
procedure TIdSocketIOHandling.Emit(const aEventName: string;
const aData: ISuperObject; const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
var
context: TSocketIOContext;
context: ISocketIOContext;
jsonarray: string;
isendcount: Integer;
begin
@ -1384,7 +1535,7 @@ begin
try
isendcount := 0;
//note: is single connection?
//note: client has single connection?
for context in FConnections.Values do
begin
if context.IsDisconnected then Continue;
@ -1421,10 +1572,53 @@ begin
end;
end;
function TIdSocketIOHandling.EmitSync(const aEventName: string; const aData: ISuperObject; aMaxwait_ms: Integer = INFINITE): ISuperobject;
var
firstcontext, context: ISocketIOContext;
jsonarray: string;
isendcount: Integer;
begin
if aData <> nil then
begin
if aData.IsType(stArray) then
jsonarray := aData.AsString
else if aData.IsType(stString) then
jsonarray := '["' + aData.AsString + '"]'
else
jsonarray := '[' + aData.AsString + ']';
end;
Lock;
try
isendcount := 0;
//note: client has single connection?
for context in FConnections.Values do
begin
if context.IsDisconnected then Continue;
firstcontext := context;
Inc(isendcount);
end;
for context in FConnectionsGUID.Values do
begin
if context.IsDisconnected then Continue;
firstcontext := context;
Inc(isendcount);
end;
//todo: use multiple promises?
if isendcount > 1 then
raise EIdSocketIoUnhandledMessage.Create('Cannot emit synchronized to more than one connection!');
finally
UnLock;
end;
Result := WriteSocketIOEventSync(firstcontext, ''{no room}, aEventName, jsonarray, aMaxwait_ms);
end;
procedure TIdSocketIOHandling.Send(const aMessage: string;
const aCallback: TSocketIOMsgJSON; const aOnError: TSocketIOError);
var
context: TSocketIOContext;
context: ISocketIOContext;
isendcount: Integer;
begin
Lock;
@ -1468,4 +1662,18 @@ begin
end;
end;
{ TSocketIOPromise }
procedure TSocketIOPromise.AfterConstruction;
begin
inherited;
Event := TEvent.Create();
end;
destructor TSocketIOPromise.Destroy;
begin
Event.Free;
inherited;
end;
end.