Initial commit.

master
剑器近 2020-11-03 14:00:28 +08:00
commit 0725168a3e
39 changed files with 2768 additions and 0 deletions

42
.gitignore vendored Normal file
View File

@ -0,0 +1,42 @@
# Operating System Files
*.DS_Store
Thumbs.db
*.sw?
.#*
*#
*~
*.sublime-*
# Build Artifacts
.git*
.gradle/
build/
target/
bin/
out/
dependency-reduced-pom.xml
# Eclipse Project Files
.classpath
.project
.settings/
# IntelliJ IDEA Files
*.iml
*.ipr
*.iws
*.idea
*.log
README.html
# HTML resourse Files
#src/main/resources/static/index.html
#src/main/resources/static/view/
# temp ignore
src/main/resources/swagger.yaml

201
LICENSE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2019 剑器近
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

239
README.md Normal file
View File

@ -0,0 +1,239 @@
部标808协议快速开发包
====================
# 项目介绍
* 基于Netty实现JT/T 808部标协议的消息分发与编码解码
* 与Spring解耦合协议编码解码和Netty服务均可独立运行Android客户端同样适用
* SpringBoot 仅负责将协议暴露至Web接口目的是方便测试且为二次开发提供样例
* 最简洁、清爽、易用的部标开发框架。
问题交流群:[906230542]
# 主要特性
* 代码足够精简,便于二次开发;
* 致敬Spring、Hibernate设计理念熟悉Web开发的同学上手极快
* 使用注解描述协议,告别繁琐的封包、解包;
* 实时兼容2011、2013、2019部标协议版本支持分包请求
* 支持JT/T1078音视频协议T/JSATL12苏标主动安防协议
* 支持异步批量处理显著提升MySQL入库性能
* 提供报文解释器(解析过程分析工具),编码解码不再抓瞎;
* 全覆盖的测试用例,稳定发版。
# 代码仓库
* Gitee仓库地址[https://gitee.com/yezhihao/jt808-server/tree/master](https://gitee.com/yezhihao/jt808-server/tree/master)
* Github仓库地址[https://github.com/yezhihao/jt808-server/tree/master](https://github.com/yezhihao/jt808-server/tree/master)
# 下载方式
* Gitee下载命令`git clone https://gitee.com/yezhihao/jt808-server -b master`
* Github下载命令`git clone https://github.com/yezhihao/jt808-server -b master`
# 使用说明
## 项目分为四部分:
## 1.framework核心模块不推荐修改有BUG或扩展的需求建议提交issues或联系作者
```sh
└── framework
├── codec 编码解码
├── mvc 消息分发、处理
├── netty 网络通信
├── orm 序列化相关
└── session 消息发送和会话管理
```
注解:
* @Endpoint服务接入点等价SpringMVC的 @Controller
* @Mapping定义消息ID等价SpringMVC中 @RequestMapping
* @AsyncBatch, 异步批量消息对于并发较高的消息如0x0200(位置信息汇报)使用该注解显著提升Netty和MySQL入库性能。
* @Message协议类型等价Hibernate的 @Table
* @Field属性定义等价Hibernate的 @Column
* @Fs,多版本协议支持
## 2.protocol 部标协议定义,不推荐做大量修改
```sh
└── protocol
├── basics 部标协议通用消息头,以及公共的消息定义
├── codec 部标编码解码工具
├── commons 部标协议ID工具类等
├── jsatl12 T/JSATL12 苏标协议(已完成)
├── t808 JT/T808 部标协议(已完成)
└── t1078 JT/T1078 音视频协议(已完成)
```
消息定义样例:
```java
@Message(JT808.定位数据批量上传)
public class T0704 extends AbstractMessage<Header> {
private Integer total;
private Integer type;
private List<Item> items;
@Field(index = 0, type = DataType.WORD, desc = "数据项个数")
public Integer getTotal() { return total; }
public void setTotal(Integer total) { this.total = total; }
@Field(index = 2, type = DataType.BYTE, desc = "位置数据类型 0正常位置批量汇报1盲区补报")
public Integer getType() { return type; }
public void setType(Integer type) { this.type = type; }
@Field(index = 3, type = DataType.LIST, desc = "位置汇报数据项")
public List<Item> getItems() { return items; }
public void setItems(List<Item> items) { this.items = items; this.total = items.size(); }
}
```
## 3.web 开箱即用的Demo业务需求在这个包下开发可随意修改
```sh
└── web
├── config spring 相关配置
├── component.mybatis 附赠极简的mybatis分页插件:D
├── endpoint 808消息入口所有netty进入的请求都会根据@Mapping转发到此
└── controller service mapper ... 不再赘述
```
##### 消息接入:
```java
@Endpoint
public class JT808Endpoint {
@Autowired
private LocationService locationService;
@Autowired
private DeviceService deviceService;
//异步批量处理 队列大小20000 最大累积200处理一次 最大等待时间5秒
@AsyncBatch(capacity = 20000, maxElements = 200, maxWait = 5000)
@Mapping(types = 位置信息汇报, desc = "位置信息汇报")
public void 位置信息汇报(List<T0200> list) {
locationService.batchInsert(list);
}
@Async
@Mapping(types = 终端注册, desc = "终端注册")
public T8100 register(T0100 message, Session session) {
Header header = message.getHeader();
T8100 result = new T8100(session.nextSerialNo(), header.getMobileNo());
result.setSerialNo(header.getSerialNo());
String token = deviceService.register(message);
if (token != null) {
session.register(header);
result.setResultCode(T8100.Success);
result.setToken(token);
} else {
result.setResultCode(T8100.NotFoundTerminal);
}
return result;
}
}
```
##### 消息下发:
```java
@Controller
@RestController("terminal")
public class TerminalController {
private MessageManager messageManager = MessageManager.getInstance();
@ApiOperation("设置终端参数")
@PostMapping("{terminalId}/parameters")
public T0001 updateParameters(@PathVariable("terminalId") String terminalId, @RequestBody List<TerminalParameter> parameters) {
T8103 request = new T8103(terminalId);
request.setItems(parameters);
T0001 response = messageManager.request(request, T0001.class);
return response;
}
}
```
##### 已集成Swagger文档启动后可访问如下地址
* Swagger UI[http://127.0.0.1:8000/swagger-ui.html](http://127.0.0.1:8000/swagger-ui.html)
* Bootstrap UI[http://127.0.0.1:8000/doc.html](http://127.0.0.1:8000/doc.html)
![Bootstrap UI](https://images.gitee.com/uploads/images/2020/0731/135035_43dfca8e_670717.png "doc2.png")
## 4.test 808协议全覆盖的测试用例以及报文解释器
* QuickStart 不依赖Spring的启动可用于Android客户端
* Beans 测试数据
* TestBeans 消息对象的封包解包
* TestHex 原始报文测试
* Elucidator 报文解释器 - 解码
* DarkRepulsor 报文解释器 - 编码
分析报文内每个属性所处的位置以及转换后的值,以便查询报文解析出错的原因
Elucidator 运行效果如下:
```
0 [0200] 消息ID: 512
2 [4061] 消息体属性: 16481
4 [01] 协议版本号: 1
5 [00000000017299841738] 终端手机号: 17299841738
15 [ffff] 流水号: 65535
0 [00000400] 报警标志: 1024
4 [00000800] 状态: 2048
8 [06eeb6ad] 纬度: 116307629
12 [02633df7] 经度: 40058359
16 [0138] 海拔: 312
18 [0003] 速度: 3
20 [0063] 方向: 99
22 [200707192359] 时间: 2020-07-07T19:23:59
0 [01] 附加信息ID: 1
1 [04] 参数值长度: 4
2 [0000000b] 参数值: {0,0,0,11}
0 [02] 附加信息ID: 2
1 [02] 参数值长度: 2
2 [0016] 参数值: {0,22}
0 [03] 附加信息ID: 3
1 [02] 参数值长度: 2
2 [0021] 参数值: {0,33}
0 [04] 附加信息ID: 4
1 [02] 参数值长度: 2
2 [002c] 参数值: {0,44}
0 [05] 附加信息ID: 5
1 [03] 参数值长度: 3
2 [373737] 参数值: {55,55,55}
0 [11] 附加信息ID: 17
1 [05] 参数值长度: 5
2 [4200000042] 参数值: {66,0,0,0,66}
0 [12] 附加信息ID: 18
1 [06] 参数值长度: 6
2 [4d0000004d4d] 参数值: {77,0,0,0,77,77}
0 [13] 附加信息ID: 19
1 [07] 参数值长度: 7
2 [00000058005858] 参数值: {0,0,0,88,0,88,88}
0 [25] 附加信息ID: 37
1 [04] 参数值长度: 4
2 [00000063] 参数值: {0,0,0,99}
0 [2a] 附加信息ID: 42
1 [02] 参数值长度: 2
2 [000a] 参数值: {0,10}
0 [2b] 附加信息ID: 43
1 [04] 参数值长度: 4
2 [00000014] 参数值: {0,0,0,20}
0 [30] 附加信息ID: 48
1 [01] 参数值长度: 1
2 [1e] 参数值: {30}
0 [31] 附加信息ID: 49
1 [01] 参数值长度: 1
2 [28] 参数值: {40}
28 [01040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e310128] 位置附加信息: [BytesAttribute[id=1,value={0,0,0,11}], BytesAttribute[id=2,value={0,22}], BytesAttribute[id=3,value={0,33}], BytesAttribute[id=4,value={0,44}], BytesAttribute[id=5,value={55,55,55}], BytesAttribute[id=17,value={66,0,0,0,66}], BytesAttribute[id=18,value={77,0,0,0,77,77}], BytesAttribute[id=19,value={0,0,0,88,0,88,88}], BytesAttribute[id=37,value={0,0,0,99}], BytesAttribute[id=42,value={0,10}], BytesAttribute[id=43,value={0,0,0,20}], BytesAttribute[id=48,value={30}], BytesAttribute[id=49,value={40}]]
020040610100000000017299841738ffff000004000000080006eeb6ad02633df701380003006320070719235901040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e31012863
```
使用发包工具模拟请求
```
7e020040610100000000017299841738ffff000004000000080006eeb6ad02633df701380003006320070719235901040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e310128637e
```
![使用发包工具模拟请求](https://images.gitee.com/uploads/images/2019/0705/162745_9becaf08_670717.png)
项目会不定期进行更新建议star和watch一份您的支持是我最大的动力。
如有任何疑问或者BUG请联系我非常感谢。
技术交流QQ群[906230542]

197
pom.xml Normal file
View File

@ -0,0 +1,197 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.yezhihao</groupId>
<artifactId>netmc</artifactId>
<version>1.0.0.RELEASE</version>
<packaging>jar</packaging>
<name>Netmc</name>
<url>https://github.com/yezhihao/netmc</url>
<description>MVC framework based on netty implementation</description>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<scm>
<url>https://github.com/yezhihao/netmc</url>
<connection>https://github.com/yezhihao/netmc.git</connection>
</scm>
<developers>
<developer>
<id>netmc.yezhihao</id>
<name>netmc</name>
<email>zhihao.ye@qq.com</email>
</developer>
</developers>
<properties>
<java.version>1.8</java.version>
<resource.delimiter>@</resource.delimiter>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.test.skip>true</maven.test.skip>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.51.Final</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.8.RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>nexus-release</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--Release -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.3</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<executions>
<execution>
<id>default-deploy</id>
<phase>deploy</phase>
<goals>
<goal>deploy</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.8</version>
<extensions>true</extensions>
<configuration>
<serverId>central-nexus</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-scm-plugin</artifactId>
<version>1.11.2</version>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>nexus-release</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
</repository>
<snapshotRepository>
<id>nexus-snapshot</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</profile>
</profiles>
<repositories>
<repository>
<id>central</id>
<name>Maven Central</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<layout>default</layout>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,138 @@
package io.github.yezhihao.netmc;
import io.github.yezhihao.netmc.codec.Delimiter;
import io.github.yezhihao.netmc.codec.LengthField;
import io.github.yezhihao.netmc.codec.MessageDecoder;
import io.github.yezhihao.netmc.codec.MessageEncoder;
import io.github.yezhihao.netmc.core.HandlerInterceptor;
import io.github.yezhihao.netmc.core.HandlerMapping;
import io.github.yezhihao.netmc.session.SessionManager;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class NettyConfig {
protected final int port;
protected final int maxFrameLength;
protected final LengthField lengthField;
protected final Delimiter[] delimiter;
protected final MessageDecoder decoder;
protected final MessageEncoder encoder;
protected final ChannelInboundHandlerAdapter adapter;
protected final HandlerMapping handlerMapping;
protected final HandlerInterceptor handlerInterceptor;
protected final SessionManager sessionManager;
private NettyConfig(int port,
int maxFrameLength,
LengthField lengthField,
Delimiter[] delimiter,
MessageDecoder decoder,
MessageEncoder encoder,
HandlerMapping handlerMapping,
HandlerInterceptor handlerInterceptor,
SessionManager sessionManager
) {
this.port = port;
this.maxFrameLength = maxFrameLength;
this.lengthField = lengthField;
this.delimiter = delimiter;
this.decoder = decoder;
this.encoder = encoder;
this.handlerMapping = handlerMapping;
this.handlerInterceptor = handlerInterceptor;
this.sessionManager = sessionManager;
this.adapter = new TCPServerHandler(this.handlerMapping, this.handlerInterceptor, this.sessionManager);
}
public static NettyConfig.Builder custom() {
return new Builder();
}
public static class Builder {
private int port;
private int maxFrameLength;
private LengthField lengthField;
private Delimiter[] delimiters;
private MessageDecoder decoder;
private MessageEncoder encoder;
private HandlerMapping handlerMapping;
private HandlerInterceptor handlerInterceptor;
private SessionManager sessionManager;
public Builder() {
}
public Builder setPort(int port) {
this.port = port;
return this;
}
public Builder setMaxFrameLength(int maxFrameLength) {
this.maxFrameLength = maxFrameLength;
return this;
}
public Builder setLengthField(LengthField lengthField) {
this.lengthField = lengthField;
return this;
}
public Builder setDelimiters(byte[][] delimiters) {
Delimiter[] t = new Delimiter[delimiters.length];
for (int i = 0; i < delimiters.length; i++) {
t[i] = new Delimiter(delimiters[i]);
}
this.delimiters = t;
return this;
}
public Builder setDelimiters(Delimiter... delimiters) {
this.delimiters = delimiters;
return this;
}
public Builder setDecoder(MessageDecoder decoder) {
this.decoder = decoder;
return this;
}
public Builder setEncoder(MessageEncoder encoder) {
this.encoder = encoder;
return this;
}
public Builder setHandlerMapping(HandlerMapping handlerMapping) {
this.handlerMapping = handlerMapping;
return this;
}
public Builder setHandlerInterceptor(HandlerInterceptor handlerInterceptor) {
this.handlerInterceptor = handlerInterceptor;
return this;
}
public Builder setSessionManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
return this;
}
public NettyConfig build() {
return new NettyConfig(
this.port,
this.maxFrameLength,
this.lengthField,
this.delimiters,
this.decoder,
this.encoder,
this.handlerMapping,
this.handlerInterceptor,
this.sessionManager
);
}
}
}

View File

@ -0,0 +1,111 @@
package io.github.yezhihao.netmc;
import io.github.yezhihao.netmc.codec.DelimiterBasedFrameDecoder;
import io.github.yezhihao.netmc.codec.LengthFieldAndDelimiterFrameDecoder;
import io.github.yezhihao.netmc.codec.MessageDecoderWrapper;
import io.github.yezhihao.netmc.codec.MessageEncoderWrapper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class TCPServer {
private static final Logger log = LoggerFactory.getLogger(TCPServer.class);
private volatile boolean isRunning = false;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private String name;
private NettyConfig config;
public TCPServer(String name, NettyConfig config) {
this.name = name;
this.config = config;
}
private void startInternal() {
try {
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(bossGroup, workerGroup);
bootstrap.option(NioChannelOption.SO_BACKLOG, 1024)
.option(NioChannelOption.SO_REUSEADDR, true)
.childOption(NioChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
private MessageEncoderWrapper messageEncoderWrapper = new MessageEncoderWrapper(config.encoder);
private MessageDecoderWrapper messageDecoderWrapper = new MessageDecoderWrapper(config.decoder);
@Override
public void initChannel(NioSocketChannel channel) {
channel.pipeline()
.addLast(new IdleStateHandler(4, 0, 0, TimeUnit.MINUTES))
.addLast("frameDecoder", frameDecoder())
.addLast("decoder", messageDecoderWrapper)
.addLast("encoder", messageEncoderWrapper)
.addLast("adapter", config.adapter);
}
});
ChannelFuture channelFuture = bootstrap.bind(config.port).sync();
log.warn("==={}启动成功, port={}===", name, config.port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.warn("==={}出现异常, port={}===", e);
} finally {
stop();
}
}
public ByteToMessageDecoder frameDecoder() {
if (config.lengthField == null)
return new DelimiterBasedFrameDecoder(config.maxFrameLength, config.delimiter);
return new LengthFieldAndDelimiterFrameDecoder(config.maxFrameLength, config.lengthField, config.delimiter);
}
public synchronized void start() {
if (this.isRunning) {
log.warn("==={}已经启动, port={}===", name, config.port);
return;
}
this.isRunning = true;
new Thread(() -> startInternal()).start();
}
public synchronized void stop() {
if (!this.isRunning) {
log.warn("==={}已经停止, port={}===", name, config.port);
}
this.isRunning = false;
Future future = this.bossGroup.shutdownGracefully();
if (!future.isSuccess())
log.warn("bossGroup 无法正常停止", future.cause());
future = this.workerGroup.shutdownGracefully();
if (!future.isSuccess())
log.warn("workerGroup 无法正常停止", future.cause());
log.warn("==={}已经停止, port={}===", name, config.port);
}
}

View File

@ -0,0 +1,108 @@
package io.github.yezhihao.netmc;
import io.github.yezhihao.netmc.core.HandlerInterceptor;
import io.github.yezhihao.netmc.core.HandlerMapping;
import io.github.yezhihao.netmc.core.handler.Handler;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@ChannelHandler.Sharable
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(TCPServerHandler.class.getSimpleName());
private HandlerMapping handlerMapping;
private HandlerInterceptor interceptor;
private SessionManager sessionManager;
public TCPServerHandler(HandlerMapping handlerMapping, HandlerInterceptor interceptor, SessionManager sessionManager) {
this.handlerMapping = handlerMapping;
this.interceptor = interceptor;
this.sessionManager = sessionManager;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof Message))
return;
Message request = (Message) msg;
Message response;
Channel channel = ctx.channel();
Session session = channel.attr(Session.KEY).get();
long time = session.access();
try {
Handler handler = handlerMapping.getHandler(request.getMessageType());
if (handler != null) {
if (!interceptor.beforeHandle(request, session))
return;
response = handler.invoke(request, session);
if (handler.returnVoid) {
response = interceptor.successful(request, session);
} else {
interceptor.afterHandle(request, response, session);
}
} else {
response = interceptor.notSupported(request, session);
}
} catch (Exception e) {
log.warn(String.valueOf(request), e);
response = interceptor.exceptional(request, session, e);
}
time = System.currentTimeMillis() - time;
if (time > 200)
log.info("=========消息ID{},处理耗时{}ms,", request.getHeader(), time);
if (response != null)
ctx.writeAndFlush(response);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
Session session = sessionManager.newSession(channel);
channel.attr(Session.KEY).set(session);
log.info(">>>>>终端连接{}", session);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Session session = ctx.channel().attr(Session.KEY).get();
session.invalidate();
log.info("<<<<<断开连接{}", session);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
Session session = ctx.channel().attr(Session.KEY).get();
log.info("<<<<<终端异常断开连接" + session);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state = event.state();
if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) {
Session session = ctx.channel().attr(Session.KEY).get();
log.warn("<<<<<终端主动断开连接{}", session);
ctx.close();
}
}
}
}

