捋一捋 async-tool 的问题

昨天做完精准营销的需求后, 提测版本一直连不上 MQ, 然后在本地启动后也未发现问题, 直到监听的消息队列有消息而且是大量消息时才会出现的错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
javax.jms.JMSException: Cannot send, channel has already failed: tcp://172.31.205.58:61616
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1409)
at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1496)
at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:325)
at org.apache.activemq.pool.ConnectionPool$2.makeObject(ConnectionPool.java:105)
at org.apache.activemq.pool.ConnectionPool$2.makeObject(ConnectionPool.java:90)
at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1179)
at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:142)
at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:174)
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer.sendMsg(JmsProducer.java:137)
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer.access$100(JmsProducer.java:19)
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer$2.run(JmsProducer.java:114)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://172.31.205.58:61616
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:315)
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:304)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1380)
... 13 more

因此开始走上 debug 这条不归路…

定位问题

上面报的错误, 一开始怀疑是 ActiveMQ 消费者连接断开, 导致发送不了消息, 因此一来就开始 debug sendMsg() 这个最终发送消息的方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void sendMsg(String queue, String jsonStr) throws Exception {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
//从连接池工厂中获取一个连接
connection = this.connectionFactory.createConnection();
//false 参数表示 为非事务型消息,后面的参数表示消息的确认类型
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//PTP消息方式
Destination destination = session.createQueue(queue);
//Destination is superinterface of Queue
producer = createProducer(producer, session, destination);
//map convert to javax message
Message message = getMessage(session, jsonStr);
producer.send(message);
log.info("send message, producer = {}", producer.getClass());
} finally {
closeSession(session);
closeConnection(connection);
}
}

先说这个方法的问题:

当每次发送消息时都会创建一个 ActiveMQ 连接, 然后创建一个 session, 最后创建一个 producer, 消息发送完成后关闭连接.
频繁的创建关闭连接将消耗大量系统资源, 降低性能, 因此一般使用连接池来保存连接.

20241229154732_5j5D0l4d.webp
20241229154732_7JQ0LlKY.webp

从 debug 日志中也可以看出来, 连接后又 close 了.

因此将 Connection, Session, Producer 进行复用. (这也是 ActiveMQ 官方推荐的做法).

将创建 producer 的整个操作放到 init() 中, 只执行一次.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private MessageProducer producer = null;

private void init() throws Exception {
//设置JAVA线程池
this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);
//ActiveMQ的连接工厂
ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);
actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS);
//Active中的连接池工厂
this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory);
this.connectionFactory.setCreateConnectionOnStartup(true);
this.connectionFactory.setMaxConnections(this.maxConnections);
this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection);

Connection connection;
Session session;
//从连接池工厂中获取一个连接
connection = this.connectionFactory.createConnection();
//false 参数表示 为非事务型消息,后面的参数表示消息的确认类型
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//PTP消息方式
Destination destination = session.createQueue("BiUserStatusSignal");
//Destination is superinterface of Queue
producer = createProducer(producer, session, destination);
}

重写 sendMsg()

1
2
3
4
5
6
private void sendMsg(String jsonStr) throws Exception {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(jsonStr);
producer.send(message);
log.info("send");
}

当我以为就这么容易的把问题解决的时候, 新的错误又来了(如果问题就这么解决了, 我也不会写这个文档了).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://172.31.205.58:61616
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:315)
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:304)
at org.apache.activemq.transport.WireFormatNegotiator.sendWireFormat(WireFormatNegotiator.java:168)
at org.apache.activemq.transport.WireFormatNegotiator.sendWireFormat(WireFormatNegotiator.java:84)
at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:74)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:273)
... 25 more
java.lang.NullPointerException
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer.sendMsg(JmsProducer.java:155)
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer.access$100(JmsProducer.java:24)
at com.xxxx.msearch.core.support.activemq.producer.JmsProducer$2.run(JmsProducer.java:137)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这次不仅出现了 NullPointerException, 以前的异常还是存在, ActiveMQ 的连接依然很多

20241229154732_37jKEmDb.webp

唯一改善的就是异常出现的频率降低了. 因此这个不是最根本的原因.

那么思考一下为什么会出现 NullPointerException.

上面的代码肯定是没有问题的, 除非是在多线程环境下.

另一个很严重的问题:

20241229154732_JHWp8RuI.webp

