Merge pull request #837 from keDaYao/featur-jt1078

新增1078信令,修复BUG
pull/844/head
648540858 2023-04-28 14:59:44 +08:00 committed by GitHub
commit adbddd5eb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 744 additions and 13 deletions

View File

@ -1,8 +1,7 @@
package com.genersoft.iot.vmp.jt1078.cmd;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
import com.genersoft.iot.vmp.jt1078.proc.response.J9102;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import java.util.Random;
@ -16,6 +15,15 @@ public class JT1078Template {
private final Random random = new Random();
private static final String H9101 = "9101";
private static final String H9102 = "9102";
private static final String H9201 = "9201";
private static final String H9202 = "9202";
private static final String H9205 = "9205";
private static final String H0001 = "0001";
private static final String H1205 = "1205";
/**
*
*
@ -26,8 +34,8 @@ public class JT1078Template {
Cmd cmd = new Cmd.Builder()
.setDevId(devId)
.setPackageNo(randomInt())
.setMsgId("9101")
.setRespId("0001")
.setMsgId(H9101)
.setRespId(H0001)
.setRs(j9101)
.build();
return SessionManager.INSTANCE.request(cmd, timeOut);
@ -43,13 +51,64 @@ public class JT1078Template {
Cmd cmd = new Cmd.Builder()
.setDevId(devId)
.setPackageNo(randomInt())
.setMsgId("9102")
.setRespId("0001")
.setMsgId(H9102)
.setRespId(H0001)
.setRs(j9102)
.build();
return SessionManager.INSTANCE.request(cmd, timeOut);
}
/**
*
*
* @param devId
* @param j9205
*/
public String queryBackTime(String devId, J9205 j9205, Integer timeOut) {
Cmd cmd = new Cmd.Builder()
.setDevId(devId)
.setPackageNo(randomInt())
.setMsgId(H9205)
.setRespId(H1205)
.setRs(j9205)
.build();
return SessionManager.INSTANCE.request(cmd, timeOut);
}
/**
*
*
* @param devId
* @param j9201
*/
public String startBackLive(String devId, J9201 j9201, Integer timeOut) {
Cmd cmd = new Cmd.Builder()
.setDevId(devId)
.setPackageNo(randomInt())
.setMsgId(H9201)
.setRespId(H1205)
.setRs(j9201)
.build();
return SessionManager.INSTANCE.request(cmd, timeOut);
}
/**
*
*
* @param devId
* @param j9202
*/
public String controlBackLive(String devId, J9202 j9202, Integer timeOut) {
Cmd cmd = new Cmd.Builder()
.setDevId(devId)
.setPackageNo(randomInt())
.setMsgId(H9202)
.setRespId(H0001)
.setRs(j9202)
.build();
return SessionManager.INSTANCE.request(cmd, timeOut);
}
private Long randomInt() {
return (long) random.nextInt(1000) + 1;
}

View File

@ -16,7 +16,7 @@ import org.springframework.core.annotation.Order;
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true")
public class TcpAutoConfiguration {
public class JT1078AutoConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) {

View File

@ -1,7 +1,7 @@
package com.genersoft.iot.vmp.jt1078.config;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.GetMapping;
@ -26,6 +26,9 @@ public class JT1078Controller {
@Resource
JT1078Template jt1078Template;
/**
* jt1078Template
*/
@GetMapping("/start/live/{deviceId}/{channelId}")
public WVPResult<?> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
J9101 j9101 = new J9101();
@ -35,12 +38,14 @@ public class JT1078Controller {
j9101.setTcpPort(7618);
j9101.setUdpPort(7618);
j9101.setType(0);
// TODO 分配ZLM,获取IP、端口
String s = jt1078Template.startLive(deviceId, j9101, 6);
// TODO 设备响应成功后,封装拉流结果集
WVPResult<String> wvpResult = new WVPResult<>();
wvpResult.setCode(200);
wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId));
return wvpResult;
}
}

View File

@ -102,4 +102,15 @@ public class Cmd {
}
}
@Override
public String toString() {
return "Cmd{" +
"devId='" + devId + '\'' +
", packageNo=" + packageNo +
", msgId='" + msgId + '\'' +
", respId='" + respId + '\'' +
", rs=" + rs +
'}';
}
}

View File