View File

@ -0,0 +1,23 @@
package io.github.yezhihao.netmc.codec;
public class Delimiter {
protected byte[] value;
protected boolean strip;
public Delimiter(byte[] value) {
this(value, true);
}
public Delimiter(byte[] value, boolean strip) {
this.value = value;
this.strip = strip;
}
public byte[] getValue() {
return value;
}
public boolean isStrip() {
return strip;
}
}

View File

@ -0,0 +1,122 @@
package io.github.yezhihao.netmc.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.ObjectUtil;
import io.github.yezhihao.netmc.util.ByteBufUtils;
import java.util.List;
import static io.netty.util.internal.ObjectUtil.checkPositive;
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final Delimiter[] delimiters;
private final int maxFrameLength;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
public DelimiterBasedFrameDecoder(int maxFrameLength, Delimiter... delimiters) {
this(maxFrameLength, true, delimiters);
}
public DelimiterBasedFrameDecoder(int maxFrameLength, boolean failFast, Delimiter... delimiters) {
validateMaxFrameLength(maxFrameLength);
ObjectUtil.checkNonEmpty(delimiters, "delimiters");
this.delimiters = delimiters;
this.maxFrameLength = maxFrameLength;
this.failFast = failFast;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
Delimiter minDelim = null;
for (Delimiter delim : delimiters) {
int frameLength = ByteBufUtils.indexOf(buffer, delim.value);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) {
int minDelimLength = minDelim.value.length;
ByteBuf frame = null;
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (minDelim.strip) {
//忽略长度等于0的报文
if (minFrameLength != 0) {
frame = buffer.readRetainedSlice(minFrameLength);
}
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
private void fail(long frameLength) {
if (frameLength > 0) {
throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
} else {
throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");
}
}
private static void validateMaxFrameLength(int maxFrameLength) {
checkPositive(maxFrameLength, "maxFrameLength");
}
}

View File

@ -0,0 +1,63 @@
package io.github.yezhihao.netmc.codec;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
public class LengthField {
protected final byte[] prefix;
protected final int lengthFieldMaxFrameLength;
protected final int lengthFieldOffset;
protected final int lengthFieldLength;
protected final int lengthFieldEndOffset;
protected final int lengthAdjustment;
protected final int initialBytesToStrip;
public LengthField(byte[] prefix, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
this(prefix, maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
}
public LengthField(byte[] prefix, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
checkPositive(maxFrameLength, "maxFrameLength_LengthField");
checkPositiveOrZero(lengthFieldOffset, "lengthFieldOffset");
checkPositiveOrZero(initialBytesToStrip, "initialBytesToStrip");
if (lengthFieldOffset > maxFrameLength - lengthFieldLength) {
throw new IllegalArgumentException("maxFrameLength_LengthField (" + maxFrameLength + ") must be equal to or greater than lengthFieldOffset (" + lengthFieldOffset + ") + lengthFieldLength (" + lengthFieldLength + ").");
} else {
this.prefix = prefix;
this.lengthFieldMaxFrameLength = maxFrameLength;
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
this.lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
this.initialBytesToStrip = initialBytesToStrip;
}
}
public byte[] getPrefix() {
return prefix;
}
public int getLengthFieldMaxFrameLength() {
return lengthFieldMaxFrameLength;
}
public int getLengthFieldOffset() {
return lengthFieldOffset;
}
public int getLengthFieldLength() {
return lengthFieldLength;
}
public int getLengthFieldEndOffset() {
return lengthFieldEndOffset;
}
public int getLengthAdjustment() {
return lengthAdjustment;
}
public int getInitialBytesToStrip() {
return initialBytesToStrip;
}
}

View File

@ -0,0 +1,178 @@
package io.github.yezhihao.netmc.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.ObjectUtil;
import io.github.yezhihao.netmc.util.ByteBufUtils;
import java.util.List;
/**
* @see io.netty.handler.codec.LengthFieldBasedFrameDecoder
*/
public class LengthFieldAndDelimiterFrameDecoder extends DelimiterBasedFrameDecoder {
protected final byte[] prefix;
private final int maxFrameLength;
private final int lengthFieldOffset;
private final int lengthFieldLength;
private final int lengthFieldEndOffset;
private final int lengthAdjustment;
private final int initialBytesToStrip;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
private int bytesToDiscard;
public LengthFieldAndDelimiterFrameDecoder(int maxFrameLength, LengthField lengthField, Delimiter... delimiters) {
this(maxFrameLength, true, lengthField, delimiters);
}
public LengthFieldAndDelimiterFrameDecoder(int maxFrameLength, boolean failFast, LengthField lengthField, Delimiter... delimiters) {
super(maxFrameLength, failFast, delimiters);
ObjectUtil.checkPositive(maxFrameLength, "delimiterMaxFrameLength");
ObjectUtil.checkNonEmpty(delimiters, "delimiters");
this.prefix = lengthField.prefix;
this.maxFrameLength = lengthField.lengthFieldMaxFrameLength;
this.lengthFieldOffset = lengthField.lengthFieldOffset;
this.lengthFieldLength = lengthField.lengthFieldLength;
this.lengthFieldEndOffset = lengthField.lengthFieldEndOffset;
this.lengthAdjustment = lengthField.lengthAdjustment;
this.initialBytesToStrip = lengthField.initialBytesToStrip;
this.failFast = failFast;
}
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
Object decoded;
if (ByteBufUtils.startsWith(in, prefix)) {
decoded = this.decode(ctx, in);
} else {
decoded = super.decode(ctx, in);
}
if (decoded != null) {
out.add(decoded);
}
}
private void discardingTooLongFrame(ByteBuf in) {
int bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
this.failIfNecessary(false);
}
private static void failOnNegativeLengthField(ByteBuf in, int frameLength, int lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
}
private static void failOnFrameLengthLessThanLengthFieldEndOffset(ByteBuf in, int frameLength, int lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
private void exceededFrameLength(ByteBuf in, int frameLength) {
int discard = frameLength - in.readableBytes();
this.tooLongFrameLength = frameLength;
if (discard < 0) {
in.skipBytes(frameLength);
} else {
this.discardingTooLongFrame = true;
this.bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
this.failIfNecessary(true);
}
private static void failOnFrameLengthLessThanInitialBytesToStrip(ByteBuf in, int frameLength, int initialBytesToStrip) {
in.skipBytes(frameLength);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than initialBytesToStrip: " + initialBytesToStrip);
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < this.lengthFieldEndOffset) {
return null;
} else {
int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
int frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, this.lengthFieldEndOffset);
}
frameLength += this.lengthAdjustment + this.lengthFieldEndOffset;
if (frameLength < this.lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, this.lengthFieldEndOffset);
}
if (frameLength > this.maxFrameLength) {
this.exceededFrameLength(in, frameLength);
return null;
} else {
if (in.readableBytes() < frameLength) {
return null;
} else {
if (this.initialBytesToStrip > frameLength) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
in.skipBytes(this.initialBytesToStrip);
int readerIndex = in.readerIndex();
int actualFrameLength = frameLength - this.initialBytesToStrip;
ByteBuf frame = in.retainedSlice(readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
}
}
}
protected int getUnadjustedFrameLength(ByteBuf buf, int offset, int length) {
int frameLength;
switch (length) {
case 2:
frameLength = buf.getUnsignedShort(offset);
break;
case 3:
frameLength = buf.getUnsignedMedium(offset);
break;
case 4:
frameLength = buf.getInt(offset);
break;
default:
throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 2, 3, 4)");
}
return frameLength;
}
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (this.bytesToDiscard == 0) {
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
this.discardingTooLongFrame = false;
if (!this.failFast || firstDetectionOfTooLongFrame) {
this.fail(tooLongFrameLength);
}
} else if (this.failFast && firstDetectionOfTooLongFrame) {
this.fail(this.tooLongFrameLength);
}
}
private void fail(long frameLength) {
if (frameLength > 0) {
throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded");
} else {
throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + " - discarding");
}
}
}

View File

@ -0,0 +1,17 @@
package io.github.yezhihao.netmc.codec;
import io.netty.buffer.ByteBuf;
import io.github.yezhihao.netmc.session.Session;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface MessageDecoder<T> {
T decode(ByteBuf buf);
T decode(ByteBuf buf, Session session);
}

View File

@ -0,0 +1,55 @@
package io.github.yezhihao.netmc.codec;
import io.github.yezhihao.netmc.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@ChannelHandler.Sharable
public class MessageDecoderWrapper extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(MessageDecoderWrapper.class.getSimpleName());
private MessageDecoder decoder;
public MessageDecoderWrapper(MessageDecoder decoder) {
this.decoder = decoder;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
if (log.isInfoEnabled()) {
String hex;
if (buf.readableBytes() < 1048)
hex = ByteBufUtil.hexDump(buf);
else
hex = ByteBufUtil.hexDump(buf.slice(0, 32)) + "..." + ByteBufUtil.hexDump(buf.slice(buf.readableBytes() - 32, 32));
log.info(">>>>>原始报文[ip={}],hex={}", ctx.channel().remoteAddress(), hex);
}
Object message = decoder.decode(buf, ctx.channel().attr(Session.KEY).get());
if (message != null)
ctx.fireChannelRead(message);
buf.skipBytes(buf.readableBytes());
} catch (Exception e) {
throw new DecoderException(e);
} finally {
buf.release();
}
} else {
ctx.fireChannelRead(msg);
}
}
}

