[ create a new paste ] login | about

Link: http://codepad.org/TbPq2PWb    [ raw code | fork ]

C++, pasted on Mar 4:
#include "RemoteMessagingSystem.h"
#include "base/message/MessagingSystem.h"

//<begin register remote messages-------------->
//
// Each message class listed below is handed to REMOTE_MESSAGING_REGISTER.
// NOTE(review): the macro is defined outside this file; presumably it makes
// the class known to the serialization layer used by serialize()/deserialize()
// further down -- confirm against RemoteMessagingSystem.h.

/* messages from attack detection */
REMOTE_MESSAGING_REGISTER 	(MessageAggregateIntervalValues)
REMOTE_MESSAGING_REGISTER 	(MessageLastSuspiciousAggregate)
REMOTE_MESSAGING_REGISTER 	(MessageAggregateWatchValues)
REMOTE_MESSAGING_REGISTER 	(MessageAddressDistribution)

/* messages from sampling */
// no messages in sampling

/* messages from utility */
// (none registered here)

//<end register remote messages------------------>

/**
 * Sets up the remote messaging front end and creates the communication
 * back end selected by the configuration.
 *
 * @param system messaging system whose queue receives messages arriving from remote
 * @param info   instance information passed to the configuration lookup and to the back end
 */
RemoteMessagingSystem::RemoteMessagingSystem (MessagingSystem* system, DistackInfo* info)
	: m_messagingSystem( system ), m_commSystem( NULL ), distackInfo( info) {

	// create the communication system of choice

	string remoteMessaging = Configuration::instance(distackInfo)->getRemoteMessaging ();

	// One single chain with exactly one trailing default: previously the
	// "default case" assignments ran unconditionally, which leaked the back
	// end allocated just before and always ended up with RemoteCommSystemNone.
	//
	// NOTE(review): the simulation (OMNET_*) and socket/GIST back ends were
	// probably separated by a preprocessor guard that is not visible in this
	// file -- confirm whether both groups are meant to be compiled together.
	if (remoteMessaging == "OMNET_TCP")
		m_commSystem = new RemoteCommSystemOmnetTcp( distackInfo );
	else if (remoteMessaging == "OMNET_PATH")
		m_commSystem = new RemoteCommSystemOmnetPathbased( distackInfo );
	else if (remoteMessaging == "OMNET_RING")
		m_commSystem = new RemoteCommSystemOmnetRing( distackInfo );
	else if (remoteMessaging == "TCP_SOCKET")
		m_commSystem = new RemoteCommSystemTcpSocket( distackInfo );
	else if (remoteMessaging == "GIST")
		m_commSystem = new RemoteCommSystemGist( distackInfo );
	else
		m_commSystem = new RemoteCommSystemNone( distackInfo ); // default case: no comm

	if (m_commSystem != NULL) {
		// let the back end hand received data back to this object
		m_commSystem->setMessagingSystem (this);
		logging_debug( "running remote messaging system: " + m_commSystem->getName());
	} else {
		logging_warn( "no remote messaging system created. Remote messaging will fail!");
	}
}

/**
 * Releases the communication back end created in the constructor.
 */
RemoteMessagingSystem::~RemoteMessagingSystem ()
{
	// deleting a null pointer is a no-op, so no explicit check is required
	delete m_commSystem;
	m_commSystem = NULL;
}

/**
 * Serializes a message and writes it to the communication back end.
 *
 * Does nothing when msg is NULL or when no back end exists.
 *
 * @param msg     message to transmit
 * @param options send options; the remote flag must be set (asserted)
 */
void RemoteMessagingSystem::sendMessage (Message* const msg, MessagingNode::SEND_OPTIONS options)
{
	if (msg == NULL) return;
	if (m_commSystem == NULL) return;

	assert ((options.remoteMode & MessagingNode::SEND_REMOTE_MODE_REMOTE) == MessagingNode::SEND_REMOTE_MODE_REMOTE);

	// serialize the object into a string

	string data = serialize (msg);

#ifdef _DEBUG
	// debug builds only: round-trip the serialized data and verify that
	// kind and type of the message survive
	Message* testMsg = deserialize (data);

	assert (testMsg != NULL);
	assert (testMsg->getKind() == msg->getKind());
	assert (testMsg->getType() == msg->getType());

	delete testMsg;
#endif // _DEBUG -- without this the send below was compiled in debug builds only

	// prepend the header and send out
	// the string buffer over the network

	DISTACK_REMOTE_DATA	sendData (static_cast<unsigned long>(data.length()));
	memcpy (sendData.data, data.data(), sendData.len);

	m_commSystem->write (sendData, options);
	sendData.destroy	();
}

