一、概述! a, u- h. [( C/ i; q d
多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机、多核心处理器以及芯片级多处理或同时多线程处理器。在一个程序中,这些独立运行的程序片段叫作“线程”(Thread),利用它编程的概念就叫作“多线程处理”。1 W0 D8 b0 n7 U" ]! @: E
队列(Queue)代表了一个先进先出的对象集合。当您需要对各项进行先进先出的访问时,则使用队列。当您在列表中添加一项,称为入队,当您从列表中移除一项时,称为出队。" B/ o$ J4 K$ M
比如平常我们在处理定时任务的时候,假设就一台机器,我们不可能单线程一条一条数据的去跑,这时候就需要提高机器资源的利用率。下面我们来介绍下,如何实现多线程+队列以提高并发处理能力。
0 H1 v3 Q: Q* a' H二、代码实现
, u" {% M. u8 k3 d- u2 D8 P S3 k1、定义线程数threadNum和队列queues
5 i% D0 r/ f) _2 c+ n! Z% g/// <summary>
/// 线程总数
/// </summary>
private int threadNum = 4;
/// <summary>
/// 总数
/// </summary>
private int totalCount = 0;
/// <summary>
/// 已处理
/// </summary>
private int index = 0;
/// <summary>
/// 队列
/// </summary>
private ConcurrentQueue<AssetRepayment> queues = new ConcurrentQueue<AssetRepayment>(); 2、定义线程列表,往线程添加数据# e( e/ A( w& l- O, a% h& ~' ~
public void SubDeTransaction()
{
var list = new List<AssetRepayment>();
for (int i = 0; i < 1000; i++)
{
list.Add(new AssetRepayment() { Title = i.ToString() + "---" + Guid.NewGuid().ToString() });
}
if (list == null || list.Count() == 0)
{
Console.WriteLine("没有可执行的数据");
return;
}
totalCount = list.Count;
Console.WriteLine("可执行的数据:" + list.Count() + "条");
foreach (var item in list)
{
queues.Enqueue(item);
}
List<Task> tasks = new List<Task>();
for (int i = 0; i < threadNum; i++)
{
var task = Task.Run(() =>
{
Process();
});
tasks.Add(task);
}
var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
{
});
taskList.Wait();
} 3、对线程数进行限制 for (int i = 0; i < threadNum; i++)* J6 ^/ J% [1 l8 |( h* [2 Z
var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
{
});
taskList.Wait(); 4、从队列取出数据进行业务处理
; D" z" e7 |6 c6 ^' M3 T* J' Z) n) h# B private void Process()
{
while (true)
{
var currentIndex = Interlocked.Increment(ref index);
AssetRepayment repayId = null;
var isExit = queues.TryDequeue(out repayId);
if (!isExit)
{
break;
}
try
{
Console.WriteLine(repayId.Title);
Console.WriteLine(string.Format(" 共{0}条 当前第{1}条", totalCount, currentIndex));
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
} 运行测试:
6 M, f- d' _- [8 Z$ k- m
D; j$ d3 t; i& D& W$ ^0 {( k代码地址:https://gitee.com/conanOpenSource_admin/Example |