Merge branch 'talk' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
结构优化
648540858 2023-03-20 14:32:53 +08:00
commit 039fbf7e24
27 changed files with 426 additions and 336 deletions

View File

@ -63,6 +63,8 @@ public class AudioBroadcastCatch {
*/ */
private SipTransactionInfo sipTransactionInfo; private SipTransactionInfo sipTransactionInfo;
private MediaServerItem mediaServerItem;
public String getDeviceId() { public String getDeviceId() {
return deviceId; return deviceId;
@ -123,4 +125,12 @@ public class AudioBroadcastCatch {
public void setSipTransactionInfoByRequset(SIPResponse response) { public void setSipTransactionInfoByRequset(SIPResponse response) {
this.sipTransactionInfo = new SipTransactionInfo(response, false); this.sipTransactionInfo = new SipTransactionInfo(response, false);
} }
public MediaServerItem getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServerItem mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
} }

View File

@ -49,7 +49,7 @@ public class SendRtpItem {
/** /**
* streamId * streamId
*/ */
private String streamId; private String stream;
/** /**
* tcp * tcp
@ -117,6 +117,11 @@ public class SendRtpItem {
*/ */
private InviteStreamType playType; private InviteStreamType playType;
/**
*
*/
private String receiveStream;
public String getIp() { public String getIp() {
return ip; return ip;
} }
@ -181,12 +186,12 @@ public class SendRtpItem {
this.app = app; this.app = app;
} }
public String getStreamId() { public String getStream() {
return streamId; return stream;
} }
public void setStreamId(String streamId) { public void setStream(String stream) {
this.streamId = streamId; this.stream = stream;
} }
public boolean isTcp() { public boolean isTcp() {
@ -292,4 +297,12 @@ public class SendRtpItem {
public void setRtcp(boolean rtcp) { public void setRtcp(boolean rtcp) {
this.rtcp = rtcp; this.rtcp = rtcp;
} }
public String getReceiveStream() {
return receiveStream;
}
public void setReceiveStream(String receiveStream) {
this.receiveStream = receiveStream;
}
} }

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.event; package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -29,7 +29,8 @@ public class VideoStreamSessionManager {
play, play,
playback, playback,
download, download,
broadcast broadcast,
talk
} }
/** /**

View File

@ -94,12 +94,12 @@ public class SipRunner implements CommandLineRunner {
if (sendRtpItems.size() > 0) { if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) { for (SendRtpItem sendRtpItem : sendRtpItems) {
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStream());
if (mediaServerItem != null) { if (mediaServerItem != null) {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__"); param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStreamId()); param.put("stream",sendRtpItem.getStream());
param.put("ssrc",sendRtpItem.getSsrc()); param.put("ssrc",sendRtpItem.getSsrc());
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param); JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
if (jsonObject != null && jsonObject.getInteger("code") == 0) { if (jsonObject != null && jsonObject.getInteger("code") == 0) {

View File

@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -131,7 +128,7 @@ public interface ISIPCommander {
*/ */
void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException; void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;

View File

@ -32,7 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent; import javax.sip.ResponseEvent;
@ -584,9 +583,9 @@ public class SIPCommander implements ISIPCommander {
} }
@Override @Override
public void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { public void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = ssrcInfo.getStream(); String stream = sendRtpItem.getStream();
if (device == null) { if (device == null) {
return; return;
@ -597,7 +596,7 @@ public class SIPCommander implements ISIPCommander {
return; return;
} }
logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
if (event != null) { if (event != null) {
@ -622,24 +621,27 @@ public class SIPCommander implements ISIPCommander {
content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("t=0 0\r\n"); content.append("t=0 0\r\n");
content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8\r\n"); content.append("m=audio " + sendRtpItem.getPort() + " TCP/RTP/AVP 8\r\n");
content.append("a=setup:passive\r\n");
content.append("a=connection:new\r\n");
content.append("a=sendrecv\r\n"); content.append("a=sendrecv\r\n");
content.append("a=rtpmap:8 PCMA/8000\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n");
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
content.append("f=v/////a/1/8/1" + "\r\n"); content.append("f=v/////a/1/8/1" + "\r\n");
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), callIdHeader); Request request = headerProvider.createInviteRequest(device, channelId, content.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
errorEvent.response(e); errorEvent.response(e);
}), e -> { }), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event; ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse(); SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId, "talk", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); streamSession.put(device.getDeviceId(), channelId, "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.talk);
okEvent.response(e); okEvent.response(e);
}); });
} }

View File

