`
terrencexu
  • 浏览: 121586 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

10个线程同步处理1000行消息

    博客分类:
  • Java
阅读更多

多线程,并发,是经常遇到的问题,平时解决的方案也想过很多,比如说现在有1000行消息,需要开10个线程同时处理。

 

之前想过两个方案:

 

方案一: 一次开10个线程,每个线程处理一条消息,等10个线程全部处理结束之后,再开启下10个线程,直到全部处理完毕

缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程

 

方案二: 将1000行消息分割为10份,每100行用一个线程处理。

优点:无等待

缺陷: 分割不均,无法充分利用所有的线程

 

现在想想,上面两个缺点挺多,就又想了两种方案:

 

方案三:使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.

优点:充分利用所有线程,无等待

缺点:需要将所有的task转移到Queue中,消耗一倍内存

 

方案四:使用java.util.concurrent包,固定数量线程池。

优点:完美解决

缺点:当单个task执行时间很短的时候,线程池中的线程并不会被全部使用,这样很多task就会block在一个线程中,降低执行速率

 

下面贴出每个方案的代码实现,备忘吧,如果有更好的想法,或者更简单的方式,再继续补充~

 

 

public class Task {

	private int id;
	
	public Task(int id) {
		this.id = id;
	}
	
	public void start() {
		System.out.println(Thread.currentThread().getName() + ": start to handle task " + id);

		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}

 

 

 

import java.util.LinkedList;
import java.util.List;

public class TaskProducer {

	public static List<Task> produce(int count) {
		List<Task> tasks = new LinkedList<Task>();
		
		for(int i = 0; i < count; i ++) {
			tasks.add(new Task(i + 1));
		}
		
		return tasks;
	}
	
}

 

 

 

import java.util.LinkedList;
import java.util.List;

/**
 * 策略1: 每次开启n个线程,等待n个线程全部结束之后,再开启下n个线程,每个线程处理一个task.
 * 缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程
 */
public class Strategy1 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		int taskCount = tasks.size();
		
		List<Thread> threadHolder = new LinkedList<Thread>();
		for(int i = 0; i < taskCount; i += threadCount) {
			for(int j = 0; j < threadCount && (i + j) < taskCount; j ++) {
				Thread thread = new Thread(new TaskHandler(tasks.get(i + j)));
				threadHolder.add(thread);
				thread.start();
			}
			
			waitToFinish(threadHolder);
			threadHolder.clear();
		}
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static class TaskHandler implements Runnable {
		private Task task;
		
		public TaskHandler(Task task) {
			this.task = task;
		}

		@Override
		public void run() {
			task.start();
		}
	}
	
}

 

 

 

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * 策略2: 将所有的task分割成n个子task列表,然后开启n个线程,每个线程处理一个子列表
 * 优点:无等待
 * 缺陷: 分割不均,无法充分利用所有的线程
 */
public class Strategy2 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}

	public static void handleTasks(List<Task> tasks, int threadCount) {
		List<List<Task>> splitTasks = splitTasksToNThreads(tasks, threadCount);

		List<Thread> threadHolder = new LinkedList<Thread>();
		for (List<Task> segment : splitTasks) {
			Thread thread = new Thread(new TaskHandler(segment));
			threadHolder.add(thread);
			thread.start();
		}
		
		waitToFinish(threadHolder);
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static List<List<Task>> splitTasksToNThreads(List<Task> tasks, int threadCount) {
		List<List<Task>> splitTasks = new ArrayList<List<Task>>(threadCount);

		int taskCount = tasks.size();
		int taskPerThread = new BigDecimal(taskCount).divide(new BigDecimal(threadCount), RoundingMode.CEILING).intValue();

		for (int i = 0; i < taskCount; i += taskPerThread) {
			List<Task> segment = new LinkedList<Task>();
			for (int j = 0; j < taskPerThread && (i + j) < taskCount; j++) {
				segment.add(tasks.get(i + j));
			}

			splitTasks.add(segment);
		}

		tasks.clear();
		
		return splitTasks;
	}
	
	public static class TaskHandler implements Runnable {
		private List<Task> tasks;

		public TaskHandler(List<Task> tasks) {
			this.tasks = tasks;
		}
		
		@Override
		public void run() {
			for (Task task : tasks) {
				task.start();
			}
		}
	}

}

 

 

 

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 策略3: 使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.
 * 优点:充分利用所有线程,无等待
 * 缺点:需要将所有的task转移到Queue中,消耗一倍内存
 */
public class Strategy3 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		Queue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();
		taskQueue.addAll(tasks);
		
		List<Thread> threadHolder = new LinkedList<Thread>();
		for(int i = 0; i < threadCount; i ++) {
			Thread thread = new Thread(new TaskHandler(taskQueue));
			threadHolder.add(thread);
			thread.start();
		}
		
		waitToFinish(threadHolder);
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static class TaskHandler implements Runnable {
		
		private final Queue<Task> tasks;
		
		public TaskHandler(Queue<Task> tasks) {
			this.tasks = tasks;
		}
		
		public void run() {
			while(!tasks.isEmpty()) {
				Task task = tasks.poll();
				if(task != null) {
					task.start();
				}
			}
		}
		
	}
	
}

 

 

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 策略4: 使用java.util.concurrent包,线程池。
 * 优点:完美解决。
 */
public class Strategy4 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		try {
			ExecutorService executor = Executors.newFixedThreadPool(threadCount);
			
			for(Task task : tasks) {
				executor.submit(new TaskHandler(task));
			}
			
			executor.shutdown();
			executor.awaitTermination(60, TimeUnit.SECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static class TaskHandler implements Runnable {

		private Task task;
		
		public TaskHandler(Task task) {
			this.task = task;
		}
		
		public void run() {
			task.start();
		}
		
	}
	
}
6
3
分享到:
评论
9 楼 ultime 2013-01-10  
[b][/b][i][/i][u][/u][/color][size=large][/size][color=green][align=center][/align]   正好用到,谢谢lz了。
8 楼 terrencexu 2010-09-10  
jy1245626 写道
我是这么想的,要处理1000条消息,这的动作应该是一个异步的动作,就是说开一个线程出来让他执行这个动作,至于他什么时候执行完,就不管了。既然不管了就不用waitToFinish(threadHolder);了,然后我就注释掉它,速度就是瞬间了


呃,不是的,这里的意思就是,需要等所有的任务都执行完了,才能继续进行后续的操作,比如说我需要开3个线程,把所有的服务都启动起来了,才能进行后续的测试。
7 楼 jy1245626 2010-09-09  
我是这么想的,要处理1000条消息,这的动作应该是一个异步的动作,就是说开一个线程出来让他执行这个动作,至于他什么时候执行完,就不管了。既然不管了就不用waitToFinish(threadHolder);了,然后我就注释掉它,速度就是瞬间了
6 楼 terrencexu 2010-09-08  
jy1245626 写道
呵呵,我没啥好方法,多线程我也不熟,只是运行了第一种方法后,看第二种方法,有点受不了它的速度

我昨天自己跑了一下这几个case,在10的整数倍的task数的时候,第二个的运行速度是最快的,执行速度从快到慢依次是:2 > 3 > 4 > 1,你能帮忙告诉我你的test case吗?我想试试看看什么地方需要改进改进,非常感谢 
5 楼 jy1245626 2010-09-08  
呵呵,我没啥好方法,多线程我也不熟,只是运行了第一种方法后,看第二种方法,有点受不了它的速度
4 楼 terrencexu 2010-09-07  
jy1245626 写道
第二种方法的效率巨慢,

有没有比较好的改进方法?
3 楼 jy1245626 2010-09-07  
第二种方法的效率巨慢,
2 楼 terrencexu 2010-09-06  
lgd_java2eye 写道
楼主  List<Task> tasks = TaskProducer.produce(11);  
这个是创建11个任务,而不是创建10个任务,是不是写错了啊!!!!!!!!!!!!!!!

应该创建1000个任务~~
1 楼 lgd_java2eye 2010-09-06  
楼主  List<Task> tasks = TaskProducer.produce(11);  
这个是创建11个任务,而不是创建10个任务,是不是写错了啊!!!!!!!!!!!!!!!

相关推荐

    c# 线程(thread)同步处理

    .NET平台上的线程同步的问题线程之间共享的变量访问的同步,它的操作时原子操作,且被线程共享.你可以通过Interlocked.Increment 或 Interlocked.Decrement来增加或减少共享变量.它的有点在于是原子操作,也就是说...

    基于线程同步与妥协处理机制的多线程技术

    线程同步与妥协处理机制可以较好的解决多线程使用过程中产生的问题。实验中采用了这两种方法后数据混乱、死锁等问题的出现几率大大降低。实验结论表明上面两种方法的使用可以很好的控制死锁、数据混乱的出现,具有...

    worker线程处理时,同时刷新对话框消息

    所以要处理线程同时即时刷新界面要用SendMessage,它是一同步发送消息,会等处理完成才返回,即一SendMessage就会进入Refresh函数 2.而多线程函数的使用中可能出现以下问题,如果用 CWinThread * pthread=...

    生产消费者队列(c#),用于线程的队列自动同步

    这个就不用多说了吧,如果你的线程处理不想一直循环轮询的话,可以试试这个。

    详细解析C#多线程同步事件及等待句柄

    最近捣鼓了一下多线程的同步问题,发现其实C#关于多线程同步事件处理还是很灵活,这里主要写一下,自己测试的一些代码,涉及到了AutoResetEvent 和 ManualResetEvent,当然还有也简要提了一下System.Threading....

    详解C#多线程之线程同步

    通过学习《CLR via C#》里面的内容,对线程同步形成了脉络较清晰的体系结构,在多线程中实现线程同步的是线程同步构造,这个构造分两大类,一个是基元构造,一个是混合构造。所谓基元则是在代码中使用最简单的构造。...

    一个基于C#开发的多线程处理程序源代码

    一个基于C#开发的多线程处理程序源代码,该例子程序代码采用VS2005+C#开发,可以进行多线程进行后台运算程序的同步处理。

    Java多线程数据同步处理的研究分析.pdf

    Java多线程数据同步处理的研究分析

    vc++ 线程同步与异步套接字编程实例

    vc++ 线程同步与异步套接字编程实例,Windows套接字在两种模式下执行I/O操作,阻塞和非阻塞。在阻塞模式下,在I/O操作完成前,执行操作的Winsock函数会一直等待下去,不会立即返回程序(将控制权交还给程序)。而在非...

    用户方式中线程的同步.pdf

    当所有的线程在互相之间不需要进行通信的情况下就能够顺利地运行时, M i c r o s o f t Window s的运行性能最好。但是,线程很少能够在所有的时间都独立地进行操作...当这个任务完成时,另一个线程必须了解这个情况。

    多线程处理视频进行缓冲处理

    多线程处理视频时,由于不同线程处理速度不同步,不同线程之间的数据传递需要进行缓冲处理。单块缓存的互斥操作或两块缓存的乒乓操作在大多数情况下不够用(数据流不平稳时丢数据);附件中利用C++实现了一个调用...

    c sharp ——线程 同步.txt

    创建线程的方法,子线程的问题 并行处理 同步:所线程等方法,将处理代码写入线程,对于新手非常容易上手

    delphiXE多线程同步对象及异步执行.zip

    本下载解决delphiXE在处理多线程时,如何对各线程进行管理、如何做到中途中断执行多线程时的线程安全,如何在多线程内部进行UI同步等等,还可看我的博文同步。

    MulThreadclass.rar_AfxBeginThread_cwinthread_多线程 同步

    感觉多线程确实麻烦,线程间要处理好同步与通讯,如果用CWinThread好一点,直接是一个线程对象,如果用AfxBeginThread,那必须定个全局函数,或者写个静态函数,一般是传个this指针进去,然后再用这个指针调用本类...

    <一>、C++实现多线程的同步处理:控制ABC的输出顺序,输出10组,mutex+condition-variable源码示例

    通过一个标志位来处理的方式虽然可以实现效果,但是还不够安全,极有可能有多条线程同时操作一个全局变量,导致资源争夺问题,为了保证安全,可以在此基础上加上对应的锁来处理同步问题,比如加上互斥锁 mutex,就...

    C# 多线程的同步与互斥(使用Mutex和Event)

    操作系统课的一个小作业,使用C#处理多线程的同步与互斥,使用Mutex类和AutoResetEvent类。作为一个小白,参考了其他代码写出来,某些地方还有些不完全明白,都写在注释里了,求高手指点。

    [『辅助』] 易编远航第一期-六套大漠多线程中级进阶视频教程

    并且对接下来脚本的线程处理,及监控线程起到一个较高的实际认知。 对多线程基 础及后续多线程课程有承前启后的作用 主要学习内容: 1.线程的启动及关闭 2.监控线程的运用 3.大漠多线程之参数传递 易语言调用大漠...

    C#多线程及同步示例简析

    二是由于对称多处理机(SMP)出现,可以满足多个运行单位,而多个进程并行开销过大。 因此在80年代,出现了能独立运行的基本单位——线程(Threads)。  线程,有时被称为轻量级进程(Lightweight Process,LWP),...

    多线程处理视频时,由于不同线程处理速度不同步,不同线程之间的数据传递需要进行缓冲处理

    多线程处理视频时,由于不同线程处理速度不同步,不同线程之间的数据传递需要进行缓冲处理。单块缓存的互斥操作或两块缓存的乒乓操作在大多数情况下不够用(数据流不平稳时丢数据);附件中利用C++实现了一个调用...

    java多线程模拟处理银行的实时转账交易

    模拟实现多线程处理银行的实时转账交易,代码完整,可以完美运行~

Global site tag (gtag.js) - Google Analytics