View File

@ -0,0 +1,14 @@
package io.github.yezhihao.netmc.codec;
import io.netty.buffer.ByteBuf;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface MessageEncoder<T> {
ByteBuf encode(T message);
}

View File

@ -0,0 +1,55 @@
package io.github.yezhihao.netmc.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.EncoderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@ChannelHandler.Sharable
public class MessageEncoderWrapper extends ChannelOutboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(MessageEncoderWrapper.class.getSimpleName());
private MessageEncoder encoder;
public MessageEncoderWrapper(MessageEncoder encoder) {
this.encoder = encoder;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf buf = null;
try {
buf = encoder.encode(msg);
if (log.isInfoEnabled())
log.info("<<<<<原始报文[ip={}],hex={}", ctx.channel().remoteAddress(), ByteBufUtil.hexDump(buf));
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
}

View File

@ -0,0 +1,62 @@
package io.github.yezhihao.netmc.core;
import io.github.yezhihao.netmc.core.annotation.AsyncBatch;
import io.github.yezhihao.netmc.core.annotation.Mapping;
import io.github.yezhihao.netmc.core.handler.AsyncBatchHandler;
import io.github.yezhihao.netmc.core.handler.Handler;
import io.github.yezhihao.netmc.core.handler.SimpleHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public abstract class AbstractHandlerMapping implements HandlerMapping {
private final Map<Object, Handler> handlerMap = new HashMap(60);
/**
* Endpoint@Mapping
*/
protected synchronized void registerHandlers(Object bean) {
Class<?> beanClass = bean.getClass();
Method[] methods = beanClass.getDeclaredMethods();
if (methods == null)
return;
for (Method method : methods) {
Mapping mapping = method.getAnnotation(Mapping.class);
if (mapping != null) {
String desc = mapping.desc();
int[] types = mapping.types();
AsyncBatch asyncBatch = method.getAnnotation(AsyncBatch.class);
Handler handler;
if (asyncBatch != null) {
handler = new AsyncBatchHandler(bean, method, desc, asyncBatch.poolSize(), asyncBatch.maxElements(), asyncBatch.maxWait());
} else {
handler = new SimpleHandler(bean, method, desc);
}
for (int type : types) {
handlerMap.put(type, handler);
}
}
}
}
/**
* Handler
*/
public Handler getHandler(Object messageType) {
return handlerMap.get(messageType);
}
}

View File

@ -0,0 +1,26 @@
package io.github.yezhihao.netmc.core;
import io.github.yezhihao.netmc.core.annotation.Endpoint;
import io.github.yezhihao.netmc.util.ClassUtils;
import java.util.List;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class DefaultHandlerMapping extends AbstractHandlerMapping {
public DefaultHandlerMapping(String endpointPackage) {
List<Class<?>> endpointClasses = ClassUtils.getClassList(endpointPackage, Endpoint.class);
for (Class<?> endpointClass : endpointClasses) {
try {
Object bean = endpointClass.newInstance();
super.registerHandlers(bean);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,27 @@
package io.github.yezhihao.netmc.core;
import io.github.yezhihao.netmc.core.model.Header;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.session.Session;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface HandlerInterceptor<T extends Message<? extends Header>> {
/** 未找到对应的Handle */
T notSupported(T request, Session session);
/** 调用之前 */
boolean beforeHandle(T request, Session session);
/** 调用之后返回值为void的 */
T successful(T request, Session session);
/** 调用之后,有返回值的 */
void afterHandle(T request, T response, Session session);
/** 调用之后抛出异常的 */
T exceptional(T request, Session session, Exception e);
}

View File

@ -0,0 +1,14 @@
package io.github.yezhihao.netmc.core;
import io.github.yezhihao.netmc.core.handler.Handler;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface HandlerMapping {
Handler getHandler(Object messageType);
}

View File

@ -0,0 +1,23 @@
package io.github.yezhihao.netmc.core;
import io.github.yezhihao.netmc.core.annotation.Endpoint;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.Map;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class SpringHandlerMapping extends AbstractHandlerMapping implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> endpoints = applicationContext.getBeansWithAnnotation(Endpoint.class);
for (Object bean : endpoints.values()) {
super.registerHandlers(bean);
}
}
}

View File

@ -0,0 +1,24 @@
package io.github.yezhihao.netmc.core.annotation;
import java.lang.annotation.*;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncBatch {
//线程数量
int poolSize() default 2;
//最大累计消息数
int maxElements() default 400;
//最大等待时间
int maxWait() default 1000;
}

View File

@ -0,0 +1,15 @@
package io.github.yezhihao.netmc.core.annotation;
import java.lang.annotation.*;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Endpoint {
}

View File

@ -0,0 +1,19 @@
package io.github.yezhihao.netmc.core.annotation;
import java.lang.annotation.*;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Mapping {
int[] types();
String desc() default "";
}

View File

@ -0,0 +1,118 @@
package io.github.yezhihao.netmc.core.handler;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.util.VirtualList;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class AsyncBatchHandler extends Handler {
private static final Logger log = LoggerFactory.getLogger(AsyncBatchHandler.class.getSimpleName());
private ConcurrentLinkedQueue<Message> queue;
private ThreadPoolExecutor executor;
private int poolSize;
private int maxElements;
private int maxWait;
private int warningLines;
public AsyncBatchHandler(Object actionClass, Method actionMethod, String desc, int poolSize, int maxElements, int maxWait) {
super(actionClass, actionMethod, desc);
Class<?>[] parameterTypes = actionMethod.getParameterTypes();
if (parameterTypes.length > 1)
throw new RuntimeException("@AsyncBatch方法仅支持一个List参数:" + actionMethod);
if (!parameterTypes[0].isAssignableFrom(List.class))
throw new RuntimeException("@AsyncBatch方法的参数不是List类型:" + actionMethod);
this.poolSize = poolSize;
this.maxElements = maxElements;
this.maxWait = maxWait;
this.warningLines = maxElements * poolSize * 50;
this.queue = new ConcurrentLinkedQueue();
this.executor = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(400),
new BasicThreadFactory.Builder().namingPattern(actionMethod.getName() + "-pool-%d").build());
for (int i = 0; i < poolSize; i++) {
boolean master = i == 0;
executor.execute(() -> {
try {
startInternal(master);
} catch (Exception e) {
log.error("批处理线程出错", e);
}
});
}
}
public Message invoke(Message request, Session session) {
queue.offer(request);
return null;
}
public void startInternal(boolean master) {
Message[] array = new Message[maxElements];
long logtime = 0;
long starttime = 0;
for (; ; ) {
Message temp;
int i = 0;
while ((temp = queue.poll()) != null) {
array[i++] = temp;
if (i >= maxElements)
break;
}
if (i > 0) {
starttime = System.currentTimeMillis();
try {
targetMethod.invoke(targetObject, new VirtualList<>(array, i));
} catch (Exception e) {
log.warn(targetMethod.getName(), e);
}
long time = System.currentTimeMillis() - starttime;
if (time > 1000L)
log.warn("批处理耗时:{}ms,共{}条记录", time, i);
}
if (i < maxElements) {
try {
for (int j = 0; j < i; j++)
array[j] = null;
Thread.sleep(maxWait);
} catch (InterruptedException e) {
}
} else if (master) {
if (logtime < starttime) {
logtime = starttime + 5000L;
int size = queue.size();
if (size > warningLines) {
log.warn("批处理队列繁忙, size:{}", size);
}
}
}
}
}
}

View File

@ -0,0 +1,82 @@
package io.github.yezhihao.netmc.core.handler;
import io.github.yezhihao.netmc.core.model.Header;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.session.Session;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
@SuppressWarnings("unchecked")
public abstract class Handler {
public static final int MESSAGE = 0;
public static final int SESSION = 1;
public static final int HEADER = 2;
public final Object targetObject;
public final Method targetMethod;
public final int[] parameterTypes;
public final boolean returnVoid;
public final String desc;
public Handler(Object targetObject, Method targetMethod, String desc) {
this.targetObject = targetObject;
this.targetMethod = targetMethod;
this.returnVoid = targetMethod.getReturnType().isAssignableFrom(Void.TYPE);
this.desc = desc;
Type[] types = targetMethod.getGenericParameterTypes();
int[] parameterTypes = new int[types.length];
try {
for (int i = 0; i < types.length; i++) {
Type type = types[i];
Class clazz;
if (type instanceof ParameterizedTypeImpl)
clazz = (Class<?>) ((ParameterizedTypeImpl) type).getActualTypeArguments()[0];
else
clazz = (Class<?>) type;
if (Message.class.isAssignableFrom(clazz))
parameterTypes[i] = MESSAGE;
else if (Header.class.isAssignableFrom(clazz))
parameterTypes[i] = HEADER;
else if (Session.class.isAssignableFrom(clazz))
parameterTypes[i] = SESSION;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
this.parameterTypes = parameterTypes;
}
public <T extends Message> T invoke(T request, Session session) throws Exception {
Object[] args = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
int type = parameterTypes[i];
switch (type) {
case Handler.MESSAGE:
args[i] = request;
break;
case Handler.SESSION:
args[i] = session;
break;
case Handler.HEADER:
args[i] = request.getHeader();
break;
}
}
return (T) targetMethod.invoke(targetObject, args);
}
@Override
public String toString() {
return desc;
}
}

View File

@ -0,0 +1,22 @@
package io.github.yezhihao.netmc.core.handler;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.session.Session;
import java.lang.reflect.Method;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class SimpleHandler extends Handler {
public SimpleHandler(Object actionClass, Method actionMethod, String desc) {
super(actionClass, actionMethod, desc);
}
public Message invoke(Message request, Session session) throws Exception {
return super.invoke(request, session);
}
}

View File

@ -0,0 +1,19 @@
package io.github.yezhihao.netmc.core.model;
import java.io.Serializable;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface Header<T> extends Serializable {
/** 客户端唯一标识 */
T getClientId();
/** 消息流水号 */
int getSerialNo();
void setSerialNo(int serialNo);
}

View File

@ -0,0 +1,15 @@
package io.github.yezhihao.netmc.core.model;
import java.io.Serializable;
/**
*
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface Message<T extends Header> extends Serializable {
T getHeader();
Object getMessageType();
}

View File

@ -0,0 +1,12 @@
package io.github.yezhihao.netmc.core.model;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface Response {
/** 应答消息流水号 */
int getSerialNo();
}

View File

@ -0,0 +1,126 @@
package io.github.yezhihao.netmc.session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.github.yezhihao.netmc.core.model.Header;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.core.model.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class MessageManager {
private static final Logger log = LoggerFactory.getLogger(MessageManager.class.getSimpleName());
private Map<String, SynchronousQueue> topicSubscribers = new ConcurrentHashMap<>();
private SessionManager sessionManager;
public MessageManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
}
/**
*
*/
public boolean notify(Message<? extends Header> message) {
Header header = message.getHeader();
Object clientId = header.getClientId();
Session session = sessionManager.get(clientId);
if (session == null) {
log.info("<<<<<<<<<<消息发送失败,未注册,{}", message);
return false;
}
header.setSerialNo(session.nextSerialNo());
session.writeObject(message);
return true;
}
/**
*
* 20
*/
public <T> T request(Message<? extends Header> request, Class<T> responseClass) {
return request(request, responseClass, 20000);
}
public <T> T request(Message<? extends Header> request, Class<T> responseClass, long timeout) {
Header header = request.getHeader();
Object clientId = header.getClientId();
Session session = sessionManager.get(clientId);
if (session == null) {
log.info("<<<<<<<<<<消息发送失败,未注册,{}", request);
return null;
}
header.setSerialNo(session.nextSerialNo());
String key = requestKey(header, responseClass);
SynchronousQueue syncQueue = this.subscribe(key);
if (syncQueue == null) {
log.info("<<<<<<<<<<请勿重复发送,{}", request);
}
try {
session.writeObject(request);
return (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("<<<<<<<<<<等待响应超时" + session, e);
} finally {
this.unsubscribe(key);
}
return null;
}
/**
*
*/
public boolean response(Message message) {
SynchronousQueue queue = topicSubscribers.get(responseKey(message));
if (queue != null)
return queue.offer(message);
return false;
}
private SynchronousQueue subscribe(String key) {
SynchronousQueue queue = null;
if (!topicSubscribers.containsKey(key))
topicSubscribers.put(key, queue = new SynchronousQueue());
return queue;
}
private void unsubscribe(String key) {
topicSubscribers.remove(key);
}
private static String requestKey(Header header, Class responseClass) {
StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5);
key.append(header.getClientId()).append('/').append(responseClass.getName());
if (Response.class.isAssignableFrom(responseClass))
key.append('/').append(header.getSerialNo());
return key.toString();
}
private static String responseKey(Message response) {
Class<? extends Message> responseClass = response.getClass();
Header header = response.getHeader();
StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5);
key.append(header.getClientId()).append('/').append(responseClass.getName());
if (response instanceof Response)
key.append('/').append(((Response) response).getSerialNo());
return key.toString();
}
}

