Moved events handling to different method. Created helper methods for connecting and disconnecting.

Shard now dont send if connection is resumed just to send it back.
[ci skip]
This commit is contained in:
Szymon Uglis 2020-06-15 22:13:59 +02:00
parent 3202ea46c5
commit f7dc1a9b01

View file

@ -82,6 +82,7 @@ class Shard implements Disposable {
int _sequence = 0;
late Timer _heartbeatTimer;
bool connected = false;
bool resume = false;
late SendPort sendPort;
@ -202,15 +203,36 @@ class Shard implements Disposable {
break;
case 4007:
case 4009:
Future.delayed(const Duration(seconds: 2), () => this.sendPort.send({ "cmd" : "RECONNECT" }));
_reconnect();
break;
default:
Future.delayed(const Duration(seconds: 3), () => this.sendPort.send({ "cmd" : "CONNECT" }));
_connect();
break;
}
}
// Connects to gateway
void _connect() {
this.resume = false;
Future.delayed(const Duration(seconds: 2), () => this.sendPort.send({ "cmd" : "CONNECT"}))
.then((_) => manager._logger.info("Connecting to gateway!"));
}
// Reconnects to gateway
void _reconnect() {
this.resume = true;
Future.delayed(const Duration(seconds: 1), () => this.sendPort.send({ "cmd" : "CONNECT"}))
.then((value) => manager._logger.info("Resuming connection to gateway!"));
}
Future<void> _handle(dynamic data) async {
if(data["cmd"] == "CONNECT_ACK") {
manager._logger.info("Shard $id connected to gateway!");
return;
}
if(data["cmd"] == "ERROR" || data["cmd"] == "DISCONNECTED") {
_handleError(data);
}
@ -220,7 +242,6 @@ class Shard implements Disposable {
}
final msg = data["jsonData"] as Map<String, dynamic>;
final resume = data["resume"] as bool;
if (msg["op"] == OPCodes.dispatch && manager._ws._client._options.ignoredEvents.contains(msg["t"] as String)) {
return;
@ -229,8 +250,12 @@ class Shard implements Disposable {
if (msg["s"] != null) {
this._sequence = msg["s"] as int;
}
switch (msg["op"] as int) {
await _dispatch(msg);
}
Future<void> _dispatch(Map<String, dynamic> rawPayload) async {
switch (rawPayload["op"] as int) {
case OPCodes.heartbeatAck:
this._heartbeatAckReceived = true;
this._gatewaylatency = DateTime.now().difference(this._lastHeartbeatSent);
@ -265,31 +290,31 @@ class Shard implements Disposable {
}
this._heartbeatTimer = Timer.periodic(
Duration(milliseconds: msg["d"]["heartbeat_interval"] as int), (Timer t) => this._heartbeat());
Duration(milliseconds: rawPayload["d"]["heartbeat_interval"] as int), (Timer t) => this._heartbeat());
break;
case OPCodes.invalidSession:
manager._logger.severe("Invalid session on shard $id. ${(msg["d"] as bool) ? "Resuming..." : "Reconnecting..."}");
manager._logger.severe("Invalid session on shard $id. ${(rawPayload["d"] as bool) ? "Resuming..." : "Reconnecting..."}");
_heartbeatTimer.cancel();
manager._ws._client._events.onDisconnect.add(DisconnectEvent._new(this, DisconnectEventReason.invalidSession));
if (msg["d"] as bool) {
Future.delayed(const Duration(seconds: 1), () => this.sendPort.send({ "cmd" : "RECONNECT"}));
if (rawPayload["d"] as bool) {
_reconnect();
} else {
Future.delayed(const Duration(seconds: 2), () => this.sendPort.send({ "cmd" : "CONNECT"}));
_connect();
}
break;
case OPCodes.dispatch:
final j = msg["t"] as String;
final j = rawPayload["t"] as String;
switch (j) {
case "READY":
this._sessionId = msg["d"]["session_id"] as String;
manager._ws._client.self = ClientUser._new(msg["d"]["user"] as Map<String, dynamic>, manager._ws._client);
this._sessionId = rawPayload["d"]["session_id"] as String;
manager._ws._client.self = ClientUser._new(rawPayload["d"]["user"] as Map<String, dynamic>, manager._ws._client);
this.connected = true;
manager._logger.info("Shard ${this.id} connected");
manager._logger.info("Shard ${this.id} ready!");
if (!resume) {
await manager._ws.propagateReady();
@ -298,142 +323,142 @@ class Shard implements Disposable {
break;
case "GUILD_MEMBERS_CHUNK":
manager._onMemberChunk.add(MemberChunkEvent._new(msg, manager._ws._client));
manager._onMemberChunk.add(MemberChunkEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_REACTION_REMOVE_ALL":
manager._ws._client._events.onMessageReactionsRemoved.add(MessageReactionsRemovedEvent._new(msg, manager._ws._client));
manager._ws._client._events.onMessageReactionsRemoved.add(MessageReactionsRemovedEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_REACTION_ADD":
MessageReactionAddedEvent._new(msg, manager._ws._client);
MessageReactionAddedEvent._new(rawPayload, manager._ws._client);
break;
case "MESSAGE_REACTION_REMOVE":
MessageReactionRemovedEvent._new(msg, manager._ws._client);
MessageReactionRemovedEvent._new(rawPayload, manager._ws._client);
break;
case "MESSAGE_DELETE_BULK":
manager._ws._client._events.onMessageDeleteBulk.add(MessageDeleteBulkEvent._new(msg, manager._ws._client));
manager._ws._client._events.onMessageDeleteBulk.add(MessageDeleteBulkEvent._new(rawPayload, manager._ws._client));
break;
case "CHANNEL_PINS_UPDATE":
manager._ws._client._events.onChannelPinsUpdate.add(ChannelPinsUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onChannelPinsUpdate.add(ChannelPinsUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "VOICE_STATE_UPDATE":
manager._ws._client._events.onVoiceStateUpdate.add(VoiceStateUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onVoiceStateUpdate.add(VoiceStateUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "VOICE_SERVER_UPDATE":
manager._ws._client._events.onVoiceServerUpdate.add(VoiceServerUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onVoiceServerUpdate.add(VoiceServerUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_EMOJIS_UPDATE":
manager._ws._client._events.onGuildEmojisUpdate.add(GuildEmojisUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildEmojisUpdate.add(GuildEmojisUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_CREATE":
manager._ws._client._events.onMessageReceived.add(MessageReceivedEvent._new(msg, manager._ws._client));
manager._ws._client._events.onMessageReceived.add(MessageReceivedEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_DELETE":
manager._ws._client._events.onMessageDelete.add(MessageDeleteEvent._new(msg, manager._ws._client));
manager._ws._client._events.onMessageDelete.add(MessageDeleteEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_UPDATE":
manager._ws._client._events.onMessageUpdate.add(MessageUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onMessageUpdate.add(MessageUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_CREATE":
final event = GuildCreateEvent._new(msg, manager._ws._client);
final event = GuildCreateEvent._new(rawPayload, manager._ws._client);
this.guilds.add(event.guild.id);
manager._ws._client._events.onGuildCreate.add(event);
break;
case "GUILD_UPDATE":
manager._ws._client._events.onGuildUpdate.add(GuildUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildUpdate.add(GuildUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_DELETE":
manager._ws._client._events.onGuildDelete.add(GuildDeleteEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildDelete.add(GuildDeleteEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_BAN_ADD":
manager._ws._client._events.onGuildBanAdd.add(GuildBanAddEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildBanAdd.add(GuildBanAddEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_BAN_REMOVE":
manager._ws._client._events.onGuildBanRemove.add(GuildBanRemoveEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildBanRemove.add(GuildBanRemoveEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_MEMBER_ADD":
manager._ws._client._events.onGuildMemberAdd.add(GuildMemberAddEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildMemberAdd.add(GuildMemberAddEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_MEMBER_REMOVE":
manager._ws._client._events.onGuildMemberRemove.add(GuildMemberRemoveEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildMemberRemove.add(GuildMemberRemoveEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_MEMBER_UPDATE":
manager._ws._client._events.onGuildMemberUpdate.add(GuildMemberUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onGuildMemberUpdate.add(GuildMemberUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "CHANNEL_CREATE":
manager._ws._client._events.onChannelCreate.add(ChannelCreateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onChannelCreate.add(ChannelCreateEvent._new(rawPayload, manager._ws._client));
break;
case "CHANNEL_UPDATE":
manager._ws._client._events.onChannelUpdate.add(ChannelUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onChannelUpdate.add(ChannelUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "CHANNEL_DELETE":
manager._ws._client._events.onChannelDelete.add(ChannelDeleteEvent._new(msg, manager._ws._client));
manager._ws._client._events.onChannelDelete.add(ChannelDeleteEvent._new(rawPayload, manager._ws._client));
break;
case "TYPING_START":
manager._ws._client._events.onTyping.add(TypingEvent._new(msg, manager._ws._client));
manager._ws._client._events.onTyping.add(TypingEvent._new(rawPayload, manager._ws._client));
break;
case "PRESENCE_UPDATE":
manager._ws._client._events.onPresenceUpdate.add(PresenceUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onPresenceUpdate.add(PresenceUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_ROLE_CREATE":
manager._ws._client._events.onRoleCreate.add(RoleCreateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onRoleCreate.add(RoleCreateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_ROLE_UPDATE":
manager._ws._client._events.onRoleUpdate.add(RoleUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onRoleUpdate.add(RoleUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "GUILD_ROLE_DELETE":
manager._ws._client._events.onRoleDelete.add(RoleDeleteEvent._new(msg, manager._ws._client));
manager._ws._client._events.onRoleDelete.add(RoleDeleteEvent._new(rawPayload, manager._ws._client));
break;
case "USER_UPDATE":
manager._ws._client._events.onUserUpdate.add(UserUpdateEvent._new(msg, manager._ws._client));
manager._ws._client._events.onUserUpdate.add(UserUpdateEvent._new(rawPayload, manager._ws._client));
break;
case "INVITE_CREATE":
manager._ws._client._events.onInviteCreated.add(InviteCreatedEvent._new(msg, manager._ws._client));
manager._ws._client._events.onInviteCreated.add(InviteCreatedEvent._new(rawPayload, manager._ws._client));
break;
case "INVITE_DELETE":
manager._ws._client._events.onInviteDelete.add(InviteDeletedEvent._new(msg, manager._ws._client));
manager._ws._client._events.onInviteDelete.add(InviteDeletedEvent._new(rawPayload, manager._ws._client));
break;
case "MESSAGE_REACTION_REMOVE_EMOJI":
manager._ws._client._events.onMessageReactionRemoveEmoji
.add(MessageReactionRemoveEmojiEvent._new(msg, manager._ws._client));
.add(MessageReactionRemoveEmojiEvent._new(rawPayload, manager._ws._client));
break;
default:
print("UNKNOWN OPCODE: ${jsonEncode(msg)}");
print("UNKNOWN OPCODE: ${jsonEncode(rawPayload)}");
}
break;
}
}
}
@override
Future<void> dispose() async {
@ -456,14 +481,15 @@ Map<String, dynamic> _decodeBytes(dynamic bytes) {
}
/*
Protocol used to comunicate with shard isolate.
Protocol used to communicate with shard isolate.
First message delivered to shardHandler will be init message with gateway uri
* INIT - sent along with map of initial data needed for connection
* SEND - sent along with data to send via websocket
* OK - last operation was completed with success
* DATA - sent along with data received from websocket
* CONNECTED - sent when ws connection is established. additional data can contain if reconnected.
* DISCONNECTED - sent when shard disconnects
* ERROR - sent when error occurs
* CONNECT - sent when ws connection is established. additional data can contain if reconnected.
* SEND - sent along with data to send via websocket
*/
Future<void> _shardHandler(SendPort shardPort) async {
/// Port init
@ -483,11 +509,12 @@ Future<void> _shardHandler(SendPort shardPort) async {
transport_vm.configureWTransportForVM();
// Attempts to connect to ws
Future<void> _connect([bool resume = false]) async {
Future<void> _connect() async {
await transport.WebSocket.connect(gatewayUri).then((ws) {
shardPort.send({ "cmd" : "CONNECT_ACK" });
_socket = ws;
_socketSubscription = _socket!.listen((data) {
shardPort.send({ "cmd" : "DATA", "jsonData" : _decodeBytes(data), "resume" : resume});
shardPort.send({ "cmd" : "DATA", "jsonData" : _decodeBytes(data) });
}, onDone: () async {
shardPort.send({ "cmd" : "DISCONNECTED", "errorCode" : _socket!.closeCode, "errorReason" : _socket!.closeReason });
}, cancelOnError: true, onError: (err) => shardPort.send({ "cmd" : "ERROR", "error": err.toString(), "errorCode" : _socket!.closeCode, "errorReason" : _socket!.closeReason }));
@ -508,14 +535,6 @@ Future<void> _shardHandler(SendPort shardPort) async {
continue;
}
if(cmd == "RECONNECT") {
await _socketSubscription?.cancel();
await _socket?.close(1000);
await _connect(true);
continue;
}
if(cmd == "CONNECT") {
await _socketSubscription?.cancel();
await _socket?.close(1000);