@ -0,0 +1,190 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.util.ArrayList;
import java.util.List;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/28 10:36
* @email qingtaij@163.com
*/
@MsgId(id = "1205")
public class J1205 extends Re {
Integer respNo;
private List<JRecordItem> recordList = new ArrayList<JRecordItem>();
@Override
protected Rs decode0(ByteBuf buf, Header header, Session session) {
respNo = buf.readUnsignedShort();
long size = buf.readUnsignedInt();
for (int i = 0; i < size; i++) {
JRecordItem item = new JRecordItem();
item.setChannelId(buf.readUnsignedByte());
item.setStartTime(ByteBufUtil.hexDump(buf.readSlice(6)));
item.setEndTime(ByteBufUtil.hexDump(buf.readSlice(6)));
item.setWarn(buf.readLong());
item.setMediaType(buf.readUnsignedByte());
item.setStreamType(buf.readUnsignedByte());
item.setStorageType(buf.readUnsignedByte());
item.setSize(buf.readUnsignedInt());
recordList.add(item);
}
return null;
}
@Override
protected Rs handler(Header header, Session session) {
SessionManager.INSTANCE.response(header.getDevId(), "1205", (long) respNo, JSON.toJSONString(this));
J8001 j8001 = new J8001();
j8001.setRespNo(header.getSn());
j8001.setRespId(header.getMsgId());
j8001.setResult(J8001.SUCCESS);
return j8001;
}
public Integer getRespNo() {
return respNo;
}
public void setRespNo(Integer respNo) {
this.respNo = respNo;
}
public List<JRecordItem> getRecordList() {
return recordList;
}
public void setRecordList(List<JRecordItem> recordList) {
this.recordList = recordList;
}
public static class JRecordItem {
// 逻辑通道号
private int channelId;
// 开始时间
private String startTime;
// 结束时间
private String endTime;
// 报警标志
private long warn;
// 音视频资源类型
private int mediaType;
// 码流类型
private int streamType = 1;
// 存储器类型
private int storageType;
// 文件大小
private long size;
public int getChannelId() {
return channelId;
}
public void setChannelId(int channelId) {
this.channelId = channelId;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public long getWarn() {
return warn;
}
public void setWarn(long warn) {
this.warn = warn;
}
public int getMediaType() {
return mediaType;
}
public void setMediaType(int mediaType) {
this.mediaType = mediaType;
}
public int getStreamType() {
return streamType;
}
public void setStreamType(int streamType) {
this.streamType = streamType;
}
public int getStorageType() {
return storageType;
}
public void setStorageType(int storageType) {
this.storageType = storageType;
}
public long getSize() {
return size;
}
public void setSize(long size) {
this.size = size;
}
@Override
public String toString() {
return "JRecordItem{" +
"channelId=" + channelId +
", startTime='" + startTime + '\'' +
", endTime='" + endTime + '\'' +
", warn=" + warn +
", mediaType=" + mediaType +
", streamType=" + streamType +
", storageType=" + storageType +
", size=" + size +
'}';
}
}
@Override
public String toString() {
return "J1205{" +
"respNo=" + respNo +
", recordList=" + recordList +
'}';
}
}

View File

@ -6,6 +6,8 @@ import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/27 18:25
* @email qingtaij@163.com

View File

@ -1,13 +1,17 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/27 18:49
* @email qingtaij@163.com
*/
@MsgId(id = "9102")
public class J9102 extends Rs {
// 通道号
@ -47,7 +51,7 @@ public class J9102 extends Rs {
buffer.writeByte(command);
buffer.writeByte(closeType);
buffer.writeByte(streamType);
return null;
return buffer;
}
@ -82,4 +86,14 @@ public class J9102 extends Rs {
public void setStreamType(Integer streamType) {
this.streamType = streamType;
}
@Override
public String toString() {
return "J9102{" +
"channel=" + channel +
", command=" + command +
", closeType=" + closeType +
", streamType=" + streamType +
'}';
}
}

View File

@ -0,0 +1,173 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/28 10:37
* @email qingtaij@163.com
*/
@MsgId(id = "9201")
public class J9201 extends Rs {
// 服务器IP地址
private String ip;
// 实时视频服务器TCP端口号
private int tcpPort;
// 实时视频服务器UDP端口号
private int udpPort;
// 逻辑通道号
private int channel;
// 音视频资源类型0.音视频 1.音频 2.视频 3.视频或音视频
private int type;
// 码流类型0.所有码流 1.主码流 2.子码流(如果此通道只传输音频,此字段置0)
private int rate;
// 存储器类型0.所有存储器 1.主存储器 2.灾备存储器"
private int storageType;
// 回放方式0.正常回放 1.快进回放 2.关键帧快退回放 3.关键帧播放 4.单帧上传
private int playbackType;
// 快进或快退倍数0.无效 1.1倍 2.2倍 3.4倍 4.8倍 5.16倍 (回放控制为1和2时,此字段内容有效,否则置0)
private int playbackSpeed;
// 开始时间YYMMDDHHMMSS,回放方式为4时,该字段表示单帧上传时间
private String startTime;
// 结束时间YYMMDDHHMMSS,回放方式为4时,该字段无效,为0表示一直回放
private String endTime;
@Override
public ByteBuf encode() {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(ip.getBytes().length);
buffer.writeCharSequence(ip, CharsetUtil.UTF_8);
buffer.writeShort(tcpPort);
buffer.writeShort(udpPort);
buffer.writeByte(channel);
buffer.writeByte(type);
buffer.writeByte(rate);
buffer.writeByte(storageType);
buffer.writeByte(playbackType);
buffer.writeByte(playbackSpeed);
buffer.writeBytes(ByteBufUtil.decodeHexDump(startTime));
buffer.writeBytes(ByteBufUtil.decodeHexDump(endTime));
return buffer;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getTcpPort() {
return tcpPort;
}
public void setTcpPort(int tcpPort) {
this.tcpPort = tcpPort;
}
public int getUdpPort() {
return udpPort;
}
public void setUdpPort(int udpPort) {
this.udpPort = udpPort;
}
public int getChannel() {
return channel;
}
public void setChannel(int channel) {
this.channel = channel;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
public int getStorageType() {
return storageType;
}
public void setStorageType(int storageType) {
this.storageType = storageType;
}
public int getPlaybackType() {
return playbackType;
}
public void setPlaybackType(int playbackType) {
this.playbackType = playbackType;
}
public int getPlaybackSpeed() {
return playbackSpeed;
}
public void setPlaybackSpeed(int playbackSpeed) {
this.playbackSpeed = playbackSpeed;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
@Override
public String toString() {
return "J9201{" +
"ip='" + ip + '\'' +
", tcpPort=" + tcpPort +
", udpPort=" + udpPort +
", channel=" + channel +
", type=" + type +
", rate=" + rate +
", storageType=" + storageType +
", playbackType=" + playbackType +
", playbackSpeed=" + playbackSpeed +
", startTime='" + startTime + '\'' +
", endTime='" + endTime + '\'' +
'}';
}
}

View File

@ -0,0 +1,80 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/28 10:37
* @email qingtaij@163.com
*/
@MsgId(id = "9202")
public class J9202 extends Rs {
// 逻辑通道号
private int channel;
// 回放控制0.开始回放 1.暂停回放 2.结束回放 3.快进回放 4.关键帧快退回放 5.拖动回放 6.关键帧播放
private int playbackType;
// 快进或快退倍数0.无效 1.1倍 2.2倍 3.4倍 4.8倍 5.16倍 (回放控制为3和4时,此字段内容有效,否则置0)
private int playbackSpeed;
// 拖动回放位置(YYMMDDHHMMSS,回放控制为5时,此字段有效)
private String playbackTime;
@Override
public ByteBuf encode() {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(channel);
buffer.writeByte(playbackType);
buffer.writeByte(playbackSpeed);
buffer.writeBytes(ByteBufUtil.decodeHexDump(playbackTime));
return buffer;
}
public int getChannel() {
return channel;
}
public void setChannel(int channel) {
this.channel = channel;
}
public int getPlaybackType() {
return playbackType;
}
public void setPlaybackType(int playbackType) {
this.playbackType = playbackType;
}
public int getPlaybackSpeed() {
return playbackSpeed;
}
public void setPlaybackSpeed(int playbackSpeed) {
this.playbackSpeed = playbackSpeed;
}
public String getPlaybackTime() {
return playbackTime;
}
public void setPlaybackTime(String playbackTime) {
this.playbackTime = playbackTime;
}
@Override
public String toString() {
return "J9202{" +
"channel=" + channel +
", playbackType=" + playbackType +
", playbackSpeed=" + playbackSpeed +
", playbackTime='" + playbackTime + '\'' +
'}';
}
}

View File

@ -0,0 +1,94 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
*
*
* @author QingtaiJiang
* @date 2023/4/28 10:36
* @email qingtaij@163.com
*/
@MsgId(id = "9205")
public class J9205 extends Rs {
// 逻辑通道号
private int channelId;
// 开始时间YYMMDDHHMMSS,全0表示无起始时间
private String startTime;
// 结束时间YYMMDDHHMMSS,全0表示无终止时间
private String endTime;
// 报警标志
private final int warnType = 0;
// 音视频资源类型0.音视频 1.音频 2.视频 3.视频或音视频
private int mediaType;
// 码流类型0.所有码流 1.主码流 2.子码流
private int streamType = 0;
// 存储器类型0.所有存储器 1.主存储器 2.灾备存储器
private int storageType = 0;
@Override
public ByteBuf encode() {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(channelId);
buffer.writeBytes(ByteBufUtil.decodeHexDump(startTime));
buffer.writeBytes(ByteBufUtil.decodeHexDump(endTime));
buffer.writeLong(warnType);
buffer.writeByte(mediaType);
buffer.writeByte(streamType);
buffer.writeByte(storageType);
return buffer;
}
public void setChannelId(int channelId) {
this.channelId = channelId;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public void setMediaType(int mediaType) {
this.mediaType = mediaType;
}
public void setStreamType(int streamType) {
this.streamType = streamType;
}
public void setStorageType(int storageType) {
this.storageType = storageType;
}
public int getWarnType() {
return warnType;
}
@Override
public String toString() {
return "J9205{" +
"channelId=" + channelId +
", startTime='" + startTime + '\'' +
", endTime='" + endTime + '\'' +
", warnType=" + warnType +
", mediaType=" + mediaType +
", streamType=" + streamType +
", storageType=" + storageType +
'}';
}
}

View File

@ -76,13 +76,13 @@ public enum SessionManager {
Session session = this.get(cmd.getDevId());
if (session == null) {
log.error("DevId: {} not online!", cmd.getDevId());
return "-1";
return null;
}
String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo());
SynchronousQueue<String> subscribe = subscribe(requestKey);
if (subscribe == null) {
log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey);
return "-1";
return null;
}
session.writeObject(cmd);
try {
@ -105,7 +105,7 @@ public enum SessionManager {
log.error("{}", e.getMessage(), e);
}
}
log.warn("未找到对应回复指令,key:{} 消息:{} ", requestKey, data);
log.warn("Not find response,key:{} data:{} ", requestKey, data);
return false;
}

View File

@ -0,0 +1,103 @@
package com.genersoft.iot.vmp.jt1078;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.codec.netty.TcpServer;
import com.genersoft.iot.vmp.jt1078.proc.response.J9102;
import com.genersoft.iot.vmp.jt1078.proc.response.J9201;
import com.genersoft.iot.vmp.jt1078.proc.response.J9202;
import com.genersoft.iot.vmp.jt1078.proc.response.J9205;
import java.util.Scanner;
/**
* @author QingtaiJiang
* @date 2023/4/28 14:22
* @email qingtaij@163.com
*/
public class JT1078ServerTest {
private static final JT1078Template jt1078Template = new JT1078Template();
public static void main(String[] args) {
System.out.println("Starting jt1078 server...");
TcpServer tcpServer = new TcpServer(21078);
tcpServer.start();
System.out.println("Start jt1078 server success!");
Scanner s = new Scanner(System.in);
while (true) {
String code = s.nextLine();
switch (code) {
case "1":
test9102();
break;
case "2":
test9201();
break;
case "3":
test9202();
break;
case "4":
test9205();
break;
default:
break;
}
}
}
private static void test9102() {
J9102 j9102 = new J9102();
j9102.setChannel(1);
j9102.setCommand(0);
j9102.setCloseType(0);
j9102.setStreamType(0);
String s = jt1078Template.stopLive("18864197066", j9102, 6);
System.out.println(s);
}
private static void test9201() {
J9201 j9201 = new J9201();
j9201.setIp("192.168.1.1");
j9201.setChannel(1);
j9201.setTcpPort(7618);
j9201.setUdpPort(7618);
j9201.setType(0);
j9201.setRate(0);
j9201.setStorageType(0);
j9201.setPlaybackType(0);
j9201.setPlaybackSpeed(0);
j9201.setStartTime("230428134100");
j9201.setEndTime("230428134200");
String s = jt1078Template.startBackLive("18864197066", j9201, 6);
System.out.println(s);
}
private static void test9202() {
J9202 j9202 = new J9202();
j9202.setChannel(1);
j9202.setPlaybackType(2);
j9202.setPlaybackSpeed(0);
j9202.setPlaybackTime("230428134100");
String s = jt1078Template.controlBackLive("18864197066", j9202, 6);
System.out.println(s);
}
private static void test9205() {
J9205 j9205 = new J9205();
j9205.setChannelId(1);
j9205.setStartTime("230428134100");
j9205.setEndTime("230428134100");
j9205.setMediaType(0);
j9205.setStreamType(0);
j9205.setStorageType(0);
String s = jt1078Template.queryBackTime("18864197066", j9205, 6);
System.out.println(s);
}
}