@ -694,7 +694,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) { if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId()); zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
} }
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
if (byeRequest == null) { if (byeRequest == null) {

View File

@ -102,12 +102,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
} }
String isUdp = sendRtpItem.isTcp() ? "0" : "1"; String isUdp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStreamId(), logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12); Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__"); param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStreamId()); param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc()); param.put("ssrc", sendRtpItem.getSsrc());
param.put("src_port", sendRtpItem.getLocalPort()); param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt()); param.put("pt", sendRtpItem.getPt());
@ -121,7 +121,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (mediaInfo == null) { if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {

View File

@ -97,7 +97,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem != null){ if (sendRtpItem != null){
logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
String streamId = sendRtpItem.getStreamId(); String streamId = sendRtpItem.getStream();
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServerItem == null) { if (mediaServerItem == null) {
return; return;
@ -105,7 +105,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), streamId); Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), streamId);
if (!ready) { if (!ready) {
logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStreamId()); logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStream());
return; return;
} }
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
@ -113,7 +113,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("app",sendRtpItem.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",streamId); param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc()); param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止向上级推流:{}", streamId); logger.info("[收到bye] 停止推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
@ -129,15 +129,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
try { try {
logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
SsrcTransactionNotFoundException e) {
logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
} }
} }
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
} }

View File

@ -478,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if ("Playback".equalsIgnoreCase(sessionName)) { if ("Playback".equalsIgnoreCase(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true);
sendRtpItem.setStreamId(ssrcInfo.getStream()); sendRtpItem.setStream(ssrcInfo.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
@ -523,7 +523,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
logger.info(JSONObject.toJSONString(ssrcInfo)); logger.info(JSONObject.toJSONString(ssrcInfo));
sendRtpItem.setStreamId(ssrcInfo.getStream()); sendRtpItem.setStream(ssrcInfo.getStream());
sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
// 写入redis 超时时回复 // 写入redis 超时时回复
@ -533,12 +533,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null);
}); });
} else { } else {
sendRtpItem.setStreamId(playTransaction.getStream()); sendRtpItem.setStream(playTransaction.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("app", sendRtpItem.getApp()); jsonObject.put("app", sendRtpItem.getApp());
jsonObject.put("stream", sendRtpItem.getStreamId()); jsonObject.put("stream", sendRtpItem.getStream());
hookEvent.response(mediaServerItem, jsonObject); hookEvent.response(mediaServerItem, jsonObject);
} }
} }
@ -982,6 +982,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
return; return;
} }
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, ssrc,
mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
MediaServerItem mediaServerItem = audioBroadcastCatch.getMediaServerItem();
if (mediaServerItem == null) {
logger.warn("未找到语音喊话使用的zlm");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
}
return;
}
String addressStr = sdp.getConnection().getAddress(); String addressStr = sdp.getConnection().getAddress();
logger.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc, logger.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP", sdp.getSessionName().getValue()); mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP", sdp.getSessionName().getValue());

View File

@ -102,7 +102,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
String contentSubType = header.getContentSubType(); String contentSubType = header.getContentSubType();
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) { if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
String streamId = sendRtpItem.getStreamId(); String streamId = sendRtpItem.getStream();
StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) { if (null == streamInfo) {
responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found"); responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found");

View File

@ -90,7 +90,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
try { try {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId()); cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId());
} catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage()); logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage());
} }

View File

