多线程,并发,是经常遇到的问题,平时解决的方案也想过很多,比如说现在有1000行消息,需要开10个线程同时处理。
之前想过两个方案:
方案一: 一次开10个线程,每个线程处理一条消息,等10个线程全部处理结束之后,再开启下10个线程,直到全部处理完毕
缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程
方案二: 将1000行消息分割为10份,每100行用一个线程处理。
优点:无等待
缺陷: 分割不均,无法充分利用所有的线程
现在想想,上面两个缺点挺多,就又想了两种方案:
方案三:使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.
优点:充分利用所有线程,无等待
缺点:需要将所有的task转移到Queue中,消耗一倍内存
方案四:使用java.util.concurrent包,固定数量线程池。
优点:完美解决
缺点:当单个task执行时间很短的时候,线程池中的线程并不会被全部使用,这样很多task就会block在一个线程中,降低执行速率
下面贴出每个方案的代码实现,备忘吧,如果有更好的想法,或者更简单的方式,再继续补充~
public class Task {
private int id;
public Task(int id) {
this.id = id;
}
public void start() {
System.out.println(Thread.currentThread().getName() + ": start to handle task " + id);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.LinkedList;
import java.util.List;
public class TaskProducer {
public static List<Task> produce(int count) {
List<Task> tasks = new LinkedList<Task>();
for(int i = 0; i < count; i ++) {
tasks.add(new Task(i + 1));
}
return tasks;
}
}
import java.util.LinkedList;
import java.util.List;
/**
* 策略1: 每次开启n个线程,等待n个线程全部结束之后,再开启下n个线程,每个线程处理一个task.
* 缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程
*/
public class Strategy1 {
public static void main(String[] args) {
List<Task> tasks = TaskProducer.produce(1000);
handleTasks(tasks, 10);
System.out.println("All finished");
}
public static void handleTasks(List<Task> tasks, int threadCount) {
int taskCount = tasks.size();
List<Thread> threadHolder = new LinkedList<Thread>();
for(int i = 0; i < taskCount; i += threadCount) {
for(int j = 0; j < threadCount && (i + j) < taskCount; j ++) {
Thread thread = new Thread(new TaskHandler(tasks.get(i + j)));
threadHolder.add(thread);
thread.start();
}
waitToFinish(threadHolder);
threadHolder.clear();
}
}
public static void waitToFinish(List<Thread> threadHolder) {
while(true) {
boolean allFinished = true;
for(Thread thread : threadHolder) {
allFinished = allFinished && !thread.isAlive();
}
if(allFinished) {
break;
}
}
}
public static class TaskHandler implements Runnable {
private Task task;
public TaskHandler(Task task) {
this.task = task;
}
@Override
public void run() {
task.start();
}
}
}
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
* 策略2: 将所有的task分割成n个子task列表,然后开启n个线程,每个线程处理一个子列表
* 优点:无等待
* 缺陷: 分割不均,无法充分利用所有的线程
*/
public class Strategy2 {
public static void main(String[] args) {
List<Task> tasks = TaskProducer.produce(1000);
handleTasks(tasks, 10);
System.out.println("All finished");
}
public static void handleTasks(List<Task> tasks, int threadCount) {
List<List<Task>> splitTasks = splitTasksToNThreads(tasks, threadCount);
List<Thread> threadHolder = new LinkedList<Thread>();
for (List<Task> segment : splitTasks) {
Thread thread = new Thread(new TaskHandler(segment));
threadHolder.add(thread);
thread.start();
}
waitToFinish(threadHolder);
}
public static void waitToFinish(List<Thread> threadHolder) {
while(true) {
boolean allFinished = true;
for(Thread thread : threadHolder) {
allFinished = allFinished && !thread.isAlive();
}
if(allFinished) {
break;
}
}
}
public static List<List<Task>> splitTasksToNThreads(List<Task> tasks, int threadCount) {
List<List<Task>> splitTasks = new ArrayList<List<Task>>(threadCount);
int taskCount = tasks.size();
int taskPerThread = new BigDecimal(taskCount).divide(new BigDecimal(threadCount), RoundingMode.CEILING).intValue();
for (int i = 0; i < taskCount; i += taskPerThread) {
List<Task> segment = new LinkedList<Task>();
for (int j = 0; j < taskPerThread && (i + j) < taskCount; j++) {
segment.add(tasks.get(i + j));
}
splitTasks.add(segment);
}
tasks.clear();
return splitTasks;
}
public static class TaskHandler implements Runnable {
private List<Task> tasks;
public TaskHandler(List<Task> tasks) {
this.tasks = tasks;
}
@Override
public void run() {
for (Task task : tasks) {
task.start();
}
}
}
}
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 策略3: 使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.
* 优点:充分利用所有线程,无等待
* 缺点:需要将所有的task转移到Queue中,消耗一倍内存
*/
public class Strategy3 {
public static void main(String[] args) {
List<Task> tasks = TaskProducer.produce(1000);
handleTasks(tasks, 10);
System.out.println("All finished");
}
public static void handleTasks(List<Task> tasks, int threadCount) {
Queue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();
taskQueue.addAll(tasks);
List<Thread> threadHolder = new LinkedList<Thread>();
for(int i = 0; i < threadCount; i ++) {
Thread thread = new Thread(new TaskHandler(taskQueue));
threadHolder.add(thread);
thread.start();
}
waitToFinish(threadHolder);
}
public static void waitToFinish(List<Thread> threadHolder) {
while(true) {
boolean allFinished = true;
for(Thread thread : threadHolder) {
allFinished = allFinished && !thread.isAlive();
}
if(allFinished) {
break;
}
}
}
public static class TaskHandler implements Runnable {
private final Queue<Task> tasks;
public TaskHandler(Queue<Task> tasks) {
this.tasks = tasks;
}
public void run() {
while(!tasks.isEmpty()) {
Task task = tasks.poll();
if(task != null) {
task.start();
}
}
}
}
}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 策略4: 使用java.util.concurrent包,线程池。
* 优点:完美解决。
*/
public class Strategy4 {
public static void main(String[] args) {
List<Task> tasks = TaskProducer.produce(1000);
handleTasks(tasks, 10);
System.out.println("All finished");
}
public static void handleTasks(List<Task> tasks, int threadCount) {
try {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for(Task task : tasks) {
executor.submit(new TaskHandler(task));
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class TaskHandler implements Runnable {
private Task task;
public TaskHandler(Task task) {
this.task = task;
}
public void run() {
task.start();
}
}
}
分享到:
相关推荐
.NET平台上的线程同步的问题线程之间共享的变量访问的同步,它的操作时原子操作,且被线程共享.你可以通过Interlocked.Increment 或 Interlocked.Decrement来增加或减少共享变量.它的有点在于是原子操作,也就是说...
线程同步与妥协处理机制可以较好的解决多线程使用过程中产生的问题。实验中采用了这两种方法后数据混乱、死锁等问题的出现几率大大降低。实验结论表明上面两种方法的使用可以很好的控制死锁、数据混乱的出现,具有...
所以要处理线程同时即时刷新界面要用SendMessage,它是一同步发送消息,会等处理完成才返回,即一SendMessage就会进入Refresh函数 2.而多线程函数的使用中可能出现以下问题,如果用 CWinThread * pthread=...
这个就不用多说了吧,如果你的线程处理不想一直循环轮询的话,可以试试这个。
最近捣鼓了一下多线程的同步问题,发现其实C#关于多线程同步事件处理还是很灵活,这里主要写一下,自己测试的一些代码,涉及到了AutoResetEvent 和 ManualResetEvent,当然还有也简要提了一下System.Threading....
通过学习《CLR via C#》里面的内容,对线程同步形成了脉络较清晰的体系结构,在多线程中实现线程同步的是线程同步构造,这个构造分两大类,一个是基元构造,一个是混合构造。所谓基元则是在代码中使用最简单的构造。...
一个基于C#开发的多线程处理程序源代码,该例子程序代码采用VS2005+C#开发,可以进行多线程进行后台运算程序的同步处理。
Java多线程数据同步处理的研究分析
vc++ 线程同步与异步套接字编程实例,Windows套接字在两种模式下执行I/O操作,阻塞和非阻塞。在阻塞模式下,在I/O操作完成前,执行操作的Winsock函数会一直等待下去,不会立即返回程序(将控制权交还给程序)。而在非...
当所有的线程在互相之间不需要进行通信的情况下就能够顺利地运行时, M i c r o s o f t Window s的运行性能最好。但是,线程很少能够在所有的时间都独立地进行操作...当这个任务完成时,另一个线程必须了解这个情况。
多线程处理视频时,由于不同线程处理速度不同步,不同线程之间的数据传递需要进行缓冲处理。单块缓存的互斥操作或两块缓存的乒乓操作在大多数情况下不够用(数据流不平稳时丢数据);附件中利用C++实现了一个调用...
创建线程的方法,子线程的问题 并行处理 同步:所线程等方法,将处理代码写入线程,对于新手非常容易上手
本下载解决delphiXE在处理多线程时,如何对各线程进行管理、如何做到中途中断执行多线程时的线程安全,如何在多线程内部进行UI同步等等,还可看我的博文同步。
感觉多线程确实麻烦,线程间要处理好同步与通讯,如果用CWinThread好一点,直接是一个线程对象,如果用AfxBeginThread,那必须定个全局函数,或者写个静态函数,一般是传个this指针进去,然后再用这个指针调用本类...
通过一个标志位来处理的方式虽然可以实现效果,但是还不够安全,极有可能有多条线程同时操作一个全局变量,导致资源争夺问题,为了保证安全,可以在此基础上加上对应的锁来处理同步问题,比如加上互斥锁 mutex,就...
操作系统课的一个小作业,使用C#处理多线程的同步与互斥,使用Mutex类和AutoResetEvent类。作为一个小白,参考了其他代码写出来,某些地方还有些不完全明白,都写在注释里了,求高手指点。
并且对接下来脚本的线程处理,及监控线程起到一个较高的实际认知。 对多线程基 础及后续多线程课程有承前启后的作用 主要学习内容: 1.线程的启动及关闭 2.监控线程的运用 3.大漠多线程之参数传递 易语言调用大漠...
二是由于对称多处理机(SMP)出现,可以满足多个运行单位,而多个进程并行开销过大。 因此在80年代,出现了能独立运行的基本单位——线程(Threads)。 线程,有时被称为轻量级进程(Lightweight Process,LWP),...
多线程处理视频时,由于不同线程处理速度不同步,不同线程之间的数据传递需要进行缓冲处理。单块缓存的互斥操作或两块缓存的乒乓操作在大多数情况下不够用(数据流不平稳时丢数据);附件中利用C++实现了一个调用...
模拟实现多线程处理银行的实时转账交易,代码完整,可以完美运行~