View File

@ -0,0 +1,180 @@
package io.github.yezhihao.netmc.session;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.github.yezhihao.netmc.core.model.Header;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class Session {
private static final Logger log = LoggerFactory.getLogger(Session.class.getSimpleName());
public static final AttributeKey<Session> KEY = AttributeKey.newInstance(Session.class.getName());
protected final Channel channel;
private AtomicInteger serialNo = new AtomicInteger(0);
private boolean registered = false;
private Object clientId;
private final long creationTime;
private volatile long lastAccessedTime;
private Map<String, Object> attributes;
private Object subject;
private Object snapshot;
private Integer protocolVersion;
private SessionManager sessionManager;
protected Session(Channel channel, SessionManager sessionManager) {
this.channel = channel;
this.sessionManager = sessionManager;
this.creationTime = System.currentTimeMillis();
this.lastAccessedTime = creationTime;
this.attributes = new TreeMap<>();
}
public void writeObject(Object message) {
log.info("<<<<<<<<<<消息下发{},{}", this, message);
channel.writeAndFlush(message);
}
public int getId() {
return channel.id().hashCode();
}
public int nextSerialNo() {
int current;
int next;
do {
current = serialNo.get();
next = current > 0xffff ? 0 : current;
} while (!serialNo.compareAndSet(current, next + 1));
return next;
}
public boolean isRegistered() {
return registered;
}
/**
* SessionManager
*/
public void register(Header header) {
this.register(header, null);
}
public void register(Header header, Object subject) {
this.clientId = header.getClientId();
this.registered = true;
this.subject = subject;
sessionManager.put(clientId, this);
}
public Object getClientId() {
return clientId;
}
public long getCreationTime() {
return creationTime;
}
public long getLastAccessedTime() {
return lastAccessedTime;
}
public long access() {
lastAccessedTime = System.currentTimeMillis();
return lastAccessedTime;
}
public Collection<String> getAttributeNames() {
return attributes.keySet();
}
public Object getAttribute(String name) {
return attributes.get(name);
}
public void setAttribute(String name, Object value) {
attributes.put(name, value);
}
public Object removeAttribute(String name) {
return attributes.remove(name);
}
public Object getSubject() {
return subject;
}
public void setSubject(Object subject) {
this.subject = subject;
}
public Object getSnapshot() {
return snapshot;
}
public void setSnapshot(Object snapshot) {
this.snapshot = snapshot;
}
public Integer getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(int protocolVersion) {
this.protocolVersion = protocolVersion;
}
public Integer cachedProtocolVersion(Object clientId) {
return this.sessionManager.getVersion(clientId);
}
public void recordProtocolVersion(Object clientId, int protocolVersion) {
this.protocolVersion = protocolVersion;
this.sessionManager.putVersion(clientId, protocolVersion);
}
public void invalidate() {
channel.close();
sessionManager.callSessionDestroyedListener(this);
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Session that = (Session) o;
return this.getId() == that.getId();
}
@Override
public int hashCode() {
return getId();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(66);
sb.append("[ip=").append(channel.remoteAddress());
sb.append(", cid=").append(clientId);
sb.append(", reg=").append(registered);
sb.append(']');
return sb.toString();
}
}