@ -123,7 +123,7 @@ public class SipUtils {
} }
public static String getNewCallId() { public static String getNewCallId() {
return (int) Math.floor(Math.random() * 10000) + ""; return (int) Math.floor(Math.random() * 1000000000) + "";
} }
public static int getTypeCodeFromGbCode(String deviceId) { public static int getTypeCodeFromGbCode(String deviceId) {

View File

@ -9,9 +9,9 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -249,6 +249,7 @@ public class ZLMHttpHookListener {
String channelId = ssrcTransactionForAll.get(0).getChannelId(); String channelId = ssrcTransactionForAll.get(0).getChannelId();
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) { if (deviceChannel != null) {
result.setEnable_audio(deviceChannel.isHasAudio()); result.setEnable_audio(deviceChannel.isHasAudio());
} }
// 如果是录像下载就设置视频间隔十秒 // 如果是录像下载就设置视频间隔十秒
@ -257,6 +258,11 @@ public class ZLMHttpHookListener {
result.setEnable_audio(true); result.setEnable_audio(true);
result.setEnable_mp4(true); result.setEnable_mp4(true);
} }
// 如果是talk对讲则默认获取声音
if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.talk) {
result.setEnable_audio(true);
}
} }
return result; return result;
} }
@ -359,62 +365,30 @@ public class ZLMHttpHookListener {
} }
}else if ("talk".equals(param.getApp())){ }else if ("talk".equals(param.getApp())){
// 语音对讲推流 stream需要满足格式deviceId_channelId // 语音对讲推流 stream需要满足格式deviceId_channelId
if (param.isRegist() && param.getStream().indexOf("_") > 0) { if (param.getStream().indexOf("_") > 0) {
String[] streamArray = param.getStream().split("_"); String[] streamArray = param.getStream().split("_");
if (streamArray.length == 2) { if (streamArray.length == 2) {
String deviceId = streamArray[0]; String deviceId = streamArray[0];
String channelId = streamArray[1]; String channelId = streamArray[1];
Device device = deviceService.getDevice(deviceId); Device device = deviceService.getDevice(deviceId);
if (device != null) { if (device != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (param.isRegist()) {
if (deviceChannel != null) { if (audioBroadcastManager.exit(deviceId, channelId)) {
if (audioBroadcastManager.exit(deviceId, channelId)) { playService.stopAudioBroadcast(deviceId, channelId);
// 直接推流 }
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null); // 开启语音对讲通道
if (sendRtpItem == null) { playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg)->{
// TODO 可能数据错误,重新开启语音通道 logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
}else { });
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); }else {
logger.info("rtp/{}开始向上级推流, 目标={}:{}SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); // 流注销
Map<String, Object> sendParam = new HashMap<>(12); playService.stopTalk(device, channelId, param.isRegist());
sendParam.put("vhost","__defaultVhost__"); }
sendParam.put("app",sendRtpItem.getApp()); } else{
sendParam.put("stream",sendRtpItem.getStreamId()); logger.info("[语音对讲] 未找到设备:{}", deviceId);
sendParam.put("ssrc", sendRtpItem.getSsrc()); }
sendParam.put("src_port", sendRtpItem.getLocalPort()); }
sendParam.put("pt", sendRtpItem.getPt()); }
sendParam.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
sendParam.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
JSONObject jsonObject;
if (sendRtpItem.isTcpActive()) {
jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, sendParam);
} else {
sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
sendParam.put("dst_url", sendRtpItem.getIp());
sendParam.put("dst_port", sendRtpItem.getPort());
jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, sendParam);
}
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId);
}
}
}else {
// 开启语音对讲通道
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
playService.talk(mediaServerItem, device, channelId, (mediaServer, jsonObject)->{
System.out.println("开始推流");
}, eventResult -> {
System.out.println(eventResult.msg);
}, ()->{
System.out.println("超时");
});
}
}
}
}
}
}else{ }else{
if (!"rtp".equals(param.getApp())){ if (!"rtp".equals(param.getApp())){
@ -474,16 +448,21 @@ public class ZLMHttpHookListener {
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
Device device = deviceService.getDevice(platformId); Device device = deviceService.getDevice(platformId);
try {
if (platform != null) { if (platform != null) {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem); try {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
} else { } else {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); try {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException |
SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
} }
} catch (SipException | InvalidArgumentException | ParseException |
SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
} }
} }
} }
@ -526,7 +505,7 @@ public class ZLMHttpHookListener {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
} }
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId()); sendRtpItem.getCallId(), sendRtpItem.getStream());
} }
} }
} }
@ -555,8 +534,7 @@ public class ZLMHttpHookListener {
try { try {
cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(), cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null); streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage()); logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
} }
} }
@ -572,6 +550,13 @@ public class ZLMHttpHookListener {
ret.put("close", false); ret.put("close", false);
return ret; return ret;
} }
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null);
if ("talk".equals(sendRtpItem.getApp())){
ret.put("close", false);
return ret;
}
}else if ("talk".equals(param.getApp()) || "broadcast".equals(param.getApp())){
ret.put("close", false);
} else { } else {
// 非国标流 推流/拉流代理 // 非国标流 推流/拉流代理
// 拉流代理 // 拉流代理
@ -733,7 +718,7 @@ public class ZLMHttpHookListener {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
} }
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId()); sendRtpItem.getCallId(), sendRtpItem.getStream());
} }
} }
}); });

View File

@ -291,6 +291,10 @@ public class ZLMRESTfulUtils {
return sendPost(mediaServerItem, "startSendRtpPassive",param, null); return sendPost(mediaServerItem, "startSendRtpPassive",param, null);
} }
public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object> param, RequestCallback callback) {
return sendPost(mediaServerItem, "startSendRtpPassive",param, callback);
}
public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) { public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map<String, Object> param) {
return sendPost(mediaServerItem, "stopSendRtp",param, null); return sendPost(mediaServerItem, "stopSendRtp",param, null);
} }

View File