😳😳 居然能有 40 多个线程池……
好了, 基本知道什么原因导致的了, 并发 + 线程池问题.

看看运行时的线程:

20241229154732_3MHihWXd.webp

垃圾回收频繁, 线程数还在不断增长…..

追踪代码

初始化 producer 的连接池就是 init() 的这段代码

1
2
//设置JAVA线程池
this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);

能初始化多个连接池, init() 肯定被错误的执行了多次. 通过查看 BiUserStatusTask 这个类,能执行 init() 的也只有下面的代码了.

1
2
3
4
JmsProducerFactory jmsProducerFactory = new JmsProducerFactory(producerJmsConfig);

...
jmsProducerFactory.start();

start() 方法会判断 JmsProducer 是否 null, 为 null 才会调用 init() 来创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void start() {
if(started.get()) {
return;
}
//配置下发到相关组件
deliverConfig();
started.set(true);
}

private void deliverConfig(){
if (jmsProducer == null) {
jmsProducer = new JmsProducer(jmsConfig.getBrokerURL(), jmsConfig.getUserName(), jmsConfig.getPassword());
}
}

整个 BiUserStatusTask 类的代码也没有看出哪个地方会多次创建线程池. 没办法只有加日志.

20241229154732_NirD1EBV.webp

问题找到了, 是进入 catch 了 😥.

20241229154732_o2lyOx1X.webp

原来是 JsonUtil.jsonToMap(text) 解析抛异常了 😰. (为什么我没有在 BiUserStatusTask 中打断点, 因为一直以为是 producer
发送消息的代码有问题).

最好的 debug 方式就是日志, 因为 debug 很慢, 而且多线程的时候并不好 debug. 上面的代码在 catch 中直接就发送异常消息然后入库, 导致日志中没有错误信息.

catch 中一定要打印日志, 因为这个属于系统异常日志, 是给开发的人看的, 而需要入库的日志一般都是业务日志或者业务异常日志,
开发的时候谁再去查一下数据库有没有异常日志啊! 直接打印到日志不是更好吗? 效率就是生命!

也请不要把所有代码都包裹在 try-catch 里面, 最好把异常分类处理, 一个 catch 就把所有异常捕获了倒是简单, 但是不好排查问题, 也会影响执行效率.

写代码的时候, 尽量把异常暴露出来, 不要忽略 catch. 在编码阶段就尽可能多的处理异常, 而不是上线了写到数据库, 然后统计异常数据来报警.

以前写过些方面的问题, 估计也没人在意吧.

进入主题

好了, 重点来了.

catch 里面也会发送 MQ 消息, 会不会是这里面的问题呢? 那我们先来捋一捋 LogSupportException.writeFuncExceptionLog() 这个方法

1
2
3
4
5
6
7
8
9
10
11
12
public class LogSupportException {
private static LogService logService = LogServiceFactory.getLogService();

/**
* 封装错误信息
*/
public static void writeSupportExceptionLog(Exception e, String componentName, String methodName, String className, String inputParams, String logDesc, LogSupportException.ErrorLevel errorLevel) {
...
saveException(logField);
}
...
}

writeSupportExceptionLog() 是封装处理信息的处理 (性能很低, 就不吐槽了, 自己去看吧). 会调用 logService.saveLog() 发送异步消息.

那么 logService 是哪里来的呢?

1
private static LogService logService = LogServiceFactory.getLogService();

是一个静态属性, 这里复习一下类的初始化顺序.

.java 被编译成 .class 被 Classloader 加载到 JVM 的时候, 首先会调用 static
代码块 和初始化 静态属性 (这个看 2 者代码的顺序), 如果新创建一个对象的时候, 会先执行代码块
, 然后才是构造方法. 那么问题来了,
子类父类初始化的顺序是什么呢?

logService 是一个静态属性, 会在被 JVM 加载的时候就初始化, 不管有没有创建这个类的实例.
因此我们进入到 LogServiceFactory.getLogService()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class LogServiceFactory {
private static ApplicationContext context = null;
private static LogService logService = null;
private static LogService logServiceB = null;
private static boolean initFlag = false;

synchronized static void init() {
if (!initFlag) {
context = new ClassPathXmlApplicationContext(
"classpath*:applicationContext-jms.xml");
initFlag = true;
}
}

synchronized static void initLogService() {
if (initFlag && logService == null) {
logService = (LogService) context.getBean("logServiceConcurrent");
logServiceB = (LogService) context.getBean("logServiceConcurrentB");

initFlag = true;
}
}



public static LogService getLogService() {
if (!initFlag) {
init();
}

if (initFlag && logService == null) {
initLogService();
}

return logService;
}

public static LogService getLogServiceB() {
if (!initFlag) {
init();
}

if (initFlag && logServiceB == null) {
initLogService();
}

return logServiceB;
}

public static <T> T getBean(String name) throws BeansException {
return (T) context.getBean(name);
}
}