View File

@ -0,0 +1,13 @@
package io.github.yezhihao.netmc.session;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public interface SessionListener {
default void sessionCreated(Session session) {
}
default void sessionDestroyed(Session session) {
}
}

View File

@ -0,0 +1,81 @@
package io.github.yezhihao.netmc.session;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class SessionManager {
private Map<Object, Session> sessionMap;
private Cache<Object, Integer> versionCache;
private ChannelFutureListener remover;
private SessionListener sessionListener;
public SessionManager() {
this.sessionMap = new ConcurrentHashMap<>();
this.versionCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
this.remover = future -> {
Session session = future.channel().attr(Session.KEY).get();
if (session != null) {
sessionMap.remove(session.getClientId(), session);
}
};
}
public SessionManager(SessionListener sessionListener) {
this();
this.sessionListener = sessionListener;
}
public Session newSession(Channel channel) {
Session session = new Session(channel, this);
callSessionCreatedListener(session);
return session;
}
protected void callSessionDestroyedListener(Session session) {
if (sessionListener != null)
sessionListener.sessionDestroyed(session);
}
protected void callSessionCreatedListener(Session session) {
if (sessionListener != null)
sessionListener.sessionCreated(session);
}
public Session get(Object clientId) {
return sessionMap.get(clientId);
}
public Collection<Session> all() {
return sessionMap.values();
}
protected void put(Object clientId, Session newSession) {
Session oldSession = sessionMap.put(clientId, newSession);
if (!newSession.equals(oldSession)) {
newSession.channel.closeFuture().addListener(remover);
}
}
public void putVersion(Object clientId, int version) {
versionCache.put(clientId, version);
}
public Integer getVersion(Object clientId) {
return versionCache.getIfPresent(clientId);
}
}