@ -229,7 +229,7 @@ public class ZLMRTPServerFactory {
sendRtpItem.setPort(port); sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app); sendRtpItem.setApp(app);
sendRtpItem.setStreamId(stream); sendRtpItem.setStream(stream);
sendRtpItem.setPlatformId(platformId); sendRtpItem.setPlatformId(platformId);
sendRtpItem.setChannelId(channelId); sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp); sendRtpItem.setTcp(tcp);
@ -290,6 +290,10 @@ public class ZLMRTPServerFactory {
return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param);
} }
public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map<String, Object>param, ZLMRESTfulUtils.RequestCallback callback) {
return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
}
/** /**
* *
*/ */
@ -343,7 +347,7 @@ public class ZLMRTPServerFactory {
result= true; result= true;
logger.info("[停止RTP推流] 成功"); logger.info("[停止RTP推流] 成功");
} else { } else {
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject); logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
} }
return result; return result;
} }

View File

@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
@ -27,10 +28,6 @@ public interface IPlayService {
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId);
void talk(MediaServerItem mediaServerItem, Device device, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback); InviteTimeOutCallback timeoutCallback);
@ -57,7 +54,7 @@ public interface IPlayService {
void zlmServerOnline(String mediaServerId); void zlmServerOnline(String mediaServerId);
AudioBroadcastResult audioBroadcast(Device device, String channelId); AudioBroadcastResult audioBroadcast(Device device, String channelId, Boolean broadcastMode);
void stopAudioBroadcast(String deviceId, String channelId); void stopAudioBroadcast(String deviceId, String channelId);
void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException;
@ -70,4 +67,8 @@ public interface IPlayService {
void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader); JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader);
void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event);
void stopTalk(Device device, String channelId, Boolean streamIsReady);
} }

View File

@ -202,7 +202,7 @@ public class DeviceServiceImpl implements IDeviceService {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId()); param.put("stream", sendRtpItem.getStream());
zlmresTfulUtils.stopSendRtp(mediaInfo, param); zlmresTfulUtils.stopSendRtp(mediaInfo, param);
} }

View File

@ -253,7 +253,7 @@ public class PlatformServiceImpl implements IPlatformService {
Map<String, Object> param = new HashMap<>(3); Map<String, Object> param = new HashMap<>(3);
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId()); param.put("stream", sendRtpItem.getStream());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
} }
} }

View File