哈哈哈 熟悉吧, 使用静态代码块来初始化 Spring 容器

1
2
3
4
5
6
7
synchronized static void init() {
if (!initFlag) {
context = new ClassPathXmlApplicationContext(
"classpath*:applicationContext-jms.xml");
initFlag = true;
}
}

其实这里使用 synchronized 是多余的, 因为 Classloader 从 JVM 底层上就保证了加载一个类的同步性, 避免了并发问题.

记住哦, 这里是第一次使用 new ClassPathXmlApplicationContext() 来初始化 Spring 容器, 配置文件是 applicationContext-jms.xml, 在**第二次
**的时候再说这么做存在的问题.

那么 logService 从 Spring 容器中获取到了, 然后调用 saveLog(), 下面是 saveLog() 的实现:

1
2
3
4
5
6
@Override
public boolean saveLog(String key, Object logMessage) {
String queueName = JmsTemplateFacotry.getJmsConfig().getQueue();
ObjectEvent objectEvent = new ObjectEvent(key, logMessage);
return sendToMq(queueName, objectEvent);
}

没什么特别之处, 就是从配置中获取队列名, 然后调用 sendToMq(), 但是得一步一步跟代码呀, 不然怎么知道有什么问题.

1
String queueName = JmsTemplateFacotry.getJmsConfig().getQueue();

那我们就看看 JmsTemplateFacotry 这个类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class JmsTemplateFacotry {
private static JmsProducer jmsProducer;
private static JmsConsumer jmsConsumer;
private static JmsConfig jmsConfig;

static {
ApplicationContext context = new ClassPathXmlApplicationContext(
"classpath*:applicationContext-jms.xml");
jmsConfig = (JmsConfig) context.getBean("jmsTemplateConfig");
}

public static void initProducer(){
if (jmsProducer == null) {
jmsProducer = new JmsProducer(jmsConfig.getBrokerURL(), jmsConfig.getUserName(), jmsConfig.getPassword());
}
}

public static void initConsumer(){
if(jmsConsumer == null){
jmsConsumer = new JmsConsumer(jmsConfig.getBrokerURL(),jmsConfig.getUserName(),jmsConfig.getPassword());
}
}

public static void messageSender(String queue,String jsonStr){
initProducer();
jmsProducer.send(queue, jsonStr);
}

public static JmsConsumer getJmsConsumer(){
initConsumer();
return jmsConsumer;
}

public static JmsConfig getJmsConfig(){
return jmsConfig;
}
}

😁 看见没,又是个静态代码块, 第二次通过 new ClassPathXmlApplicationContext() 初始化 Spring 容器了哦,
配置文件还是 applicationContext-jms.xml.

Spring 初始化问题

那我们来说说两次初始化 Spring 容器的问题.

1
ApplicationContext context = new ClassPathXmlApplicationContext("classpath*:applicationContext-jms.xml");

如果调用多次上面的方法, 将导致初始化多个 Spring 容器

第一个:

20241229154732_Vb1Unnl6.webp
20241229154732_BnXpQpfk.webp

第二个:

20241229154732_71YaDYHx.webp
20241229154732_can7KSuN.webp

也就是说同一个 bean 会被初始化 2 次.

Spring 容器中的 bean 默认是单例的, 说的是同一个 Spring 容器只能存在一个相同的 bean.

如果是 Spring + Spring MVC 相同的 bean 被初始化 2 次, 会导致事务不生效, @Value 不生效等各种各样的问题, 因此最佳实践是把 Spring 容器和 Spring
MVC 容器分开加载, 每个容器只初始化对应的 bean.

重复的初始化造成资源浪费, 而且还会导致不确定性问题出现, 所以以前老的初始化方式不可取, 正确的做法:

