diff --git a/dll/net.proto b/dll/net.proto index 5b0f627e..aaa2bef3 100644 --- a/dll/net.proto +++ b/dll/net.proto @@ -125,6 +125,20 @@ message Networking_Sockets { bytes data = 5; } +message Networking_Messages { + enum Types { + CONNECTION_NEW = 0; + CONNECTION_ACCEPT = 1; + CONNECTION_END = 2; + DATA = 3; + } + + Types type = 1; + uint32 channel = 2; + uint32 id_from = 3; + bytes data = 5; +} + message Gameserver { uint64 id = 1; bytes game_description = 2; @@ -212,6 +226,7 @@ message Common_Message { Network_Old network_old = 12; Networking_Sockets networking_sockets = 13; Steam_Messages steam_messages = 14; + Networking_Messages networking_messages = 15; } uint32 source_ip = 128; diff --git a/dll/network.cpp b/dll/network.cpp index 2de7de36..e7c5b2d0 100644 --- a/dll/network.cpp +++ b/dll/network.cpp @@ -554,6 +554,11 @@ void Networking::do_callbacks_message(Common_Message *msg) PRINT_DEBUG("has_steam_messages\n"); run_callbacks(CALLBACK_ID_STEAM_MESSAGES, msg); } + + if (msg->has_networking_messages()) { + PRINT_DEBUG("has_networking_messages\n"); + run_callbacks(CALLBACK_ID_NETWORKING_MESSAGES, msg); + } } bool Networking::handle_tcp(Common_Message *msg, struct TCP_Socket &socket) diff --git a/dll/network.h b/dll/network.h index 015115cd..5bb3e621 100644 --- a/dll/network.h +++ b/dll/network.h @@ -56,6 +56,7 @@ enum Callback_Ids { CALLBACK_ID_FRIEND_MESSAGES, CALLBACK_ID_NETWORKING_SOCKETS, CALLBACK_ID_STEAM_MESSAGES, + CALLBACK_ID_NETWORKING_MESSAGES, CALLBACK_IDS_MAX }; diff --git a/dll/steam_networking_messages.h b/dll/steam_networking_messages.h index deadca5a..7d5471bd 100644 --- a/dll/steam_networking_messages.h +++ b/dll/steam_networking_messages.h @@ -17,6 +17,22 @@ #include "base.h" +#define NETWORKING_MESSAGES_TIMEOUT 30.0 + +struct Steam_Message_Connection { + SteamNetworkingIdentity remote_identity; + std::map> data; + + std::list channels; + bool accepted = false; + bool dead = false; + + unsigned id; + unsigned remote_id = 0; + + std::chrono::high_resolution_clock::time_point created = std::chrono::high_resolution_clock::now(); +}; + class Steam_Networking_Messages : public ISteamNetworkingMessages { @@ -26,7 +42,13 @@ public ISteamNetworkingMessages class SteamCallBacks *callbacks; class RunEveryRunCB *run_every_runcb; + std::map connections; + std::list incoming_data; + + unsigned id_counter = 0; + std::chrono::steady_clock::time_point created; public: + static void steam_callback(void *object, Common_Message *msg) { PRINT_DEBUG("steam_networking_messages_callback\n"); @@ -48,11 +70,14 @@ Steam_Networking_Messages(class Settings *settings, class Networking *network, c this->settings = settings; this->network = network; this->run_every_runcb = run_every_runcb; + this->network->setCallback(CALLBACK_ID_NETWORKING_MESSAGES, settings->get_local_steam_id(), &Steam_Networking_Messages::steam_callback, this); this->network->setCallback(CALLBACK_ID_USER_STATUS, settings->get_local_steam_id(), &Steam_Networking_Messages::steam_callback, this); this->run_every_runcb->add(&Steam_Networking_Messages::steam_run_every_runcb, this); this->callback_results = callback_results; this->callbacks = callbacks; + + this->created = std::chrono::steady_clock::now(); } ~Steam_Networking_Messages() @@ -61,6 +86,45 @@ Steam_Networking_Messages(class Settings *settings, class Networking *network, c this->run_every_runcb->remove(&Steam_Networking_Messages::steam_run_every_runcb, this); } +std::map::iterator find_or_create_message_connection(SteamNetworkingIdentity identityRemote, bool incoming, bool restartbroken) +{ + auto conn = connections.find(identityRemote.GetSteamID()); + if (conn == connections.end() || (conn->second.dead && restartbroken)) { + ++id_counter; + struct Steam_Message_Connection con; + con.remote_identity = identityRemote; + con.id = id_counter; + connections[identityRemote.GetSteamID()] = con; + + Common_Message msg; + msg.set_source_id(settings->get_local_steam_id().ConvertToUint64()); + msg.set_dest_id(con.remote_identity.GetSteamID64()); + msg.set_allocated_networking_messages(new Networking_Messages); + if (incoming) { + msg.mutable_networking_messages()->set_type(Networking_Messages::CONNECTION_ACCEPT); + } else { + msg.mutable_networking_messages()->set_type(Networking_Messages::CONNECTION_NEW); + } + msg.mutable_networking_messages()->set_channel(0); + msg.mutable_networking_messages()->set_id_from(con.id); + network->sendTo(&msg, true); + + conn = connections.find(identityRemote.GetSteamID()); + + if (incoming) { + SteamNetworkingMessagesSessionRequest_t data; + data.m_identityRemote = con.remote_identity; + callbacks->addCBResult(data.k_iCallback, &data, sizeof(data)); + } + } + + if (!incoming) { + conn->second.accepted = true; + } + + return conn; +} + /// Sends a message to the specified host. If we don't already have a session with that user, /// a session is implicitly created. There might be some handshaking that needs to happen /// before we can actually begin sending message data. If this handshaking fails and we can't @@ -106,7 +170,57 @@ Steam_Networking_Messages(class Settings *settings, class Networking *network, c EResult SendMessageToUser( const SteamNetworkingIdentity &identityRemote, const void *pubData, uint32 cubData, int nSendFlags, int nRemoteChannel ) { PRINT_DEBUG("Steam_Networking_Messages::SendMessageToUser\n"); - return k_EResultNoConnection; + std::lock_guard lock(global_mutex); + const SteamNetworkingIPAddr *ip = identityRemote.GetIPAddr(); + bool reliable = false; + if (nSendFlags & k_nSteamNetworkingSend_Reliable) { + reliable = true; + } + + bool restart_broken = false; + if (nSendFlags & k_nSteamNetworkingSend_AutoRestartBrokenSession) { + restart_broken = true; + } + + if (identityRemote.m_eType == k_ESteamNetworkingIdentityType_SteamID) { + PRINT_DEBUG("Steam_Networking_Messages::SendMessageToUser %llu\n", identityRemote.GetSteamID64()); + //steam id identity + } else if (ip) { + PRINT_DEBUG("Steam_Networking_Messages::SendMessageToUser %u:%u ipv4? %u\n", ip->GetIPv4(), ip->m_port, ip->IsIPv4()); + //ip addr + return k_EResultNoConnection; //TODO + } else { + return k_EResultNoConnection; + } + + auto conn = find_or_create_message_connection(identityRemote, false, restart_broken); + if (conn->second.dead) { + return k_EResultNoConnection; + } + + Common_Message msg; + msg.set_source_id(settings->get_local_steam_id().ConvertToUint64()); + msg.set_dest_id(conn->second.remote_identity.GetSteamID64()); + msg.set_allocated_networking_messages(new Networking_Messages); + msg.mutable_networking_messages()->set_type(Networking_Messages::DATA); + msg.mutable_networking_messages()->set_channel(nRemoteChannel); + msg.mutable_networking_messages()->set_id_from(conn->second.id); + msg.mutable_networking_messages()->set_data(pubData, cubData); + + network->sendTo(&msg, reliable); + return k_EResultOK; +} + +static void free_steam_message_data(SteamNetworkingMessage_t *pMsg) +{ + free(pMsg->m_pData); + pMsg->m_pData = NULL; +} + +static void delete_steam_message(SteamNetworkingMessage_t *pMsg) +{ + if (pMsg->m_pfnFreeData) pMsg->m_pfnFreeData(pMsg); + delete pMsg; } /// Reads the next message that has been sent from another user via SendMessageToUser() on the given channel. @@ -116,7 +230,42 @@ EResult SendMessageToUser( const SteamNetworkingIdentity &identityRemote, const int ReceiveMessagesOnChannel( int nLocalChannel, SteamNetworkingMessage_t **ppOutMessages, int nMaxMessages ) { PRINT_DEBUG("Steam_Networking_Messages::ReceiveMessagesOnChannel\n"); - return 0; + std::lock_guard lock(global_mutex); + int message_counter = 0; + + for (auto & conn : connections) { + auto chan = conn.second.data.find(nLocalChannel); + if (chan != conn.second.data.end()) { + while (!chan->second.empty() && message_counter <= nMaxMessages) { + SteamNetworkingMessage_t *pMsg = new SteamNetworkingMessage_t(); //TODO size is wrong + unsigned long size = chan->second.front().size(); + pMsg->m_pData = malloc(size); + pMsg->m_cbSize = size; + memcpy(pMsg->m_pData, chan->second.front().data(), size); + pMsg->m_conn = conn.second.id; + pMsg->m_identityPeer = conn.second.remote_identity; + pMsg->m_nConnUserData = -1; + pMsg->m_usecTimeReceived = std::chrono::duration_cast(std::chrono::steady_clock::now() - created).count(); + //TODO: messagenumber? + // pMsg->m_nMessageNumber = connect_socket->second.packet_receive_counter; + // ++connect_socket->second.packet_receive_counter; + + pMsg->m_pfnFreeData = &free_steam_message_data; + pMsg->m_pfnRelease = &delete_steam_message; + pMsg->m_nChannel = nLocalChannel; + ppOutMessages[message_counter] = pMsg; + ++message_counter; + chan->second.pop(); + } + } + + if (message_counter >= nMaxMessages) { + break; + } + } + + PRINT_DEBUG("Steam_Networking_Messages::ReceiveMessagesOnChannel got %u\n", message_counter); + return message_counter; } /// AcceptSessionWithUser() should only be called in response to a SteamP2PSessionRequest_t callback @@ -129,7 +278,14 @@ int ReceiveMessagesOnChannel( int nLocalChannel, SteamNetworkingMessage_t **ppOu bool AcceptSessionWithUser( const SteamNetworkingIdentity &identityRemote ) { PRINT_DEBUG("Steam_Networking_Messages::AcceptSessionWithUser\n"); - return false; + std::lock_guard lock(global_mutex); + auto conn = connections.find(identityRemote.GetSteamID()); + if (conn == connections.end()) { + return false; + } + + conn->second.accepted = true; + return true; } /// Call this when you're done talking to a user to immediately free up resources under-the-hood. @@ -140,7 +296,23 @@ bool AcceptSessionWithUser( const SteamNetworkingIdentity &identityRemote ) bool CloseSessionWithUser( const SteamNetworkingIdentity &identityRemote ) { PRINT_DEBUG("Steam_Networking_Messages::CloseSessionWithUser\n"); - return false; + std::lock_guard lock(global_mutex); + auto conn = connections.find(identityRemote.GetSteamID()); + if (conn == connections.end()) { + return false; + } + + Common_Message msg; + msg.set_source_id(settings->get_local_steam_id().ConvertToUint64()); + msg.set_dest_id(conn->second.remote_identity.GetSteamID64()); + msg.set_allocated_networking_messages(new Networking_Messages); + msg.mutable_networking_messages()->set_type(Networking_Messages::CONNECTION_END); + msg.mutable_networking_messages()->set_channel(0); + msg.mutable_networking_messages()->set_id_from(conn->second.id); + network->sendTo(&msg, true); + + connections.erase(conn); + return true; } /// Call this when you're done talking to a user on a specific channel. Once all @@ -150,6 +322,7 @@ bool CloseSessionWithUser( const SteamNetworkingIdentity &identityRemote ) bool CloseChannelWithUser( const SteamNetworkingIdentity &identityRemote, int nLocalChannel ) { PRINT_DEBUG("Steam_Networking_Messages::CloseChannelWithUser\n"); + std::lock_guard lock(global_mutex); return false; } @@ -165,12 +338,62 @@ bool CloseChannelWithUser( const SteamNetworkingIdentity &identityRemote, int nL ESteamNetworkingConnectionState GetSessionConnectionInfo( const SteamNetworkingIdentity &identityRemote, SteamNetConnectionInfo_t *pConnectionInfo, SteamNetworkingQuickConnectionStatus *pQuickStatus ) { PRINT_DEBUG("Steam_Networking_Messages::GetSessionConnectionInfo\n"); - return k_ESteamNetworkingConnectionState_None; + std::lock_guard lock(global_mutex); + auto conn = connections.find(identityRemote.GetSteamID()); + if (conn == connections.end()) { + return k_ESteamNetworkingConnectionState_None; + } + + if (pConnectionInfo) { + //TODO + } + + if (pQuickStatus) { + //TODO + } + + if (conn->second.remote_id == 0 || !conn->second.accepted) { + return k_ESteamNetworkingConnectionState_Connecting; + } + + if (conn->second.dead) { + return k_ESteamNetworkingConnectionState_ClosedByPeer; + } + + return k_ESteamNetworkingConnectionState_Connected; } +void end_connection(CSteamID steam_id) +{ + auto conn = connections.find(steam_id); + if (conn != connections.end()) { + conn->second.dead = true; + } +} void RunCallbacks() { + auto msg = std::begin(incoming_data); + while (msg != std::end(incoming_data)) { + CSteamID source_id((uint64)msg->source_id()); + + auto conn = connections.find(source_id); + if (conn != connections.end()) { + if (conn->second.remote_id == msg->networking_messages().id_from()) + conn->second.data[msg->networking_messages().channel()].push(msg->networking_messages().data()); + } + + msg = incoming_data.erase(msg); + } + + auto conn = std::begin(connections); + while (conn != std::end(connections)) { + if (!conn->second.accepted && check_timedout(conn->second.created, NETWORKING_MESSAGES_TIMEOUT)) { + conn = connections.erase(conn); + } else { + ++conn; + } + } } void Callback(Common_Message *msg) @@ -181,7 +404,33 @@ void Callback(Common_Message *msg) } if (msg->low_level().type() == Low_Level::DISCONNECT) { + end_connection((uint64)msg->source_id()); + } + } + if (msg->has_networking_messages()) { + PRINT_DEBUG("Steam_Networking_Messages: got network socket msg %u\n", msg->networking_messages().type()); + if (msg->networking_messages().type() == Networking_Messages::CONNECTION_NEW) { + SteamNetworkingIdentity identity; + identity.SetSteamID64(msg->source_id()); + auto conn = find_or_create_message_connection(identity, true, false); + conn->second.remote_id = msg->networking_messages().id_from(); + conn->second.dead = false; + } + + if (msg->networking_messages().type() == Networking_Messages::CONNECTION_ACCEPT) { + auto conn = connections.find((uint64)msg->source_id()); + if (conn != connections.end()) { + conn->second.remote_id = msg->networking_messages().id_from(); + } + } + + if (msg->networking_messages().type() == Networking_Messages::CONNECTION_END) { + end_connection((uint64)msg->source_id()); + } + + if (msg->networking_messages().type() == Networking_Messages::DATA) { + incoming_data.push_back(Common_Message(*msg)); } } }