@ -41,7 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -134,8 +134,8 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, public void play(MediaServerItem mediaServerItem, String deviceId, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback) { Runnable timeoutCallback) {
if (mediaServerItem == null) { if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
} }
@ -243,194 +243,148 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
@Override private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream,
public void talk(MediaServerItem mediaServerItem, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioEvent audioEvent) {
Runnable timeoutCallback) {
String streamId = null; String playSsrc = mediaServerItem.getSsrcConfig().getPlaySsrc();
if (mediaServerItem.isRtpEnable()) { if (playSsrc == null) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); audioEvent.call("ssrc已经用尽");
return;
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); SendRtpItem sendRtpItem = new SendRtpItem();
logger.info("[对讲开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); sendRtpItem.setApp("talk");
sendRtpItem.setStream(stream);
sendRtpItem.setSsrc(playSsrc);
sendRtpItem.setDeviceId(device.getDeviceId());
sendRtpItem.setPlatformId(device.getDeviceId());
sendRtpItem.setChannelId(channelId);
sendRtpItem.setRtcp(false);
sendRtpItem.setMediaServerId(mediaServerItem.getId());
sendRtpItem.setOnlyAudio(true);
sendRtpItem.setPlayType(InviteStreamType.TALK);
sendRtpItem.setPt(8);
sendRtpItem.setStatus(1);
sendRtpItem.setTcpActive(false);
sendRtpItem.setTcp(true);
sendRtpItem.setUsePs(false);
sendRtpItem.setReceiveStream(stream + "_talk");
int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc);
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (port <= 0) {
logger.info("[语音对讲] 端口分配异常deviceId={},channelId={}", device.getDeviceId(), channelId);
audioEvent.call("端口分配异常");
return;
}
sendRtpItem.setLocalPort(port);
sendRtpItem.setPort(port);
logger.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false);
// 超时处理 // 超时处理
String timeOutTaskKey = UUID.randomUUID().toString(); String timeOutTaskKey = UUID.randomUUID().toString();
SSRCInfo finalSsrcInfo = ssrcInfo;
System.out.println("设置超时任务: " + timeOutTaskKey);
dynamicTask.startDelay(timeOutTaskKey, () -> { dynamicTask.startDelay(timeOutTaskKey, () -> {
logger.info("[对讲超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); logger.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc());
timeoutCallback.run(); timeoutCallback.run();
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try { try {
cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[对讲超时] 发送BYE失败 {}", e.getMessage()); logger.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) { } finally {
timeoutCallback.run(); timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
} }
}, userSetting.getPlayTimeout()); }, userSetting.getPlayTimeout());
final String ssrc = ssrcInfo.getSsrc();
final String stream = ssrcInfo.getStream();
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (ssrcInfo.getPort() <= 0) {
logger.info("[对讲] 端口分配异常deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
return;
}
String callId = SipUtils.getNewCallId(); String callId = SipUtils.getNewCallId();
boolean pushing = false;
zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc);
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
param.put("recv_stream_id", sendRtpItem.getReceiveStream());
param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000);
zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> {
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
audioEvent.call("失败, " + jsonObject.getString("msg"));
// 查看是否已经建立了通道存在则发送bye
stopTalk(device, channelId);
}
});
// 查看设备是否已经在推流 // 查看设备是否已经在推流
// MediaItem mediaItem = zlmrtpServerFactory.getMediaInfo(mediaServerItem, "rtp",ssrcInfo.getStream()); try {
// if (mediaItem != null) { cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
// SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString());
// mediaItem.getOriginSock().getPeer_ip(), mediaItem.getOriginSock().getPeer_port(), ssrcInfo.getSsrc(), device.getDeviceId(), dynamicTask.stop(timeOutTaskKey);
// device.getDeviceId(), channelId, // TODO 暂不做处理
// false); }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
// logger.info("[语音对讲] 设备开始推流: " + json.toJSONString());
// sendRtpItem.setTcpActive(false); dynamicTask.stop(timeOutTaskKey);
// sendRtpItem.setCallId(callId);
// sendRtpItem.setPlayType(InviteStreamType.TALK);
// sendRtpItem.setStatus(1);
// sendRtpItem.setIp(mediaItem.getOriginSock().getPeer_ip());
// sendRtpItem.setPort(mediaItem.getOriginSock().getPeer_port());
// sendRtpItem.setTcpActive(false);
// sendRtpItem.setStreamId(ssrcInfo.getStream());
// sendRtpItem.setApp("1000");
// sendRtpItem.setStreamId("1000");
// sendRtpItem.setSsrc(ssrc);
// sendRtpItem.setOnlyAudio(true);
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
//
// Map<String, Object> param = new HashMap<>(12);
// param.put("vhost","__defaultVhost__");
// param.put("app",sendRtpItem.getApp());
// param.put("stream",sendRtpItem.getStreamId());
// param.put("ssrc", sendRtpItem.getSsrc());
// param.put("dst_url", sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// param.put("src_port", sendRtpItem.getLocalPort());
// param.put("pt", sendRtpItem.getPt());
// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
// param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, param);
// System.out.println(2222);
// System.out.println(jsonObject);
// }else {
try {
cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// TODO 暂不做处理
}, (MediaServerItem mediaServerItemInuse, JSONObject json) -> {
logger.info("[对讲] 设备开始推流: " + json.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// 获取远程IP端口 作为回复语音流的地址
String ip = json.getString("ip");
Integer port = json.getInteger("port");
logger.info("[设备开始推流]{}/{}, 来自ip{}, 端口:{}", device.getDeviceId(), channelId, ip, port);
// 查看平台推流是否就绪
// Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream);
// if (!ready) {
// try {
// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null);
// } catch (InvalidArgumentException | ParseException | SipException e) {
// logger.error("[对讲超时] 发送BYE失败 {}", e.getMessage());
// } catch (SsrcTransactionNotFoundException e) {
// timeoutCallback.run();
// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
// }
// }else {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(),
device.getDeviceId(), channelId,
false, false);
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
// if (sendRtpItem.getLocalPort() == 0) { if (event.event instanceof ResponseEvent) {
// logger.warn("服务器端口资源不足"); ResponseEvent responseEvent = (ResponseEvent) event.event;
// try { if (responseEvent.getResponse() instanceof SIPResponse) {
// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); SIPResponse response = (SIPResponse) responseEvent.getResponse();
// } catch (InvalidArgumentException | ParseException | SipException e) { sendRtpItem.setFromTag(response.getFromTag());
// logger.error("[对讲超时] 发送BYE失败 {}", e.getMessage()); sendRtpItem.setToTag(response.getToTag());
// } catch (SsrcTransactionNotFoundException e) { sendRtpItem.setCallId(response.getCallIdHeader().getCallId());
// timeoutCallback.run();
// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
// }
// return;
// }
sendRtpItem.setTcpActive(false);
sendRtpItem.setCallId(callId);
sendRtpItem.setPlayType(InviteStreamType.TALK);
sendRtpItem.setStatus(1);
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setTcpActive(false);
sendRtpItem.setApp("1000");
sendRtpItem.setStreamId("1000");
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setOnlyAudio(true);
sendRtpItem.setRtcp(false);
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
Map<String, Object> param = new HashMap<>(12); streamSession.put(device.getDeviceId(), channelId, "talk",
param.put("vhost","__defaultVhost__"); sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
param.put("app",sendRtpItem.getApp()); response, VideoStreamSessionManager.SessionType.talk);
param.put("stream",sendRtpItem.getStreamId()); } else {
param.put("ssrc", sendRtpItem.getSsrc()); logger.error("[语音对讲]收到的消息错误response不是SIPResponse");
param.put("dst_url", sendRtpItem.getIp()); }
param.put("dst_port", sendRtpItem.getPort()); } else {
param.put("src_port", sendRtpItem.getLocalPort()); logger.error("[语音对讲]收到的消息错误event不是ResponseEvent");
param.put("pt", sendRtpItem.getPt()); }
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param);
System.out.println(11111);
System.out.println(sendRtpItem.getIp() + ":" + sendRtpItem.getPort());
// System.out.println(jsonObject);
// }
}, (event) -> { }, (event) -> {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
errorEvent.response(event);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
// 释放ssrc // 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
errorEvent.response(event);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); logger.error("[命令发送失败] 对讲消息: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); dynamicTask.stop(timeOutTaskKey);
eventResult.msg = "命令发送失败"; mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
errorEvent.response(eventResult); // 释放ssrc
} mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
eventResult.msg = "命令发送失败";
errorEvent.response(eventResult);
}
// } // }
} }
@Override @Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
@ -446,7 +400,8 @@ public class PlayServiceImpl implements IPlayService {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try { try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage()); logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally { } finally {
timeoutCallback.run(1, "收流超时"); timeoutCallback.run(1, "收流超时");
@ -483,7 +438,7 @@ public class PlayServiceImpl implements IPlayService {
onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
hookEvent.response(mediaServerItemInuse, response); hookEvent.response(mediaServerItemInuse, response);
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
String path = "snap"; String path = "snap";
String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
// 请求截图 // 请求截图
@ -652,8 +607,8 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void playBack(String deviceId, String channelId, String startTime, public void playBack(String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback inviteStreamCallback, String endTime, InviteStreamCallback inviteStreamCallback,
PlayBackCallback callback) { PlayBackCallback callback) {
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
if (device == null) { if (device == null) {
return; return;
@ -666,9 +621,9 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime, String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback infoCallBack, String endTime, InviteStreamCallback infoCallBack,
PlayBackCallback playBackCallback) { PlayBackCallback playBackCallback) {
if (mediaServerItem == null || ssrcInfo == null) { if (mediaServerItem == null || ssrcInfo == null) {
return; return;
} }
@ -792,7 +747,6 @@ public class PlayServiceImpl implements IPlayService {
} }
@Override @Override
public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) { public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) {
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
@ -977,7 +931,7 @@ public class PlayServiceImpl implements IPlayService {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), cmder.streamByeCmd(device, ssrcTransaction.getChannelId(),
ssrcTransaction.getStream(), null); ssrcTransaction.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) { SsrcTransactionNotFoundException e) {
logger.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage()); logger.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage());
} }
} }
@ -986,7 +940,8 @@ public class PlayServiceImpl implements IPlayService {
} }
@Override @Override
public AudioBroadcastResult audioBroadcast(Device device, String channelId) { public AudioBroadcastResult audioBroadcast(Device device, String channelId, Boolean broadcastMode) {
// TODO 必须多端口模式才支持语音喊话鹤语音对讲
if (device == null || channelId == null) { if (device == null || channelId == null) {
return null; return null;
} }
@ -997,15 +952,15 @@ public class PlayServiceImpl implements IPlayService {
return null; return null;
} }
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
String app = "broadcast"; if (broadcastMode == null) {
// TODO 从sip user agent中判断是什么品牌设备大华默认使用talk模式其他使用broadcast模式 broadcastMode = true;
// String app = "talk"; }
String app = broadcastMode?"broadcast":"talk";
String stream = device.getDeviceId() + "_" + channelId; String stream = device.getDeviceId() + "_" + channelId;
StreamInfo broadcast = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "broadcast", stream, null, null, null, false);
AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
audioBroadcastResult.setApp(app); audioBroadcastResult.setApp(app);
audioBroadcastResult.setStream(stream); audioBroadcastResult.setStream(stream);
audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false))); audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false)));
audioBroadcastResult.setCodec("G.711"); audioBroadcastResult.setCodec("G.711");
return audioBroadcastResult; return audioBroadcastResult;
} }
@ -1037,6 +992,18 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
} }
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null) {
MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
if (streamReady) {
logger.warn("[语音对讲] 进行中: {}", channelId);
event.call("语音对讲进行中");
return;
} else {
stopTalk(device, channelId);
}
}
// 发送通知 // 发送通知
cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
@ -1053,19 +1020,18 @@ public class PlayServiceImpl implements IPlayService {
} }
@Override @Override
public void stopAudioBroadcast(String deviceId, String channelId) { public void stopAudioBroadcast(String deviceId, String channelId) {
List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>(); List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>();
if (channelId == null) { if (channelId == null) {
audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId));
}else { } else {
audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId));
} }
if (audioBroadcastCatchList.size() > 0) { if (audioBroadcastCatchList.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) {
Device device = deviceService.getDevice(deviceId); Device device = deviceService.getDevice(deviceId);
if (device == null || audioBroadcastCatch == null ) { if (device == null || audioBroadcastCatch == null) {
return; return;
} }
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
@ -1075,7 +1041,7 @@ public class PlayServiceImpl implements IPlayService {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStreamId()); param.put("stream", sendRtpItem.getStream());
zlmresTfulUtils.stopSendRtp(mediaInfo, param); zlmresTfulUtils.stopSendRtp(mediaInfo, param);
try { try {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
@ -1199,12 +1165,12 @@ public class PlayServiceImpl implements IPlayService {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStreamId(), logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12); Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app",sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStreamId()); param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc()); param.put("ssrc", sendRtpItem.getSsrc());
param.put("src_port", sendRtpItem.getLocalPort()); param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt()); param.put("pt", sendRtpItem.getPt());
@ -1213,12 +1179,12 @@ public class PlayServiceImpl implements IPlayService {
param.put("is_udp", is_Udp); param.put("is_udp", is_Udp);
if (!sendRtpItem.isTcp()) { if (!sendRtpItem.isTcp()) {
// udp模式下开启rtcp保活 // udp模式下开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
} }
if (mediaInfo == null) { if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
@ -1233,16 +1199,16 @@ public class PlayServiceImpl implements IPlayService {
if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) {
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else { } else {
param.put("dst_url", sendRtpItem.getIp()); param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort()); param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
} }
} }
}else { } else {
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
}else { } else {
param.put("dst_url", sendRtpItem.getIp()); param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort()); param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
@ -1260,10 +1226,10 @@ public class PlayServiceImpl implements IPlayService {
if (jsonObject == null) { if (jsonObject == null) {
logger.error("RTP推流失败: 请检查ZLM服务"); logger.error("RTP推流失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) { } else if (jsonObject.getInteger("code") == 0) {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject); logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
} else { } else {
logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) { if (sendRtpItem.isOnlyAudio()) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
@ -1275,7 +1241,7 @@ public class PlayServiceImpl implements IPlayService {
logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage());
} }
} }
}else { } else {
// 向上级平台 // 向上级平台
try { try {
commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId());
@ -1285,4 +1251,105 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
} }
@Override
public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event) {
if (device == null || channelId == null) {
return;
}
// TODO 必须多端口模式才支持语音喊话鹤语音对讲
logger.info("[语音对讲] device {}, channel: {}", device.getDeviceId(), channelId);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel == null) {
logger.warn("开启语音对讲的时候未找到通道: {}", channelId);
event.call("开启语音对讲的时候未找到通道");
return;
}
// 查询通道使用状态
if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
if (streamReady) {
logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId);
event.call("正在语音广播");
return;
} else {
stopAudioBroadcast(device.getDeviceId(), channelId);
}
}
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
if (sendRtpItem != null) {
MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
if (streamReady) {
logger.warn("[语音对讲] 进行中: {}", channelId);
event.call("语音对讲进行中");
return;
} else {
stopTalk(device, channelId);
}
}
talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> {
logger.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);
event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg);
}, () -> {
logger.warn("[语音对讲] 失败,{}/{} 超时", device.getDeviceId(), channelId);
event.call("失败,超时 ");
stopTalk(device, channelId);
}, errorMsg -> {
logger.warn("[语音对讲] 失败,{}/{} {}", device.getDeviceId(), channelId, errorMsg);
event.call(errorMsg);
stopTalk(device, channelId);
});
}
private void stopTalk(Device device, String channelId) {
stopTalk(device, channelId, null);
}
@Override
public void stopTalk(Device device, String channelId, Boolean streamIsReady) {
logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem == null) {
logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
return;
}
// 停止向设备推流
String mediaServerId = sendRtpItem.getMediaServerId();
if (mediaServerId == null) {
return;
}
MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
if (streamIsReady == null || streamIsReady) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
zlmrtpServerFactory.stopSendRtpStream(mediaServer, param);
}
mediaServer.getSsrcConfig().releaseSsrc(sendRtpItem.getSsrc());
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, sendRtpItem.getStream());
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.info("[语音对讲] 停止消息发送失败,可能已经停止");
}
}
redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
}
} }