子包只需要提供功能即可, 不要自作主张的初始化. 初始化的工作统一由部署包(需要运行的主类或 Web)来做, 通过 import 子包的 xml 配置,
统一由父容器来管理所有 bean. 这样可以统一管控, 避免随意初始化. 对于配置文件也是这个道理.

并发问题

说了这么多, 终于要说根本的问题了.

LogServiceAsyncJmsImpl 异步向 mq 中发送异常日志的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private boolean sendToMq(String queue, ObjectEvent objectEvent) {
try {
String jsonEvent = objectEvent.getKey()
+ "-"
+ JsonUtil.objectToJson(objectEvent
.getMsg());
// 注意重点代码
JmsTemplateFacotry.messageSender(queue, jsonEvent);
return true;
} catch (Exception ex) {
log.error("JmsTemplateFacotry messageSender Error : {}", ex.getMessage());
}
return false;
}

messageSender() 的实现

1
2
3
4
5
6
7
8
9
10
public static void messageSender(String queue,String jsonStr){
initProducer();
jmsProducer.send(queue, jsonStr);
}

public static void initProducer(){
if (jmsProducer == null) {
jmsProducer = new JmsProducer(jmsConfig.getBrokerURL(), jmsConfig.getUserName(), jmsConfig.getPassword());
}
}

messageSender() 会先判断 jmsProducer 是否为 null, 为 null 就实例化一个 JmsProducer 对象, 实例化 JmsProducer 对象时,
会调用上面创建线程池的 init().

看着是个很合理的逻辑, 但是却没有考虑并发的问题. 如果是多线程, 会出现上面情况?
在分析之前, 先来复习下使用 new 创建一个对象的过程:

20241229154732_iZvvDu57.webp

一个类被 JVM 加载的时机:

  1. 使用 new 关键字实例化对象的时候;
  2. 读取或设置这个类的静态字段(被 final 修饰,已在编译器把结果放入常量池的静态字段除外)的时候;
  3. 调用这个类的静态方法的时候;
  4. 使用 java.lang.reflect 包对这个类进行反射调用的时候;
  5. 当虚拟机启动, 直接指定一个要执行的类(也就是包含 main() 的主类);

上面是类的初始化的 5 种情况, 通过阅读 JmsProducer 类的代码, 我们可以确定第一次初始化 JmsProducer 时, 就是通过 new 关键字.
因此先执行 JmsProducer 的初始化流程, 最终创建 JmsProducer 类的 class 对象.
注意哦, 如果一开始 JVM 没有加载过 JmsProducer 这个类, 会先对类进行加载从而生成当前类的 class 对象, 并不会生成 JmsProducer 类的实例对象.

以上流程, JVM 都能保证是同步的, 因此同一个类型只能被同一个类加载器加载一次.

具体可见 「深入理解 Java 虚拟机」第 7 章

只有当使用 new 关键字时, 如果没有被 JVM 初始化就走上面的流程, 如果已被初始化了, 才开始走类的实例化流程,

20241229154732_Zv0CZLC5.webp

那我们来分析一下这段代码在多线程的情况下会出现上面问题:

1
2
3
4
5
6
7
public static void initProducer(){
if (jmsProducer == null) {
jmsProducer = new JmsProducer(jmsConfig.getBrokerURL(),
jmsConfig.getUserName(),
jmsConfig.getPassword());
}
}

先说第一个好理解的情况:

因为是多线程环境, 可能同时多个线程一起进入 if 判断逻辑, 因为 jmsProducer == null 为 true, 会执行多次实例化流程.

先来说说另一个复杂点的情况:

当第一次执行 JmsTemplateFacotry.initProducer() 时, jmsProducer == null .
线程 1 进入 if 判断, 由于 jmsProducer == null 为 true, 会执行实例化流程.
这个时候 线程 2 进入 if 判断逻辑, 由于实例化流程也需要时间, 在还没有实例化完成之前, jmsProducer == null 为 true, 因此 线程 2
会再次实例化一个 jmsProducer.

总结一下实例化对象的过程:

  1. 分配内存
  2. 初始化对象(内存赋值)
  3. 内存地址赋给 instance (instance != null)

以上原因也就直接导致了创建多个线程池 !!!

就这么简单, 由于多线程并发执行同一段代码. 做事要验证, 那我们来验证一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test1() throws InterruptedException {

for (int index = 0; index < 1000; index++) {
new Thread(new Runnable() {
public void run() {
try {
log.info("jsmProducer = {}", JmsTemplateFacotry.getJmsProducer());
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

}

Thread.currentThread().join();
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@546dd457
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@63666aa6
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@63a9e6ce
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@65da9f79
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@3c657f0b
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@679c614
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@619ef7cf
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@27081999
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@1e4acc1c
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@52370b34
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@e8a0743
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@d882d64
....

对吧, 实例化了好多次吧

解决问题

知道了问题所在, 解决问题就很容易了, 我们只需要保证 JmsProducer 是单例的就可以了

单例的所有写法会了吗?

这里只使用最可靠最简单的一种方式: 枚举 (第二个推荐静态内部类, 不推荐 DCL, 因为 DCL 并不完全可靠)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* <p>Company: xxxx公司 </p>
* <p>Description: 枚举单例获取 JmsProducer, 保证只有一个实例</p>
*
* @author dong4j
* @date 2019-03-08 11:04
* @email dong4j@gmail.com
*/
public enum JmsProducerEnum {
INSTANCE;

private JmsProducer instance;

private JmsConfig jmsConfig;

public void setJmsConfig(JmsConfig jmsConfig){
this.jmsConfig = jmsConfig;
}

public JmsProducer getInstance() {
if(instance == null){
instance = new JmsProducer(jmsConfig.getBrokerURL(), jmsConfig.getUserName(), jmsConfig.getPassword());
}
return instance;
}
}

测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test2() throws InterruptedException {

for (int index = 0; index < 1000; index++) {
new Thread(new Runnable() {
public void run() {
try {
JmsProducerEnum instance = JmsProducerEnum.INSTANCE;
instance.setJmsConfig(JmsTemplateFacotry.getJmsConfig());
log.info("jsmProducer = {}", instance.getInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
Thread.currentThread().join();
}

输出:

1
2
3
4
5
6
7
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
com.xxxx.msearch.core.support.activemq.producer.JmsProducer@6aa6a851
...

最终的修改方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class JmsTemplateFacotry {
private static JmsProducer jmsProducer;
private static JmsConsumer jmsConsumer;
private static JmsConfig jmsConfig;

static {
ApplicationContext context = new ClassPathXmlApplicationContext(
"classpath*:applicationContext-jms.xml");
jmsConfig = (JmsConfig) context.getBean("jmsTemplateConfig");

JmsProducerEnum instance = JmsProducerEnum.INSTANCE;
instance.setJmsConfig(jmsConfig);
jmsProducer = instance.getInstance();
}

...
}

这里还用了双重保证, JVM 保证 static 代码块只执行一次, 枚举单例再保证唯一实例.

为什么会出现并发问题

BiUserStatusTask 类中的初始化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void init() {
ringAdapter = (RingAdapter) DataCache.getContext().getBean("ringAdapter");
JmsConfig producerJmsConfig = new JmsConfig();
producerJmsConfig.setBrokerURL(MQ_URL);
producerJmsConfig.setUserName(MQ_USER_NAME);
producerJmsConfig.setPassword(MQ_USER_PASSWORD);
jmsProducerFactory = new JmsProducerFactory(producerJmsConfig);
JmsConfig consumerJmsConfig = new JmsConfig();
consumerJmsConfig.setBrokerURL(MQ_URL);
JmsConsumerFactory jmsConsumerFactory = new JmsConsumerFactory(consumerJmsConfig);
jmsConsumerFactory.getJmsConsumer().setQueue(MQ_SIGNAL_NAME);
jmsConsumerFactory.getJmsConsumer().setQueuePrefetch(Integer.valueOf(QUEUE_SIZE));
jmsConsumerFactory.getJmsConsumer().setMessageListener(new MultiThreadMessageListener(Integer.parseInt(MQ_THREAD), new MessageHandler() {
@Override
public void handle(Message message) {
try {
logger.debug("执行任务:" + taskName + "休眠!");
Thread.sleep(Integer.parseInt(MQ_SLEEP));
getUserStatusData(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}));
try {
jmsConsumerFactory.getJmsConsumer().start();
} catch (Exception e) {
logger.error(CommonFunc.getExceptionStack(e));
LogSupportException.writeFuncExceptionLog(e, "异步工具", "run-BiUserStatusTask", this.getClass().getSimpleName(), LogSupportException.ErrorLevel.ERROR);
}
}

用于接收 BiUserStatusSignal 队列消息的消费者也维护了一个线程池, 在 MessageListener 里面, 当拿到一批消息后, 会通过多线程来处理,
也就是 getUserStatusData() 方法, 当这个方法进入 catch 时, 最终会通过 JmsTemplateFacotry 来发送消息, 然后在实例化 JmsProducer
时没有考虑到多线程问题, 导致创建多个线程池.

可靠的单元测试

iconmons-mq 这个组件只测试了 JMSProducerJMSConsumer, 却没有测试几个 Factory 类, 而且单元测试应该使用 Junit, 不要使用 main(),
更不要使用 System.out.println.
尽量做到规范化, 可测试化.

为什么 JMSProducer 测试了 100w 次也没问题, 因为是通过手动 new 的方式创建, 而且只有一次, 这样就保证了 JMSProducer 单例.

最后进行集成测试

20241229154732_RW96blZf.webp

没再出现错误日志

运行时数据:

20241229154732_IM7GfOOx.webp

长时间运行, GC 次数少, 线程数保持在 117.

问题总结

20241229154732_bZVrlIir.webp

通过这次的问题修复, 我们涉及到了, 并发, 线程池, 类的初始化, 类的实例化, Spring 容器的初始化等相关知识点, 也清楚了说明了老代码中存在的一些框架问题.

对于 12530 重构的想法

个人觉得老框架不需要改了, 真改不动了, 如果没有改完或者没有经过大量的测试, 是很容易出现问题的.

主要是以前的代码结构太烂, 框架结构也不合理, 最重要的是相关依赖管理太随意了, 根本就没有管理.各组件的版本管理也不规范, 导致后期维护性大打折扣.
现在能做的就是在不动主体框架的情况下, 尽量重构代码.

当然我们也做过这方面的努力, 重构项目依赖管理, 重构日志, 重构配置, 重构组件, 但是改出来的东西却不尽人意, 给其他同事造成了工作上的负担.
不过这是重构阶段必须要经历的情况.

12530 业务多, 代码多, 组件多, 基本上是牵一发而动全身, 因此在没有大量测试的情况下, 很难保证重构后的正确性与稳定性.

因此按照我的建议就是不要改老代码了, 把业务迁移到 ms-project 上来, 至少依赖管理, 配置管理, 日志管理这些做的比老框架好, 代码也更规范.

怎么重构代码以前或多或少说过一点, 这里再重申一下:

只要把 IDEA 提示的警告改完就可以了, 这种重构是对业务和测试影响最小的方式. 以前也说过怎么通过修改警告来学习底层的知识, 为什么 IDEA
对这段代码提出警告, 有没有更好的更规范的代码实现这段逻辑?

在写业务逻辑的时候是不是沿用以前的代码规范和思考逻辑? 以前的代码就一定正确吗? 有没有优化的空间呢?

举个例子:

在做精准营销的时候, 先把相关代码捋一遍, 清楚个大概逻辑, 不需要深入看代码.

第一遍: 把遇到的所有警告提示看一遍

1
Map<String, String> inputStrings = new HashMap<String, String>();

这段代码有问题吗? IDEA 已经给出了提示

  1. new HashMap<String, String> 可以简化为 new HashMap<>;
  2. 初始化 HashMap 时设置初始大小;

如果你看到这个提示, 你会不会想为什么能简化成 new HashMap<>?
为什么最好为 HashMap 设置初始值?

你就会去查资料, 因为 JDK7 的新特性, 叫钻石语法, 那么你还可以查一下 JDK7 其他新的语法, 看是否能用到项目中.
给 HashMap 设置初始值是为了合理分配内存, 减少 resize 的次数, 从而提高效率.
那你就会去看 HashMap 源码, 你就会知道:

  1. 什么情况下回 resize;
  2. resize 后的容量是多少;
  3. 负载因子又是什么;
  4. 为什么 HashMap 不是线程安全的;
  5. 有没有线程安全的 HashMap, 有哪些;
  6. HashMap 的存储方式是怎样的; JDK7 和 JDK8 的实现方式有什么不同;
  7. 如果 key 是对象为什么要重写 hashCode() 和 equals()方法;
  8. 为什么 HashMap 一般使用 String 做 key;
  9. ….

然后再深入一些:

  1. 在多线程的情况下, 使用 HashMap 存在的问题;
  2. HashMap 与 ConcurrentHashMap 的区别; ConcurrentHashMap 又是怎么实现的;
  3. 能不能说出 put() 的逻辑;
  4. 更深入的了解 hashCode() 的作用;
  5. 自己设计一个 hash 方法, 减少 hash 碰撞;
  6. 能通过什么方式提高 HashMap 的查询效率;
  7. ….

那说到 String, 又可以去看 String 的源码了, 然后你就会明白:

  1. 为什么 String 是不可变类; 自己怎么设计一个不可变类;
  2. 为什么我们在循环里面不使用 + 来拼接字符串;
  3. 与 StringBuffer, StringBuilder 的区别是什么;

然后再深入一些:

  1. String 的在内存中的存储位置;
  2. 从 JDK6 开始, 是怎么优化 String 的;
  3. String 的不可变性是绝对的吗? 可不可以使用一些手段修改 String;
  4. ….

就看 1 行代码, 你能联想到这些问题吗?

不是说只写业务代码就不能学到东西, 学东西在哪儿都可以学到, 只要有这个心就行.

先把基础知识学好了, 框架这些都是锦上添花的事.

第二遍: 把代码结构梳理一遍

简单的重构从第二遍开始, 一个方法超过 80 行, 就该拆分了.

  1. 有没有代码是共用的?
  2. 能不能抽离成工具类?
  3. 注释写好了吗?
  4. 代码逻辑清晰了吗?
  5. 注释掉的代码删没删? 留着干嘛? 算代码行数? 我们有版本管理, 不要的请直接删除.
  6. 字段名, 方法名命名规范吗?
  7. 有魔法值吗?
  8. tay-catch 合理包裹合理吗? 异常处理方式合理吗? 有没处理到的异常吗?
  9. 必要的日志打印了吗? 日志等级设置合理吗?
  10. 重载的方法写 @Override 注释了吗?
  11. 方法的访问修饰符合理吗? 返回值合理吗? 入参合理吗?
  12. switch case 到了全部情况吗?
  13. if 判断合理吗?
  14. return 的地方合理吗?

把你看到的不合理的地方全部重构了, 这一步也全部是借助 IDEA 强大的重构功能, 比如选中你想抽离为方法的代码, ctl + shift + m (windows 的快捷键不清楚,
好像是这个)自动重构, 只需要命名就可以了

20241229154732_7VgzmY90.webp

其他的重构快捷键和功能自己去了解和使用.

把吃饭的工具使用熟练是最基本的要求, 然后就是效率问题, 能自动化绝不手动, 能节约 1 秒 时间的事, 我宁愿话 1 个小时来学习.

现在使用的工具有没有更好的工具可以替代? 有没有去了解过同类工具?
请记住, 工具就是你的兵器, 一把趁手的兵器比手无寸铁好得多

第三遍: 梳理业务逻辑

这一遍就可以开始开发业务了.
了解需求, 先想怎么写, 不要一上来就开始写代码, 想一个方案出来, 相关的单元测试写出来, 再想想:

  1. 还有没有更好的实现方式?
  2. 以前的代码存在的问题?
  3. 以前的逻辑还能怎样优化?
  4. 以前的接口定义合理吗?
  5. 能不能运用到设计模式把业务抽离出来? 提高维护性和可扩展性?
  6. ….

这部分没有太多话语权, 毕竟做的少. 举个例子吧:

20241229154732_Ow8tJ5Na.webp

不是代码写得越多就越好, 不是方法越多代码结构就清晰.

20241229154732_amOis5Tf.webp

不要按照以前老的思路来写代码, 要有自己的思考.
以前的逻辑合理吗? 接口规范合理吗? 返回结果合理吗?

20241229154732_Jqtt2ttF.webp

业务端传入 servicedId 来查指定的业务的订购状态, 为什么还要通过接口的 serviceId 返回类型来判断是不是查询的当前业务? 难道我查询的什么业务还要通过接口来告诉我吗?

那传个 serviceId 还有什么意义?

当然这里也有历史原因, 或者都可以说全部是历史原因, 但是我们现在可以改变, 可以重构, 为了更好的将来, 为了下一批维护者少掉坑里面, 这些都可以改….

写得代码不是能跑通就可以了(「又不是不能跑」 😳), 需要思考和反思.

最后多回顾自己的代码, 随着自己知识面的扩展, 知识点的加深, 再看看以前的代码是不是又有更好的实现方式, 再次重构啊.