/**
 * Hands a message that arrived from remote over to the messaging system.
 *
 * The message is flagged as coming from remote and pushed onto the messaging
 * system's queue together with local-async send options.
 * (The misspelled function name is kept as-is because callers use it.)
 *
 * @param msg received message; NULL is ignored
 */
void RemoteMessagingSystem::reveiveMessage (Message* msg)
{
	if (msg == NULL) return;
	assert (m_messagingSystem != NULL);

	// put the message into the MessagingSystem queue
	// it will then be processed asynchronously from the consumer thread

	msg->m_fromRemote = true;

	// the queue mutex is held until the end of this function
	boost::mutex::scoped_lock scopedLock (m_messagingSystem->m_queueMutex);
	MessagingNode::SEND_OPTIONS options = MessagingNode::SEND_OPTIONS::createLocalAsync();
	m_messagingSystem->m_queue.push (MessagingSystem::MESSAGE_OPTIONS_PAIR(msg, options));
}

/**
 * Turns a raw buffer received by the communication back end into a message
 * and forwards it via reveiveMessage().
 *
 * The buffer is destroyed here once its content has been copied.
 *
 * @param data received buffer (data pointer plus length)
 */
void RemoteMessagingSystem::receiveBytes (DISTACK_REMOTE_DATA data)
{
	string stringdata;

	// copy the whole payload in one step instead of appending byte by byte;
	// the guard keeps an empty/unset buffer from being dereferenced
	if (data.data != NULL && data.len > 0)
		stringdata.assign (reinterpret_cast<const char*>(data.data), data.len);

	data.destroy ();

	Message* msg = deserialize (stringdata);

	if (msg != NULL)
		reveiveMessage (msg);
}

/**
 * Serializes a message into a string using a Boost archive.
 *
 * With REMOTE_XML defined an XML archive is used, otherwise a binary archive.
 *
 * @param msg message to serialize
 * @return the archive content as string
 */
string RemoteMessagingSystem::serialize (Message* const msg)
{
	stringstream archiveStream (ios_base::binary | ios_base::out);

	{ // the brackets are important, otherwise the closing xml-tag is not written

// 		Output serialized message into xml file instead of remote message (for debugging purposes):
// 		ofstream archiveStream ("./distack_xml_output", ios_base::binary | ios_base::out);
// 		boost::archive::xml_oarchive	outputArchive (archiveStream);

		// exactly one archive type may be declared; both declarations used
		// to sit in the same scope, which is a redefinition
#ifdef REMOTE_XML
		// NOTE(review): Boost XML archives expect name-value pairs -- confirm
		// that the REMOTE_XML build actually accepts the plain pointer below.
		boost::archive::xml_oarchive outputArchive( archiveStream );
#else
		boost::archive::binary_oarchive outputArchive( archiveStream );
#endif //REMOTE_XML

		outputArchive & msg;
	}

	string data = archiveStream.str();
	return data;
}

/**
 * Rebuilds a message from a string produced by serialize().
 *
 * Uses the same archive flavour as serialize(): XML when REMOTE_XML is
 * defined, binary otherwise.
 *
 * @param data serialized message
 * @return the message read from the archive; the debug check in sendMessage()
 *         deletes it, so the caller is responsible for the object
 */
Message* RemoteMessagingSystem::deserialize (string data)
{
	stringstream archivStream (data, ios_base::binary | ios_base::in);

	// exactly one archive type may be declared; both declarations used to
	// sit in the same scope, which is a redefinition
#ifdef REMOTE_XML
	boost::archive::xml_iarchive inputArchive( archivStream );
#else
	boost::archive::binary_iarchive inputArchive( archivStream );
#endif //REMOTE_XML

	Message* msg = NULL;
	inputArchive >> msg;

	return msg;
}

/**
 * @return the communication back end held by this object (may be NULL);
 *         the destructor deletes it, so callers must not
 */
RemoteCommSystem* RemoteMessagingSystem::getCommSystem ()
{
	return m_commSystem;
}

Create a new paste based on this one
