using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Unity.Collections; using Unity.Collections.LowLevel.Unsafe; using UnityEngine; namespace JEngine.Core { public static unsafe class ThreadMgr { public struct ThreadTaskAwaiter : INotifyCompletion { public int Index; public void GetResult() { } public bool IsCompleted => false; public void OnCompleted(Action continuation) { Actions[Index] = continuation; } public ThreadTaskAwaiter GetAwaiter() { return this; } } private static int GetIndex() { bool gotLock = false; try { _createLock.Enter(ref gotLock); byte* ptr = UsageList; byte* max = ptr + MaxSize; while (ptr < max) { if (*ptr == 0) { *ptr = 1; break; } ptr++; } if (ptr == max) throw new Exception("ThreadMgr: ThreadTaskAwaiter is full!"); return (int)(ptr - UsageList); } finally { if (gotLock) _createLock.Exit(); } } private static void SetCompleted(int index) { if (UsageList[index] == 0 || index < 0 || index >= MaxSize) return; Action act = Actions[index]; try { UsageList[index] = 0; act?.Invoke(); } catch (Exception e) { Debug.LogException(e); } } /// /// 待执行任务 /// private static readonly Action[] Actions = new Action[MaxSize]; /// /// 使用列表 /// private static readonly byte* UsageList = (byte*)UnsafeUtility.Malloc(MaxSize, 4, Allocator.Persistent); /// /// 最大数量 /// private const int MaxSize = 10000; /// /// 锁 /// private static SpinLock _createLock; /// /// Init ThreadMgr /// public static void Initialize() { //注册Update到LifeCycleMgr _updateTaskId = LifeCycleMgr.Instance.AddUpdateTask(Update, () => _active); //默认运行 Activate(); GC.AddMemoryPressure(MaxSize); } /// /// Task id /// private static Guid _updateTaskId; /// /// status of activeness /// private static bool _active; /// /// Activate threadMgr to execute loop /// public static void Activate() { _active = true; } /// /// Deactivate threadMgr to stop loop /// public static void Deactivate() { _active = false; } /// /// Stop the current threadMgr, requires re-initialize to rerun /// public static void Stop() { LifeCycleMgr.Instance.RemoveUpdateItem(_updateTaskId); } /// /// Item to execute /// private struct DelayedQueueItem { public float Time; public Action Action; public bool MainThread; } /// /// Actions Queue /// private static readonly ConcurrentQueue Delayed = new ConcurrentQueue(); /// /// Queue an action with param on main thread to run /// /// /// [Obsolete("Use QueueOnMainThread instead")] public static ThreadTaskAwaiter QueueOnMainThread(Action action, object p) => QueueOnMainThread(action, p, 0f); /// /// Queue an action with param on main thread to run after specific seconds /// /// /// /// public static ThreadTaskAwaiter QueueOnMainThread(Action action, T p, float time = 0) { var ret = new ThreadTaskAwaiter(); ret.Index = GetIndex(); int index = ret.Index; var act = new Action(() => { action(p); SetCompleted(index); }); Delayed.Enqueue(new DelayedQueueItem { Time = _curTime + time, Action = act, MainThread = true }); return ret; } /// /// Queue an action on main thread to run after specific seconds /// /// /// public static ThreadTaskAwaiter QueueOnMainThread(Action action, float time = 0f) { var ret = new ThreadTaskAwaiter(); int index = GetIndex(); ret.Index = index; var act = new Action(() => { action(); SetCompleted(index); }); Delayed.Enqueue(new DelayedQueueItem { Time = _curTime + time, Action = act, MainThread = true }); return ret; } /// /// Queue an action on other thread to run after specific seconds /// /// /// /// /// public static ThreadTaskAwaiter QueueOnOtherThread(Action action, T p, float time = 0f) => QueueOnMainThread(action, p, time); /// /// Queue an action on other thread to run after specific seconds /// /// /// public static ThreadTaskAwaiter QueueOnOtherThread(Action action, float time = 0f) { var ret = new ThreadTaskAwaiter(); int index = GetIndex(); ret.Index = index; var act = new Action(() => { action(); SetCompleted(index); }); Delayed.Enqueue(new DelayedQueueItem { Time = _curTime + time, Action = act, MainThread = false }); return ret; } /// /// Current actions to process /// private static readonly List<(bool main, Action action)> CurActions = new List<(bool, Action)>(100); /// /// Current time /// private static float _curTime; /// /// Update loop on main thread /// static void Update() { _curTime = Time.time; var i = Delayed.Count; while (i-- > 0) { if (!Delayed.TryDequeue(out var item)) continue; if (item.Time <= _curTime) { CurActions.Add((item.MainThread, item.Action)); } else { Delayed.Enqueue(item); } } foreach (var (main, act) in CurActions) { if (!main) { Task.Run(() => { try { act?.Invoke(); } catch (Exception e) { Debug.LogException(e); } }); } else { try { act?.Invoke(); } catch (Exception e) { Debug.LogException(e); } } } CurActions.Clear(); } } }