View File

@ -0,0 +1,26 @@
package io.github.yezhihao.netmc.util;
import java.util.AbstractList;
import java.util.List;
import java.util.function.Function;
public final class AdapterList<S, T> extends AbstractList<T> {
private final List<S> src;
private final Function<S, T> function;
public AdapterList(List<S> src, Function<S, T> function) {
this.src = src;
this.function = function;
}
@Override
public T get(int index) {
return function.apply(src.get(index));
}
@Override
public int size() {
return src.size();
}
}

View File

@ -0,0 +1,45 @@
package io.github.yezhihao.netmc.util;
import io.netty.buffer.ByteBuf;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class ByteBufUtils {
/**
* Returns the number of bytes between the readerIndex of the haystack and
* the first needle found in the haystack. -1 is returned if no needle is
* found in the haystack.
*/
public static int indexOf(ByteBuf haystack, byte[] needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.length; needleIndex++) {
if (haystack.getByte(haystackIndex) != needle[needleIndex]) {
break;
} else {
haystackIndex++;
if (haystackIndex == haystack.writerIndex() && needleIndex != needle.length - 1) {
return -1;
}
}
}
if (needleIndex == needle.length) {
// Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
public static boolean startsWith(ByteBuf haystack, byte[] prefix) {
for (int i = 0, j = haystack.readerIndex(); i < prefix.length; )
if (prefix[i++] != haystack.getByte(j++))
return false;
return true;
}
}

View File

@ -0,0 +1,118 @@
package io.github.yezhihao.netmc.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.annotation.Annotation;
import java.net.JarURLConnection;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class ClassUtils {
private static final Logger log = LoggerFactory.getLogger(ClassUtils.class.getSimpleName());
public static List<Class<?>> getClassList(String packageName, Class<? extends Annotation> annotationClass) {
List<Class<?>> classList = getClassList(packageName);
Iterator<Class<?>> iterator = classList.iterator();
while (iterator.hasNext()) {
Class<?> next = iterator.next();
if (!next.isAnnotationPresent(annotationClass))
iterator.remove();
}
return classList;
}
public static List<Class<?>> getClassList(String packageName) {
List<Class<?>> classList = new LinkedList();
String path = packageName.replace(".", "/");
try {
Enumeration<URL> urls = ClassUtils.getClassLoader().getResources(path);
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
if (url != null) {
String protocol = url.getProtocol();
if (protocol.equals("file")) {
addClass(classList, url.toURI().getPath(), packageName);
} else if (protocol.equals("jar")) {
JarURLConnection jarURLConnection = (JarURLConnection) url.openConnection();
JarFile jarFile = jarURLConnection.getJarFile();
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String entryName = jarEntry.getName();
if (entryName.startsWith(path) && entryName.endsWith(".class")) {
String className = entryName.substring(0, entryName.lastIndexOf(".")).replaceAll("/", ".");
addClass(classList, className);
}
}
}
}
}
} catch (Exception e) {
log.error("获取类出错!", e);
}
return classList;
}
private static void addClass(List<Class<?>> classList, String packagePath, String packageName) {
try {
File[] files = new File(packagePath).listFiles(file -> (file.isDirectory() || file.getName().endsWith(".class")));
if (files != null)
for (File file : files) {
String fileName = file.getName();
if (file.isFile()) {
String className = fileName.substring(0, fileName.lastIndexOf("."));
if (packageName != null) {
className = packageName + "." + className;
}
addClass(classList, className);
} else {
String subPackagePath = fileName;
if (packageName != null) {
subPackagePath = packagePath + "/" + subPackagePath;
}
String subPackageName = fileName;
if (packageName != null) {
subPackageName = packageName + "." + subPackageName;
}
addClass(classList, subPackagePath, subPackageName);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void addClass(List<Class<?>> classList, String className) {
classList.add(loadClass(className, false));
}
public static Class<?> loadClass(String className, boolean isInitialized) {
try {
return Class.forName(className, isInitialized, getClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static ClassLoader getClassLoader() {
return Thread.currentThread().getContextClassLoader();
}
}

View File

@ -0,0 +1,102 @@
package io.github.yezhihao.netmc.util;
import java.io.Serializable;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class VirtualList<E> extends AbstractList<E> implements RandomAccess, Serializable {
private final E[] elementData;
private final int size;
public VirtualList(E[] array, int length) {
this.elementData = array;
this.size = length;
}
@Override
public int size() {
return size;
}
@Override
public Object[] toArray() {
return elementData.clone();
}
@Override
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
if (a.length < size)
return Arrays.copyOf(this.elementData, size,
(Class<? extends T[]>) a.getClass());
System.arraycopy(this.elementData, 0, a, 0, size);
if (a.length > size)
a[size] = null;
return a;
}
@Override
public E get(int index) {
return elementData[index];
}
@Override
public E set(int index, E element) {
E oldValue = elementData[index];
elementData[index] = element;
return oldValue;
}
@Override
public int indexOf(Object o) {
E[] a = this.elementData;
if (o == null) {
for (int i = 0; i < size; i++)
if (a[i] == null)
return i;
} else {
for (int i = 0; i < size; i++)
if (o.equals(a[i]))
return i;
}
return -1;
}
@Override
public boolean contains(Object o) {
return indexOf(o) != -1;
}
@Override
public Spliterator<E> spliterator() {
return Spliterators.spliterator(elementData, 0, size, Spliterator.ORDERED);
}
@Override
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
for (int i = 0; i < size; i++) {
action.accept(elementData[i]);
}
}
@Override
public void replaceAll(UnaryOperator<E> operator) {
Objects.requireNonNull(operator);
E[] a = this.elementData;
for (int i = 0; i < size; i++) {
a[i] = operator.apply(a[i]);
}
}
@Override
public void sort(Comparator<? super E> c) {
Arrays.sort(elementData, 0, size, c);
}
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<properties>
<property name="LOG_HOME">logs/</property>
<property name="LOG_NAME">jt808</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="924 %d{HH:mm:ss.SSS} [%thread] %-5level%-21c{10}[%line]%msg%n"/>
</Console>
<RollingRandomAccessFile name="RollingRandomAccessFile" fileName="${LOG_HOME}/${LOG_NAME}.log" filePattern="${LOG_HOME}/${LOG_NAME}-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="828 %d{HH:mm:ss.SSS} [%thread] %-5level%-21c{10}[%line]%msg%n"/>
<DefaultRolloverStrategy max="20"/>
<Policies>
<TimeBasedTriggeringPolicy modulate="true" interval="1"/>
<SizeBasedTriggeringPolicy size="20 MB"/>
</Policies>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="RollingRandomAccessFile"/>
</Root>
</Loggers>
</Configuration>

View File

@ -0,0 +1,7 @@
package io.github.yezhihao.netmc;
public class Test {
public static void main(String[] args) {
System.out.println();
}
}