#include "AmB2BMedia.h" #include "AmAudio.h" #include "amci/codecs.h" #include <string.h> #include <strings.h> #include "AmB2BSession.h" #include "AmRtpReceiver.h" #include "sip/msg_logger.h" #include <algorithm> #include <stdexcept> using namespace std; #define TRACE DBG #define UNDEFINED_PAYLOAD (-1) /** class for computing payloads for relay the simpliest way - allow relaying of * all payloads supported by remote party */ static B2BMediaStatistics b2b_stats; static const string zero_ip("0.0.0.0"); static void replaceRtcpAttr(SdpMedia &m, const string& relay_address, int rtcp_port) { for (std::vector<SdpAttribute>::iterator a = m.attributes.begin(); a != m.attributes.end(); ++a) { try { if (a->attribute == "rtcp") { RtcpAddress addr(a->value); addr.setPort(rtcp_port); if (addr.hasAddress()) addr.setAddress(relay_address); a->value = addr.print(); } } catch (const exception &e) { DBG("can't replace RTCP address: %s\n", e.what()); } } } ////////////////////////////////////////////////////////////////////////////////// void B2BMediaStatistics::incCodecWriteUsage(const string &codec_name) { if (codec_name.empty()) return; AmLock lock(mutex); map<string, int>::iterator i = codec_write_usage.find(codec_name); if (i != codec_write_usage.end()) i->second++; else codec_write_usage[codec_name] = 1; } void B2BMediaStatistics::decCodecWriteUsage(const string &codec_name) { if (codec_name.empty()) return; AmLock lock(mutex); map<string, int>::iterator i = codec_write_usage.find(codec_name); if (i != codec_write_usage.end()) { if (i->second > 0) i->second--; } } void B2BMediaStatistics::incCodecReadUsage(const string &codec_name) { if (codec_name.empty()) return; AmLock lock(mutex); map<string, int>::iterator i = codec_read_usage.find(codec_name); if (i != codec_read_usage.end()) i->second++; else codec_read_usage[codec_name] = 1; } void B2BMediaStatistics::decCodecReadUsage(const string &codec_name) { if (codec_name.empty()) return; AmLock lock(mutex); map<string, int>::iterator i = codec_read_usage.find(codec_name); if (i != codec_read_usage.end()) { if (i->second > 0) i->second--; } } B2BMediaStatistics *B2BMediaStatistics::instance() { return &b2b_stats; } void B2BMediaStatistics::reportCodecWriteUsage(string &dst) { if (codec_write_usage.empty()) { dst = "pcma=0"; // to be not empty return; } bool first = true; dst.clear(); AmLock lock(mutex); for (map<string, int>::iterator i = codec_write_usage.begin(); i != codec_write_usage.end(); ++i) { if (first) first = false; else dst += ","; dst += i->first; dst += "="; dst += int2str(i->second); } } void B2BMediaStatistics::reportCodecReadUsage(string &dst) { if (codec_read_usage.empty()) { dst = "pcma=0"; // to be not empty return; } bool first = true; dst.clear(); AmLock lock(mutex); for (map<string, int>::iterator i = codec_read_usage.begin(); i != codec_read_usage.end(); ++i) { if (first) first = false; else dst += ","; dst += i->first; dst += "="; dst += int2str(i->second); } } void B2BMediaStatistics::getReport(const AmArg &args, AmArg &ret) { AmArg write_usage; AmArg read_usage; { // locked area AmLock lock(mutex); for (map<string, int>::iterator i = codec_write_usage.begin(); i != codec_write_usage.end(); ++i) { AmArg avp; avp["codec"] = i->first; avp["count"] = i->second; write_usage.push(avp); } for (map<string, int>::iterator i = codec_read_usage.begin(); i != codec_read_usage.end(); ++i) { AmArg avp; avp["codec"] = i->first; avp["count"] = i->second; read_usage.push(avp); } } ret["write"] = write_usage; ret["read"] = read_usage; } ////////////////////////////////////////////////////////////////////////////////// void AudioStreamData::initialize(AmB2BSession *session) { stream = new AmRtpAudio(session, session->getRtpInterface()); stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno()); stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC()); stream->setRtpRelayFilterRtpDtmf(session->getEnableDtmfRtpFiltering()); if (session->getEnableDtmfRtpDetection()) stream->force_receive_dtmf = true; force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp(); enable_dtmf_transcoding = session->getEnableDtmfTranscoding(); session->getLowFiPLs(lowfi_payloads); stream->setLocalIP(session->localMediaIP()); } AudioStreamData::AudioStreamData(AmB2BSession *session): in(NULL), initialized(false), dtmf_detector(NULL), dtmf_queue(NULL), relay_enabled(false), relay_port(0), outgoing_payload(UNDEFINED_PAYLOAD), incoming_payload(UNDEFINED_PAYLOAD), force_symmetric_rtp(false), enable_dtmf_transcoding(false), muted(false), relay_paused(false), receiving(true) { if (session) initialize(session); else stream = NULL; // not initialized yet } void AudioStreamData::changeSession(AmB2BSession *session) { if (!stream) { // the stream was not created yet TRACE("delayed stream initialization for session %p\n", session); if (session) initialize(session); } else { // the stream is already created if (session) { stream->changeSession(session); /* FIXME: do we want to reinitialize the stream? stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno()); stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC()); force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp(); enable_dtmf_transcoding = session->getEnableDtmfTranscoding(); session->getLowFiPLs(lowfi_payloads); stream->setLocalIP(session->localMediaIP()); ... }*/ } else clear(); // free the stream and other stuff because it can't be used anyway } } void AudioStreamData::clear() { resetStats(); if (in) { //in->close(); //delete in; in = NULL; } if (stream) { delete stream; stream = NULL; } clearDtmfSink(); initialized = false; } void AudioStreamData::stopStreamProcessing() { if (stream) stream->stopReceiving(); } void AudioStreamData::resumeStreamProcessing() { if (stream) stream->resumeReceiving(); } void AudioStreamData::setRelayStream(AmRtpStream *other) { if (!stream) return; if (relay_address.empty()) { DBG("not setting relay for empty relay address\n"); stream->disableRtpRelay(); return; } if (relay_enabled && other) { stream->setRelayStream(other); stream->setRelayPayloads(relay_mask); if (!relay_paused) stream->enableRtpRelay(); stream->setRAddr(relay_address, relay_port, relay_port+1); } else { // nothing to relay or other stream not set stream->disableRtpRelay(); } } void AudioStreamData::setRelayPayloads(const SdpMedia &m, RelayController *ctrl) { ctrl->computeRelayMask(m, relay_enabled, relay_mask); } void AudioStreamData::setRelayDestination(const string& connection_address, int port) { relay_address = connection_address; relay_port = port; } void AudioStreamData::setRelayPaused(bool paused) { if (paused == relay_paused) { DBG("relay already paused for stream [%p], ignoring\n", stream); return; } relay_paused = paused; DBG("relay %spaused, stream [%p]\n", relay_paused?"":"not ", stream); if (NULL != stream) { if (relay_paused) stream->disableRtpRelay(); else stream->enableRtpRelay(); } } void AudioStreamData::clearDtmfSink() { if (dtmf_detector) { delete dtmf_detector; dtmf_detector = NULL; } if (dtmf_queue) { delete dtmf_queue; dtmf_queue = NULL; } } void AudioStreamData::setDtmfSink(AmDtmfSink *dtmf_sink) { // TODO: optimize: clear & create the dtmf_detector only if the dtmf_sink changed clearDtmfSink(); if (dtmf_sink && stream) { dtmf_detector = new AmDtmfDetector(dtmf_sink); dtmf_queue = new AmDtmfEventQueue(dtmf_detector); dtmf_detector->setInbandDetector(AmConfig::DefaultDTMFDetector, stream->getSampleRate()); if(!enable_dtmf_transcoding && lowfi_payloads.size()) { string selected_payload_name = stream->getPayloadName(stream->getPayloadType()); for(vector<SdpPayload>::iterator it = lowfi_payloads.begin(); it != lowfi_payloads.end(); ++it){ DBG("checking %s/%i PL type against %s/%i\n", selected_payload_name.c_str(), stream->getPayloadType(), it->encoding_name.c_str(), it->payload_type); if(selected_payload_name == it->encoding_name) { enable_dtmf_transcoding = true; break; } } } } } bool AudioStreamData::initStream(PlayoutType playout_type, AmSdp &local_sdp, AmSdp &remote_sdp, int media_idx) { resetStats(); if (!stream) { initialized = false; return false; } // TODO: try to init only in case there are some payloads which can't be relayed stream->forceSdpMediaIndex(media_idx); stream->setOnHold(false); // just hack to do correctly mute detection in stream->init if (stream->init(local_sdp, remote_sdp, force_symmetric_rtp) == 0) { stream->setPlayoutType(playout_type); initialized = true; // // do not unmute if muted because of 0.0.0.0 remote IP (the mute flag is set during init) // if (!stream->muted()) stream->setOnHold(muted); } else { initialized = false; DBG("stream initialization failed\n"); // there still can be payloads to be relayed (if all possible payloads are // to be relayed this needs not to be an error) } stream->setOnHold(muted); stream->setReceiving(receiving); return initialized; } void AudioStreamData::sendDtmf(int event, unsigned int duration_ms) { if (stream) stream->sendDtmf(event,duration_ms); } void AudioStreamData::resetStats() { if (outgoing_payload != UNDEFINED_PAYLOAD) { b2b_stats.decCodecWriteUsage(outgoing_payload_name); outgoing_payload = UNDEFINED_PAYLOAD; outgoing_payload_name.clear(); } if (incoming_payload != UNDEFINED_PAYLOAD) { b2b_stats.decCodecReadUsage(incoming_payload_name); incoming_payload = UNDEFINED_PAYLOAD; incoming_payload_name.clear(); } } void AudioStreamData::updateSendStats() { if (!initialized) { resetStats(); return; } int payload = stream->getPayloadType(); if (payload != outgoing_payload) { // payload used to send has changed // decrement usage of previous payload if set if (outgoing_payload != UNDEFINED_PAYLOAD) b2b_stats.decCodecWriteUsage(outgoing_payload_name); if (payload != UNDEFINED_PAYLOAD) { // remember payload name (in lowercase to simulate case insensitivity) outgoing_payload_name = stream->getPayloadName(payload); transform(outgoing_payload_name.begin(), outgoing_payload_name.end(), outgoing_payload_name.begin(), ::tolower); b2b_stats.incCodecWriteUsage(outgoing_payload_name); } else outgoing_payload_name.clear(); outgoing_payload = payload; } } void AudioStreamData::updateRecvStats(AmRtpStream *s) { if (!initialized) { resetStats(); return; } int payload = s->getLastPayload(); if (payload != incoming_payload) { // payload used to send has changed // decrement usage of previous payload if set if (incoming_payload != UNDEFINED_PAYLOAD) b2b_stats.decCodecReadUsage(incoming_payload_name); if (payload != UNDEFINED_PAYLOAD) { // remember payload name (in lowercase to simulate case insensitivity) incoming_payload_name = stream->getPayloadName(payload); transform(incoming_payload_name.begin(), incoming_payload_name.end(), incoming_payload_name.begin(), ::tolower); b2b_stats.incCodecReadUsage(incoming_payload_name); } else incoming_payload_name.clear(); incoming_payload = payload; } } int AudioStreamData::writeStream(unsigned long long ts, unsigned char *buffer, AudioStreamData &src) { if (!initialized) return 0; if (stream->getOnHold()) return 0; // ignore hold streams? unsigned int f_size = stream->getFrameSize(); if (stream->sendIntReached(ts)) { // A leg is ready to send data int sample_rate = stream->getSampleRate(); int got = 0; if (in) got = in->get(ts, buffer, sample_rate, f_size); else { if (!src.isInitialized()) return 0; AmRtpAudio *src_stream = src.getStream(); if (src_stream->checkInterval(ts)) { got = src_stream->get(ts, buffer, sample_rate, f_size); if (got > 0) { updateRecvStats(src_stream); if (dtmf_queue && enable_dtmf_transcoding) { dtmf_queue->putDtmfAudio(buffer, got, ts); } } } } if (got < 0) return -1; if (got > 0) { // we have data to be sent updateSendStats(); return stream->put(ts, buffer, sample_rate, got); } } return 0; } void AudioStreamData::mute(bool set_mute) { DBG("mute(%s) - RTP stream [%p]\n", set_mute?"true":"false", stream); if (stream) { stream->setOnHold(set_mute); if (muted != set_mute) stream->clearRTPTimeout(); } muted = set_mute; } void AudioStreamData::setReceiving(bool r) { DBG("setReceiving(%s) - RTP stream [%p]\n", r?"true":"false", stream); if (stream) { stream->setReceiving(r); } receiving = r; } ////////////////////////////////////////////////////////////////////////////////// AmB2BMedia::RelayStreamPair::RelayStreamPair(AmB2BSession *_a, AmB2BSession *_b) : a(_a, _a ? _a->getRtpInterface() : -1), b(_b, _b ? _b->getRtpInterface() : -1) { } AmB2BMedia::AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b): ref_cnt(0), // everybody who wants to use must add one reference itselves a(_a), b(_b), callgroup(AmSession::getNewId()), have_a_leg_local_sdp(false), have_a_leg_remote_sdp(false), have_b_leg_local_sdp(false), have_b_leg_remote_sdp(false), playout_type(ADAPTIVE_PLAYOUT), //playout_type(SIMPLE_PLAYOUT), a_leg_muted(false), b_leg_muted(false), relay_paused(false), logger(NULL) { } AmB2BMedia::~AmB2BMedia() { if (logger) dec_ref(logger); } void AmB2BMedia::addToMediaProcessor() { addReference(); // AmMediaProcessor's reference AmMediaProcessor::instance()->addSession(this, callgroup); } void AmB2BMedia::addToMediaProcessorUnsafe() { ref_cnt++; // AmMediaProcessor's reference AmMediaProcessor::instance()->addSession(this, callgroup); } void AmB2BMedia::addReference() { mutex.lock(); ref_cnt++; mutex.unlock(); } bool AmB2BMedia::releaseReference() { mutex.lock(); int r = --ref_cnt; mutex.unlock(); if (r==0) { DBG("last reference to AmB2BMedia [%p] cleared, destroying\n", this); delete this; } return (r == 0); } void AmB2BMedia::changeSession(bool a_leg, AmB2BSession *new_session) { AmLock lock(mutex); changeSessionUnsafe(a_leg, new_session); } void AmB2BMedia::changeSessionUnsafe(bool a_leg, AmB2BSession *new_session) { TRACE("changing %s leg session to %p\n", a_leg ? "A" : "B", new_session); if (a_leg) a = new_session; else b = new_session; bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding; // update all streams for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { // stop processing first to avoid unexpected results i->a.stopStreamProcessing(); i->b.stopStreamProcessing(); // replace session if (a_leg) { i->a.changeSession(new_session); } else { i->b.changeSession(new_session); } updateStreamPair(*i); if (i->requiresProcessing()) needs_processing = true; // reset logger (needed if a stream changes) i->setLogger(logger); // return back for processing if needed i->a.resumeStreamProcessing(); i->b.resumeStreamProcessing(); } for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) { AmRtpStream &a = (*j)->a; AmRtpStream &b = (*j)->b; // FIXME: is stop & resume receiving needed here? if (a_leg) a.changeSession(new_session); else b.changeSession(new_session); } if (needs_processing) { if (!isProcessingMedia()) { addToMediaProcessorUnsafe(); } } else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this); TRACE("session changed\n"); } int AmB2BMedia::writeStreams(unsigned long long ts, unsigned char *buffer) { int res = 0; AmLock lock(mutex); for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { if (i->a.writeStream(ts, buffer, i->b) < 0) { res = -1; break; } if (i->b.writeStream(ts, buffer, i->a) < 0) { res = -1; break; } } return res; } void AmB2BMedia::processDtmfEvents() { AmLock lock(mutex); for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { i->a.processDtmfEvents(); i->b.processDtmfEvents(); } if (a) a->processDtmfEvents(); if (b) b->processDtmfEvents(); } void AmB2BMedia::sendDtmf(bool a_leg, int event, unsigned int duration_ms) { AmLock lock(mutex); if(!audio.size()) return; // send the DTMFs using the first available stream if(a_leg) { audio[0].a.sendDtmf(event,duration_ms); } else { audio[0].b.sendDtmf(event,duration_ms); } } void AmB2BMedia::clearAudio(bool a_leg) { TRACE("clear %s leg audio\n", a_leg ? "A" : "B"); AmLock lock(mutex); for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { // remove streams from AmRtpReceiver first! (always both?) i->a.stopStreamProcessing(); i->b.stopStreamProcessing(); if (a_leg) i->a.clear(); else i->b.clear(); } for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) { if ((*j)->a.hasLocalSocket()) AmRtpReceiver::instance()->removeStream((*j)->a.getLocalSocket()); if ((*j)->b.hasLocalSocket()) AmRtpReceiver::instance()->removeStream((*j)->b.getLocalSocket()); } // forget sessions to avoid using them once clearAudio is called changeSessionUnsafe(a_leg, NULL); if (!a && !b) { audio.clear(); // both legs cleared for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) { delete *j; } relay_streams.clear(); } } void AmB2BMedia::clearRTPTimeout() { AmLock lock(mutex); for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { i->a.clearRTPTimeout(); i->b.clearRTPTimeout(); } } bool AmB2BMedia::canRelay(const SdpMedia &m) { return (m.transport == TP_RTPAVP) || (m.transport == TP_RTPSAVP) || (m.transport == TP_UDP) || (m.transport == TP_UDPTL); } void AmB2BMedia::createStreams(const AmSdp &sdp) { AudioStreamIterator astreams = audio.begin(); RelayStreamIterator rstreams = relay_streams.begin(); vector<SdpMedia>::const_iterator m = sdp.media.begin(); int idx = 0; bool create_audio = astreams == audio.end(); bool create_relay = rstreams == relay_streams.end(); for (; m != sdp.media.end(); ++m, ++idx) { // audio streams if (m->type == MT_AUDIO) { if (create_audio) { AudioStreamPair pair(a, b, idx); pair.a.mute(a_leg_muted); pair.b.mute(b_leg_muted); audio.push_back(pair); audio.back().setLogger(logger); } else if (++astreams == audio.end()) create_audio = true; // we went through the last audio stream } // non-audio streams that we can relay else if(canRelay(*m)) { if (create_relay) { relay_streams.push_back(new RelayStreamPair(a, b)); relay_streams.back()->setLogger(logger); } else if (++rstreams == relay_streams.end()) create_relay = true; // we went through the last relay stream } } } void AmB2BMedia::replaceConnectionAddress(AmSdp &parser_sdp, bool a_leg, const string& relay_address, const string& relay_public_address) { AmLock lock(mutex); SdpConnection orig_conn = parser_sdp.conn; // needed for the 'quick workaround' for non-audio media // place relay_address in connection address if (!parser_sdp.conn.address.empty() && (parser_sdp.conn.address != zero_ip)) { parser_sdp.conn.address = relay_public_address; DBG("new connection address: %s",parser_sdp.conn.address.c_str()); } // we need to create streams if they are not already created createStreams(parser_sdp); string replaced_ports; AudioStreamIterator audio_stream_it = audio.begin(); RelayStreamIterator relay_stream_it = relay_streams.begin(); std::vector<SdpMedia>::iterator it = parser_sdp.media.begin(); for (; it != parser_sdp.media.end() ; ++it) { // FIXME: only UDP streams are handled for now if (it->type == MT_AUDIO) { if( audio_stream_it == audio.end() ){ // strange... we should actually have a stream for this media line... DBG("audio media line does not have coresponding audio stream...\n"); continue; } if(it->port) { // if stream active if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) { it->conn.address = relay_public_address; DBG("new stream connection address: %s",it->conn.address.c_str()); } try { if (a_leg) { audio_stream_it->a.setLocalIP(relay_address); it->port = audio_stream_it->a.getLocalPort(); replaceRtcpAttr(*it, relay_address, audio_stream_it->a.getLocalRtcpPort()); } else { audio_stream_it->b.setLocalIP(relay_address); it->port = audio_stream_it->b.getLocalPort(); replaceRtcpAttr(*it, relay_address, audio_stream_it->b.getLocalRtcpPort()); } if(!replaced_ports.empty()) replaced_ports += "/"; replaced_ports += int2str(it->port); } catch (const string& s) { ERROR("setting port: '%s'\n", s.c_str()); throw string("error setting RTP port\n"); } } ++audio_stream_it; } else if(canRelay(*it)) { if( relay_stream_it == relay_streams.end() ){ // strange... we should actually have a stream for this media line... DBG("media line does not have a coresponding relay stream...\n"); continue; } if(it->port) { // if stream active if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) { it->conn.address = relay_public_address; DBG("new stream connection address: %s",it->conn.address.c_str()); } try { if (a_leg) { if(!(*relay_stream_it)->a.hasLocalSocket()){ (*relay_stream_it)->a.setLocalIP(relay_address); } it->port = (*relay_stream_it)->a.getLocalPort(); replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->a.getLocalRtcpPort()); } else { if(!(*relay_stream_it)->b.hasLocalSocket()){ (*relay_stream_it)->b.setLocalIP(relay_address); } it->port = (*relay_stream_it)->b.getLocalPort(); replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->b.getLocalRtcpPort()); } if(!replaced_ports.empty()) replaced_ports += "/"; replaced_ports += int2str(it->port); } catch (const string& s) { ERROR("setting port: '%s'\n", s.c_str()); throw string("error setting RTP port\n"); } } ++relay_stream_it; } else { // quick workaround to allow direct connection of non-supported streams (i.e. // those which are not relayed or transcoded): propagate connection // address - might work but need not (to be tested with real clients // instead of simulators) if (it->conn.address.empty()) it->conn = orig_conn; continue; } } if (it != parser_sdp.media.end()) { // FIXME: create new streams here? WARN("trying to relay SDP with more media lines than " "relay streams initialized (%zu)\n",audio.size()+relay_streams.size()); } DBG("replaced connection address in SDP with %s:%s.\n", relay_public_address.c_str(), replaced_ports.c_str()); } static const char* _rtp_relay_mode_str(const AmB2BSession::RTPRelayMode& relay_mode) { switch(relay_mode){ case AmB2BSession::RTP_Direct: return "RTP_Direct"; case AmB2BSession::RTP_Relay: return "RTP_Relay"; case AmB2BSession::RTP_Transcoding: return "RTP_Transcoding"; } return ""; } void AmB2BMedia::updateStreamPair(AudioStreamPair &pair) { bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp; bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp; TRACE("updating stream in A leg\n"); pair.a.setDtmfSink(b); if (pair.b.getInput()) pair.a.setRelayStream(NULL); // don't mix relayed RTP into the other's input else pair.a.setRelayStream(pair.b.getStream()); if (have_a) pair.a.initStream(playout_type, a_leg_local_sdp, a_leg_remote_sdp, pair.media_idx); TRACE("updating stream in B leg\n"); pair.b.setDtmfSink(a); if (pair.a.getInput()) pair.b.setRelayStream(NULL); // don't mix relayed RTP into the other's input else pair.b.setRelayStream(pair.a.getStream()); if (have_b) pair.b.initStream(playout_type, b_leg_local_sdp, b_leg_remote_sdp, pair.media_idx); TRACE("audio streams updated\n"); } void AmB2BMedia::updateAudioStreams() { // SDP was updated TRACE("handling SDP change, A leg: %c%c, B leg: %c%c\n", have_a_leg_local_sdp ? 'X' : '-', have_a_leg_remote_sdp ? 'X' : '-', have_b_leg_local_sdp ? 'X' : '-', have_b_leg_remote_sdp ? 'X' : '-'); // if we have all necessary information we can initialize streams and start // their processing if (audio.empty() && relay_streams.empty()) return; // no streams bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp; bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp; if (!( (have_a || have_b) )) return; bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding; // initialize streams to be able to relay & transcode (or use local audio) for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { i->a.stopStreamProcessing(); i->b.stopStreamProcessing(); updateStreamPair(*i); if (i->requiresProcessing()) needs_processing = true; i->a.resumeStreamProcessing(); i->b.resumeStreamProcessing(); } // start media processing (only if transcoding or regular audio processing // required) // Note: once we send local SDP to the other party we have to expect RTP but // we need to be fully initialised (both legs) before we can correctly handle // the media, right? if (needs_processing) { if (!isProcessingMedia()) { addToMediaProcessorUnsafe(); } } else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this); } void AmB2BMedia::updateRelayStream(AmRtpStream *stream, AmB2BSession *session, const string& connection_address, const SdpMedia &m, AmRtpStream *relay_to) { static const PayloadMask true_mask(true); stream->stopReceiving(); if(m.port) { if (session) { // propagate session settings stream->setPassiveMode(session->getRtpRelayForceSymmetricRtp()); stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno()); stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC()); // if (!stream->hasLocalSocket()) stream->setLocalIP(session->advertisedIP()); } stream->setRelayStream(relay_to); stream->setRelayPayloads(true_mask); if (!relay_paused) stream->enableRtpRelay(); stream->setRAddr(connection_address,m.port,m.port+1); if((m.transport != TP_RTPAVP) && (m.transport != TP_RTPSAVP)) stream->enableRawRelay(); stream->resumeReceiving(); } else { DBG("disabled stream"); } } void AmB2BMedia::updateStreams(bool a_leg, const AmSdp &local_sdp, const AmSdp &remote_sdp, RelayController *ctrl) { TRACE("%s (%c): updating streams with local & remote SDP\n", a_leg ? (a ? a->getLocalTag().c_str() : "NULL") : (b ? b->getLocalTag().c_str() : "NULL"), a_leg ? 'A': 'B'); /*string s; local_sdp.print(s); INFO("local SDP: %s\n", s.c_str()); remote_sdp.print(s); INFO("remote SDP: %s\n", s.c_str());*/ AmLock lock(mutex); // streams should be created already (replaceConnectionAddress called // before updateLocalSdp uses/assignes their port numbers) // save SDP: FIXME: really needed to store instead of just to use? if (a_leg) { a_leg_local_sdp = local_sdp; a_leg_remote_sdp = remote_sdp; have_a_leg_local_sdp = true; have_a_leg_remote_sdp = true; } else { b_leg_local_sdp = local_sdp; b_leg_remote_sdp = remote_sdp; have_b_leg_local_sdp = true; have_b_leg_remote_sdp = true; } // create missing streams createStreams(local_sdp); // FIXME: remote_sdp? // compute relay mask for every stream // Warning: do not apply the new mask unless the offer answer succeeds? // we can safely apply the changes once we have local & remote SDP (i.e. the // negotiation is finished) otherwise we might handle the RTP in a wrong way AudioStreamIterator astream = audio.begin(); RelayStreamIterator rstream = relay_streams.begin(); for (vector<SdpMedia>::const_iterator m = remote_sdp.media.begin(); m != remote_sdp.media.end(); ++m) { const string& connection_address = (m->conn.address.empty() ? remote_sdp.conn.address : m->conn.address); if (m->type == MT_AUDIO) { // initialize relay mask in the other(!) leg and relay destination for stream in current leg TRACE("relay payloads in direction %s\n", a_leg ? "B -> A" : "A -> B"); if (a_leg) { astream->b.setRelayPayloads(*m, ctrl); astream->a.setRelayDestination(connection_address, m->port); } else { astream->a.setRelayPayloads(*m, ctrl); astream->b.setRelayDestination(connection_address, m->port); } ++astream; } else { if (!canRelay(*m)) continue; if (rstream == relay_streams.end()) continue; RelayStreamPair& relay_stream = **rstream; if(a_leg) { DBG("updating A-leg relay_stream"); updateRelayStream(&relay_stream.a, a, connection_address, *m, &relay_stream.b); } else { DBG("updating B-leg relay_stream"); updateRelayStream(&relay_stream.b, b, connection_address, *m, &relay_stream.a); } ++rstream; } } updateAudioStreams(); TRACE("streams updated with SDP\n"); } void AmB2BMedia::stop(bool a_leg) { TRACE("stop %s leg\n", a_leg ? "A" : "B"); clearAudio(a_leg); // remove from processor only if both A and B leg stopped if (isProcessingMedia() && (!a) && (!b)) { AmMediaProcessor::instance()->removeSession(this); } } void AmB2BMedia::onMediaProcessingTerminated() { AmMediaSession::onMediaProcessingTerminated(); // release reference held by AmMediaProcessor releaseReference(); } bool AmB2BMedia::replaceOffer(AmSdp &sdp, bool a_leg) { TRACE("replacing offer with a local one\n"); createStreams(sdp); // create missing streams AmLock lock(mutex); try { AudioStreamIterator as = audio.begin(); for (vector<SdpMedia>::iterator m = sdp.media.begin(); m != sdp.media.end(); ++m) { if (m->type == MT_AUDIO && as != audio.end()) { // generate our local offer TRACE("... making audio stream offer\n"); if (a_leg) as->a.getSdpOffer(as->media_idx, *m); else as->b.getSdpOffer(as->media_idx, *m); ++as; } else { TRACE("... making non-audio/uninitialised stream inactive\n"); m->send = false; m->recv = false; } } } catch (...) { TRACE("hold SDP offer creation failed\n"); return true; } TRACE("hold SDP offer generated\n"); return true; } void AmB2BMedia::setMuteFlag(bool a_leg, bool set) { AmLock lock(mutex); if (a_leg) a_leg_muted = set; else b_leg_muted = set; for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { if (a_leg) i->a.mute(set); else i->b.mute(set); } } void AmB2BMedia::setFirstStreamInput(bool a_leg, AmAudio *in) { AmLock lock(mutex); //for ( i != audio.end(); ++i) { if (!audio.empty()) { AudioStreamIterator i = audio.begin(); if (a_leg) i->a.setInput(in); else i->b.setInput(in); updateAudioStreams(); } else { if (in) { ERROR("BUG: can't set %s leg's first stream input, no streams\n", a_leg ? "A": "B"); } } } void AmB2BMedia::createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer, bool use_zero_con) { // because of possible RTP relaying our payloads need not to match the remote // party's payloads (i.e. we might need not understand the remote party's // codecs) // As a quick hack we may use just copy of the original SDP with all streams // deactivated to avoid sending RTP to us (twinkle requires at least one // non-disabled stream in the response so we can not set all ports to 0 to // signalize that we don't want to receive anything) AmLock lock(mutex); answer = offer; answer.media.clear(); if (use_zero_con) answer.conn.address = zero_ip; else { if (a_leg) { if (a) answer.conn.address = a->advertisedIP(); } else { if (b) answer.conn.address = b->advertisedIP(); } if (answer.conn.address.empty()) answer.conn.address = zero_ip; // we need something there } AudioStreamIterator i = audio.begin(); vector<SdpMedia>::const_iterator m; for (m = offer.media.begin(); m != offer.media.end(); ++m) { answer.media.push_back(SdpMedia()); SdpMedia &media = answer.media.back(); media.type = m->type; if (media.type != MT_AUDIO) { media = *m ; media.port = 0; continue; } // copy whole media line except port if (m->port == 0) { media = *m; ++i; continue; } // copy whole inactive media line if (a_leg) i->a.getSdpAnswer(i->media_idx, *m, media); else i->b.getSdpAnswer(i->media_idx, *m, media); media.send = false; // should be already because the stream should be on hold media.recv = false; // what we would do with received data? if (media.payloads.empty()) { // we have to add something there if (!m->payloads.empty()) media.payloads.push_back(m->payloads[0]); } break; } } void AmB2BMedia::setRtpLogger(msg_logger* _logger) { if (logger) dec_ref(logger); logger = _logger; if (logger) inc_ref(logger); // walk through all the streams and use logger for them for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) i->setLogger(logger); for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) (*j)->setLogger(logger); } void AmB2BMedia::setRelayDTMFReceiving(bool enabled) { DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size()); for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) { DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->a); DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->b); (*j)->a.force_receive_dtmf = enabled; (*j)->b.force_receive_dtmf = enabled; } for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) { DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->a.getStream()); DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->b.getStream()); if (NULL != j->a.getStream()) j->a.getStream()->force_receive_dtmf = enabled; if (NULL != j->b.getStream()) j->b.getStream()->force_receive_dtmf = enabled; } } /** set receving of RTP/relay streams (not receiving=drop incoming packets) */ void AmB2BMedia::setReceiving(bool receiving_a, bool receiving_b) { AmLock lock(mutex); // TODO: is this necessary? DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size()); for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) { DBG("setReceiving(%s) A relay stream [%p]\n", receiving_a?"true":"false", &(*j)->a); (*j)->a.setReceiving(receiving_a); DBG("setReceiving(%s) B relay stream [%p]\n", receiving_b?"true":"false", &(*j)->b); (*j)->b.setReceiving(receiving_b); } for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) { DBG("setReceiving(%s) A audio stream [%p]\n", receiving_a?"true":"false", j->a.getStream()); j->a.setReceiving(receiving_a); DBG("setReceiving(%s) B audio stream [%p]\n", receiving_b?"true":"false", j->b.getStream()); j->b.setReceiving(receiving_b); } } void AmB2BMedia::pauseRelay() { DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size()); relay_paused = true; for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) { (*j)->a.disableRawRelay(); (*j)->b.disableRawRelay(); } for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) { j->a.setRelayPaused(true); j->b.setRelayPaused(true); } } void AmB2BMedia::restartRelay() { DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size()); relay_paused = false; for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) { (*j)->a.enableRawRelay(); (*j)->b.enableRawRelay(); } for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) { j->a.setRelayPaused(false); j->b.setRelayPaused(false); } } void AudioStreamData::debug() { DBG("\tmuted: %s\n", muted ? "yes" : "no"); if(stream) { stream->debug(); } else DBG("\t<null> <-> <null>"); } // print debug info void AmB2BMedia::debug() { // walk through all the streams DBG("B2B media session %p ('%s' <-> '%s'):", this, a ? a->getLocalTag().c_str() : "?", b ? b->getLocalTag().c_str() : "?"); DBG("\tOA status: %c%c / %c%c", have_a_leg_local_sdp ? 'X' : '-', have_a_leg_remote_sdp ? 'X' : '-', have_b_leg_local_sdp ? 'X' : '-', have_b_leg_remote_sdp ? 'X' : '-'); for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) { DBG(" - audio stream (A):\n"); i->a.debug(); DBG(" - audio stream (B):\n"); i->b.debug(); } for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) { DBG(" - relay stream (A):\n"); (*j)->a.debug(); DBG(" - relay stream (B):\n"); (*j)->b.debug(); } }