View File

@ -378,7 +378,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
+ sendRtpItem.getMediaServerId() + "_" + sendRtpItem.getMediaServerId() + "_"
+ sendRtpItem.getPlatformId() + "_" + sendRtpItem.getPlatformId() + "_"
+ sendRtpItem.getChannelId() + "_" + sendRtpItem.getChannelId() + "_"
+ sendRtpItem.getStreamId() + "_" + sendRtpItem.getStream() + "_"
+ sendRtpItem.getCallId(); + sendRtpItem.getCallId();
RedisUtil.set(key, sendRtpItem); RedisUtil.set(key, sendRtpItem);
} }

View File

@ -19,11 +19,7 @@ import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.*;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
@ -253,7 +249,7 @@ public class PlayController {
@Parameter(name = "timeout", description = "推流超时时间(秒)", required = true) @Parameter(name = "timeout", description = "推流超时时间(秒)", required = true)
@GetMapping("/broadcast/{deviceId}/{channelId}") @GetMapping("/broadcast/{deviceId}/{channelId}")
@PostMapping("/broadcast/{deviceId}/{channelId}") @PostMapping("/broadcast/{deviceId}/{channelId}")
public AudioBroadcastResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) { public AudioBroadcastResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout, Boolean broadcastMode) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("语音广播API调用"); logger.debug("语音广播API调用");
} }
@ -265,15 +261,7 @@ public class PlayController {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到通道: " + channelId); throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到通道: " + channelId);
} }
return playService.audioBroadcast(device, channelId); return playService.audioBroadcast(device, channelId, broadcastMode);
}
@GetMapping("/1111")
public void broadcastApi1() {
MediaServerItem defaultMediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
Device device = storager.queryVideoDevice("34020000001320090001");
playService.talk(defaultMediaServer, device, "34020000001370000001", null, null, null);
} }
@ -289,7 +277,7 @@ public class PlayController {
} }
// try { // try {
// playService.stopAudioBroadcast(deviceId, channelId); // playService.stopAudioBroadcast(deviceId, channelId);
// } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { // } catch (InvalidArgumentException | ParseException | SipException e) {
// logger.error("[命令发送失败] 停止语音: {}", e.getMessage()); // logger.error("[命令发送失败] 停止语音: {}", e.getMessage());
// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); // throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
// } // }

