[ create a new paste ] login | about

Link: http://codepad.org/TbPq2PWb    [ raw code | fork ]

C++, pasted on Mar 4:
#include "RemoteMessagingSystem.h"
#include "base/message/MessagingSystem.h"

//<begin register remote messages-------------->

/* messages from attack detection */
REMOTE_MESSAGING_REGISTER 	(MessageAggregateIntervalValues)
REMOTE_MESSAGING_REGISTER 	(MessageLastAnomalyType)
REMOTE_MESSAGING_REGISTER 	(MessageLastSuspiciousAggregate)
REMOTE_MESSAGING_REGISTER 	(MessageAggregateWatchValues)
REMOTE_MESSAGING_REGISTER 	(MessageAddressDistribution)

/* messages from sampling */
// no messages in sampling

/* messages from utility */
REMOTE_MESSAGING_REGISTER 	(MessageAlert)
REMOTE_MESSAGING_REGISTER 	(MessageTimer)

//<end register remote messages------------------>


RemoteMessagingSystem::RemoteMessagingSystem (MessagingSystem* system, DistackInfo* info)
	: m_messagingSystem( system ), m_commSystem( NULL ), distackInfo( info) {

	//
	// create the communication system of choice
	//

	string remoteMessaging = Configuration::instance(distackInfo)->getRemoteMessaging ();

#ifdef OMNET_SIMULATION
	if (remoteMessaging == "OMNET_TCP")
		m_commSystem = new RemoteCommSystemOmnetTcp( distackInfo );
	else if (remoteMessaging == "OMNET_PATH")
		m_commSystem = new RemoteCommSystemOmnetPathbased( distackInfo );
	else if (remoteMessaging == "OMNET_RING")
		m_commSystem = new RemoteCommSystemOmnetRing( distackInfo );
	else
		m_commSystem = new RemoteCommSystemNone( distackInfo ); // default case: no comm
#else
	if (remoteMessaging == "TCP_SOCKET")
		m_commSystem = new RemoteCommSystemTcpSocket( distackInfo );
	else if (remoteMessaging == "GIST")
		m_commSystem = new RemoteCommSystemGist( distackInfo );
	else
		m_commSystem = new RemoteCommSystemNone( distackInfo ); // default case: no comm
#endif // OMNET_SIMULATION

	if (m_commSystem != NULL) {
		m_commSystem->setMessagingSystem (this);
		logging_debug( "running remote messaging system: " + m_commSystem->getName());
	} else {
		logging_warn( "no remote messaging system created. Remote messaging will fail!");
	}
}

RemoteMessagingSystem::~RemoteMessagingSystem ()
{
	if (m_commSystem != NULL)
		delete m_commSystem;
}

void RemoteMessagingSystem::sendMessage (Message* const msg, MessagingNode::SEND_OPTIONS options)
{
	if (msg == NULL) return;
	if (m_commSystem == NULL) return;

	assert ((options.remoteMode & MessagingNode::SEND_REMOTE_MODE_REMOTE) == MessagingNode::SEND_REMOTE_MODE_REMOTE);

	//
	// serialize the object into a string
	//

	string data = serialize (msg);

#ifdef _DEBUG
	Message* testMsg = deserialize (data);

	assert (testMsg->getKind() == msg->getKind());
	assert (testMsg->getType() == msg->getType());

	delete testMsg;
#endif

	//
	// prepend the header and send out 
	// the string buffer over the network
	//

	DISTACK_REMOTE_DATA	sendData ((unsigned long)data.length());
	memcpy (sendData.data, (unsigned char*)data.c_str(), sendData.len);

	m_commSystem->write (sendData, options);
	sendData.destroy	();
}

void RemoteMessagingSystem::reveiveMessage (Message* msg)
{
	if (msg == NULL) return;
	assert (m_messagingSystem != NULL);

	//
	// put the message into the MessagingSystem queue 
	// it will then be processed asyncronously from the consumer thread
	//

	msg->m_fromRemote = true;

	boost::mutex::scoped_lock scopedLock (m_messagingSystem->m_queueMutex);
	MessagingNode::SEND_OPTIONS options = MessagingNode::SEND_OPTIONS::createLocalAsync();
	m_messagingSystem->m_queue.push (MessagingSystem::MESSAGE_OPTIONS_PAIR(msg, options));
}

void RemoteMessagingSystem::receiveBytes (DISTACK_REMOTE_DATA data)
{
	string stringdata = "";

	unsigned char* pnt = data.data;
	for (unsigned long i=0; i<data.len; i++, pnt++)
		stringdata += *pnt;
		
	data.destroy ();
	
	Message* msg = deserialize (stringdata);

	if (msg != NULL)
		reveiveMessage (msg);
}

string RemoteMessagingSystem::serialize (Message* const msg)
{

	stringstream archiveStream (ios_base::binary | ios_base::out);
	{ // the brackets are important, otherwise the closing xml-tag is not written

#ifdef REMOTE_XML
// 		Output serialized message into xml file instead of remote message (for debugging purposes):
// 		ofstream archiveStream ("./distack_xml_output", ios_base::binary | ios_base::out);
// 		boost::archive::xml_oarchive	outputArchive (archiveStream);
	
		boost::archive::xml_oarchive outputArchive( archiveStream );
#else
		boost::archive::binary_oarchive outputArchive( archiveStream );
#endif //REMOTE_XML

	    outputArchive & msg;
	}
	
	string data = archiveStream.str();
	return data;
}

Message* RemoteMessagingSystem::deserialize (string data)
{
	stringstream archivStream (data, ios_base::binary | ios_base::in);
#ifdef REMOTE_XML
	boost::archive::xml_iarchive inputArchive( archivStream );
#else
	boost::archive::binary_iarchive inputArchive( archivStream );
#endif

	Message* msg = NULL;
	inputArchive >> msg;

	return msg;
}

RemoteCommSystem* RemoteMessagingSystem::getCommSystem ()
{
	return m_commSystem;
}


Create a new paste based on this one


Comments: