在项目里面为了提高性能往往会在主线程里面开启一个新线程去执行,这种做法最方便快捷,但是当用户量数据上涨,很显然每次去开启新的线程服务器往往会吃不消,这时就需要线程池来管理和监控线程的状态。

创建多线程的三种姿势

java 多线程很常见,如何使用多线程,如何创建线程,java 中有三种方式:

  • 通过继承 Thread 接口
public class Mytheard1 extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            System.out.println("thread#1===" + i);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 通过实现 Runnable 接口
public class Mytheard2 implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            System.out.println("thread#2===" + i);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 通过实现 Callable 接口
public class Mytheard3 implements Callable<Integer> {
    @Override
    public Integer call() throws Exception{
        int sum = 0;
        for (int i = 0; i < 30; i++) {
            System.out.println("thread#3===" + i);
            sum += i;
        }
        return sum;
    }
}
  • 启动上面三个线程
public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        //  通过主线程启动自己的线程
        //  通过继承 thread 类
        Mytheard1 thread1 = new Mytheard1();
        thread1.start();
        //  通过实现 runnable 接口
        Thread thread2 = new Thread(new Mytheard2());
        thread2.start();
        //  通过实现 callable 接口
        Mytheard3 th = new Mytheard3();
        FutureTask<Integer> result = new FutureTask<>(th);
        new Thread(result).start();
        // 注意这里都不是直接调用 run() 方法,而是调运线程类 Thread 的 start 方法,在 Thread 方法内部,会调运本地系统方法,最终会自动调运自己线程类的 run 方法
        //  让主线程睡眠
        Thread.sleep(1000L);
        System.out.println("主线程结束!用时:"
                + (System.currentTimeMillis() - startTime));
    }

上面三种方式更推荐通过实现 Runnable接口和实现 Callable接口,因为面向接口编程拓展性更好,而且可以防止 java 单继承的限制。

线程类型的简单说明

java 中线程一共有两种类型:守护线程( daemon thread)和用户线程( user thread),又叫非守护线程

  • 守护线程 可以通过 thread.setDaemon(true) 方法设置线程是否为守护线程, thread.setDaemon(true) 必须在 thread.start()之前设置,否则会抛出一个 IllegalThreadStateException 异常。在守护线程中开启的新线程也将是守护线程。守护线程顾名思义是用来守护的,是给所有得非守护进程提供服务的,所以在 jvm 执行完所有的非守护进程之后, jvm 就会停止,守护线程也不会再运行,最典型的守护线程就是 java 的垃圾回收机制 ( GC)。

  • 非守护线程 java 线程默认设置是非守护线程 thread.setDaemon(false)。当主线程运行完之后,只要主线程里面有非守护线程 jvm 就不会退出,直到所有的非守护线程执行完之后 jvm 才会退出。

总结:如果把一个线程设置成守护线程,则 jvm 的退出就不会关心当前线程的执行状态。

线程池的使用

上面代码中可以直接新起线程,如果 100 个并发同时访问主线程也就是短时间就启动了 200 个线程,200 个线程同时工作,逻辑上是没有任何问题的,但是这样做对系统资源的开销很大。基于这样的考虑,就要考虑启用线程池,线程池里有很多可用线程资源,如果需要就直接从线程池里拿就是。当不用的时候,线程池会自动帮我们管理。

所以使用线程池主要有以下两个好处:

  1. 减少在创建和销毁线程上所花的时间以及系统资源的开销
  2. 如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存 。
  • 自定义线程池

定义单例线程池:

public class MyPool {
    private static MyPool myPool = null;
    // 单例线程池中有两种具体的线程池
    private ThreadPoolExecutor threadPool = null;
    private ScheduledThreadPoolExecutor scheduledPool = null;
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }
    public ScheduledThreadPoolExecutor getScheduledPool() {
        return scheduledPool;
    }
    // 设置线程池的各个参数的大小
    private int corePoolSize = 10;// 池中所保存的线程数,包括空闲线程。
    private int maximumPoolSize = 20;// 池中允许的最大线程数。
    private long keepAliveTime = 3;// 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
    private int scheduledPoolSize = 10;
    private static synchronized void create() {
        if (myPool == null)
            myPool = new MyPool();
    }
    public static MyPool getInstance() {
        if (myPool == null)
            create();
        return myPool;
    }
    private MyPool() {
        // 实例化线程池,这里使用的 LinkedBlockingQueue 作为 workQueue ,使用 DiscardOldestPolicy 作为 handler
        this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());// 不在新线程中执行任务,而是由调用者所在的线程来执行
        // 实例化计划任务线程池
        this.scheduledPool = new ScheduledThreadPoolExecutor(scheduledPoolSize);
    }
}

创建线程池的主要参数说明:

  1. corePoolSize(int):线程池中保持的线程数量,包括空闲线程在内。也就是线程池释放的最小线程数量界限。

  2. maximumPoolSize(int): 线程池中嫩容纳最大线程数量。

  3. keepAliveTime(long): 空闲线程保持在线程池中的时间,当线程池中线程数量大于 corePoolSize 的时候。

  4. unit(TimeUnit枚举类): 上面参数时间的单位,可以是分钟,秒,毫秒等等。

  5. workQueue(BlockingQueue): 任务队列,当线程任务提交到线程池以后,首先放入队列中,然后线程池按照该任务队列依次执行相应的任务。可以使用的 workQueue 有很多,比如:LinkedBlockingQueue 等等。

  6. threadFactory(ThreadFactory类): 新线程产生工厂类。

  7. handler(RejectedExecutionHandler类): 当提交线程拒绝执行、异常的时候,处理异常的类。该类取值如下:(注意都是内部类)

    ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException 异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务,重复此过程。

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

获取线程池并添加任务:

public void testThreadPool() {
        ThreadPoolExecutor pool1 = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        pool1.execute(() -> System.out.println("快捷线程池中的线程!"));
        ThreadPoolExecutor pool2 = MyPool.getInstance().getThreadPool();
        pool2.execute(() -> {
            System.out.println("pool2 普通线程池中的线程!");
            try {
                Thread.sleep(30*1000);} catch (InterruptedException e) {e.printStackTrace();
            }
        });
        System.out.println("pool2 poolSize:"+pool2.getPoolSize());
        System.out.println("pool2 corePoolSize:"+pool2.getCorePoolSize());
        System.out.println("pool2 largestPoolSize:"+pool2.getLargestPoolSize());
        System.out.println("pool2 maximumPoolSize:"+pool2.getMaximumPoolSize());
        ScheduledThreadPoolExecutor pool3 = MyPool.getInstance().getScheduledPool();
        pool3.scheduleAtFixedRate(() -> System.out.println("计划任务线程池中的线程!"), 0, 5000, TimeUnit.MILLISECONDS);
}
  • JDK 提供的常用线程池

java 提供了几种常用的线程池,可以快捷的供程序员使用

  1. newFixedThreadPool 创建固定大小数量线程池,数量通过传入的参数决定。
  2. newSingleThreadExecutor 创建一个线程容量的线程池,所有的线程依次执行,相当于创建固定数量为 1 的线程池。
  3. newCachedThreadPool 创建可缓存的线程池,没有最大线程限制(实际上是 Integer.MAX_VALUE)。如果用空闲线程等待时间超过一分钟,就关闭该线程。
  4. newScheduledThreadPool 创建计划 (延迟) 任务线程池, 线程池中的线程可以让其在特定的延迟时间之后执行,也可以以固定的时间重复执行(周期性执行)。相当于以前的 Timer 类的使用。
  5. newSingleThreadScheduledExecutor 创建单线程池延迟任务,创建一个线程容量的计划任务。
  • Spring Boot 中使用线程池

如果使用 spring 框架的朋友,可以直接使用 spring 封装的线程池,由 spring 容器管理。 Spring Boot 中有两种方式配置线程池,一种是 自定义配置,二种是 修改原生 spring 异步线程池的装配

1. 自定义线程池

@Configuration
@EnableAsync// 开启线程池
public class TaskExecutePool {
    @Autowired
    private TaskThreadPoolConfig config;
    @Bean
    public Executor myTaskAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        // 最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        // 队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        // 活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        // 线程名字前缀
        executor.setThreadNamePrefix("MyExecutor-");
        // setRejectedExecutionHandler:当 pool 已经达到 max size 的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

2. 修改原生 spring 异步线程池的装配

@Configuration
@EnableAsync
public class NativeAsyncTaskExecutePool implements AsyncConfigurer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    // 注入配置类
    @Autowired
    TaskThreadPoolConfig config;
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        // 最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        // 队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        // 活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        // 线程名字前缀
        executor.setThreadNamePrefix("MyExecutor2-");
        // setRejectedExecutionHandler:当 pool 已经达到 max size 的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    /**
     *  异步任务中异常处理
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, objects) -> {
            logger.error("=========================="+ex.getMessage()+"=======================", ex);
            logger.error("exception method:"+method.getName());
        };
    }
}

3. 线程池配置类

@Component
public class TaskThreadPoolConfig {
    @Value("${task.pool.corePoolSize}")
    private int corePoolSize;
    @Value("${task.pool.maxPoolSize}")
    private int maxPoolSize;
    @Value("${task.pool.keepAliveSeconds}")
    private int keepAliveSeconds;
    @Value("${task.pool.queueCapacity}")
    private int queueCapacity;
    ......// 省略 get(),set() 方法
}

4. 配置文件配置线程池大小

# spring 线程池
task:
  pool:
 #核心线程池
    corePoolSize: 500
 #最大线程池
    maxPoolSize: 1000
 #活跃时间
    keepAliveSeconds: 300
 #队列容量
    queueCapacity: 50

5. 需要异步线程执行的任务

@Component
public class AsyncTask {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Async("myTaskAsyncPool")  //myTaskAsynPool 即配置线程池的方法名,此处如果不写自定义线程池的方法名,会使用默认的线程池
    public void doTask1(int i) {
        logger.info("Task"+i+"started.");
    }
    @Async// 使用默认的线程池
    public void doTask2(int i) {
        if (i == 0) {
            throw new NullPointerException();
        }
        logger.info("Task2-Native"+i+"started.");
    }
    @Async// 使用默认的线程池并返回参数
    public ListenableFuture<String> doTask3(int i) {
        logger.info("Task3- 返回值"+i+"started.");
        return new AsyncResult<>(i + "");
    }
}

6. 获取线程池,并执行任务

@Test
public void AsyncTaskTest() {
    for (int i = 0; i < 10000; i++) {
        try {
            // 自定义线程池
            asyncTask.doTask1(i);
            //spring 异步线程池
            asyncTask.doTask2(i);
            String text = asyncTask.doTask3(i).get();// 阻塞调用
            System.out.println(text);
            String context = asyncTask.doTask3(i).get(1, TimeUnit.SECONDS);// 限时调用
            System.out.println(context);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    logger.info("All tasks finished.");
}
文章作者: LibSept24_
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 LibSept24_
Java 多线程
喜欢就支持一下吧
打赏
微信 微信
支付宝 支付宝