View File

@ -4,6 +4,6 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play.bean;
/** /**
* @author lin * @author lin
*/ */
public interface AudioBroadcastEvent { public interface AudioEvent {
void call(String msg); void call(String msg);
} }

View File

@ -185,7 +185,7 @@ public class ApiStreamController {
} }
try { try {
cmder.streamByeCmd(device, code, streamInfo.getStream(), null); cmder.streamByeCmd(device, code, streamInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
result.put("error","发送BYE失败" + e.getMessage()); result.put("error","发送BYE失败" + e.getMessage());
return result; return result;

View File

@ -12,7 +12,7 @@ module.exports = {
assetsPublicPath: './', assetsPublicPath: './',
proxyTable: { proxyTable: {
'/debug': { '/debug': {
target: 'https://default.wvp-pro.cn:18080', target: 'https://default.wvp-pro.cn:18082',
changeOrigin: true, changeOrigin: true,
pathRewrite: { pathRewrite: {
'^/debug': '/' '^/debug': '/'

View File

@ -299,6 +299,10 @@
</el-tab-pane> </el-tab-pane>
<el-tab-pane label="语音对讲" name="broadcast"> <el-tab-pane label="语音对讲" name="broadcast">
<div style="padding: 0 10px">
<el-switch v-model="broadcastMode" :disabled="broadcastStatus !== -1" active-color="#409EFF" active-text=""
inactive-text="对讲"></el-switch>
</div>
<div class="trank" style="text-align: center;"> <div class="trank" style="text-align: center;">
<el-button @click="broadcastStatusClick()" :type="getBroadcastStatus()" :disabled="broadcastStatus === -2" <el-button @click="broadcastStatusClick()" :type="getBroadcastStatus()" :disabled="broadcastStatus === -2"
circle icon="el-icon-microphone" style="font-size: 32px; padding: 24px;margin-top: 24px;"/> circle icon="el-icon-microphone" style="font-size: 32px; padding: 24px;margin-top: 24px;"/>
@ -390,6 +394,7 @@ export default {
recordStartTime: 0, recordStartTime: 0,
showTimeText: "00:00:00", showTimeText: "00:00:00",
streamInfo: null, streamInfo: null,
broadcastMode: true,
broadcastRtc: null, broadcastRtc: null,
broadcastStatus: -1, // -2 -1 0 1 broadcastStatus: -1, // -2 -1 0 1
}; };
@ -648,7 +653,7 @@ export default {
// //
this.$axios({ this.$axios({
method: 'get', method: 'get',
url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + "?timeout=30" url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + "?timeout=30&broadcastMode=" + this.broadcastMode
}).then( (res)=> { }).then( (res)=> {
if (res.data.code == 0) { if (res.data.code == 0) {
let streamInfo = res.data.data.streamInfo; let streamInfo = res.data.data.streamInfo;