Some DOTS Utilities: NativeCounter and NativeSum

I haven’t shared any utility scripts for a while now, so it’s about time I did so again. We’ve been making a game in pure ECS and have created lots of utilities along the way. Allow me to share NativeCounter and NativeSum with you.

We are making a game called City Hall Simulator. It’s a game about building and managing a city hall. The game has a simulated city in the background. We use the stats of this simulated city to determine the number of people who will go to the city hall to fulfill their civic needs, like getting a city ID, procuring birth certificates, signing up for socialized housing, applying for a business permit, or just going to the park that the player built in the city hall.

Simulating the city requires a lot of counting and totaling. I implemented the city elements using ECS, of course, so I could use jobs and Burst compile them. I modeled the city in a bit more detail: it has individuals, and they form households. There are businesses that produce jobs. There are schools that individuals attend to improve their education so they can get jobs. Ticking the city requires a lot of processes. Thanks to DOTS, I could run these processes in parallel… or so I thought.

The mere process of counting can’t be easily parallelized. For example, I want to count households with more than 5 members. I can write the following job but can’t call ScheduleParallel() on it:

[BurstCompile]
private struct CountBigHouseholdJob : IJobEntityBatch {
    [ReadOnly]
    public BufferTypeHandle<HouseholdMember> householdMemberType;

    public NativeReference<int> count;

    public void Execute(ArchetypeChunk batchInChunk, int batchIndex) {
        BufferAccessor<HouseholdMember> householdMembersBuffers =
            batchInChunk.GetBufferAccessor(this.householdMemberType);

        for (int i = 0; i < batchInChunk.Count; ++i) {
            DynamicBuffer<HouseholdMember> householdMembers = householdMembersBuffers[i];
            if (householdMembers.Length > 5) {
                this.count.Value += 1;
            }
        }
    }
}

This is because NativeReference has no capability for writing in parallel. Some native containers, like NativeList, have a parallel writer. We want something like that, but made specifically for counting.
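To make the hazard concrete, here is a minimal sketch in plain C# (outside Unity, no jobs or native containers) of what goes wrong when many threads bump one shared integer. The class and method names are my own, made up for illustration. Unity's job safety checks exist to keep you from getting into this situation with a NativeReference in the first place.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, plain .NET only: this is not Unity code.
public static class SharedCounterRaceSketch {
    // Unsafe: "count += 1" is a read-modify-write, so two threads can read
    // the same old value and one of the increments gets lost.
    public static int CountUnsafe(int iterations) {
        int count = 0;
        Parallel.For(0, iterations, i => { count += 1; });
        return count; // may be less than iterations
    }

    // Safe but contended: every increment is an atomic operation
    // on the same memory location.
    public static int CountAtomic(int iterations) {
        int count = 0;
        Parallel.For(0, iterations, i => { Interlocked.Increment(ref count); });
        return count; // always equals iterations
    }
}
```

CountUnsafe can return anything up to the number of iterations depending on timing, while CountAtomic is always exact. Atomics work, but all threads then fight over one memory location, which is why a dedicated container that avoids sharing is attractive for counting.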

NativeCounter

Enter NativeCounter. When I searched for it, I found that there’s already an implementation offered as an example, but it’s not really part of any DOTS library. Not to worry. I went out of my way to make a working implementation based on that sample.

[StructLayout(LayoutKind.Sequential)]
[NativeContainer]
public unsafe struct NativeCounter {
    // The actual pointer to the allocated count needs to have restrictions relaxed so jobs can be scheduled with this container
    [NativeDisableUnsafePtrRestriction]
    private int* countIntegers;

#if ENABLE_UNITY_COLLECTIONS_CHECKS
    private AtomicSafetyHandle m_Safety;

    // The dispose sentinel tracks memory leaks. It is a managed type so it is cleared to null when scheduling a job
    // The job cannot dispose the container, and no one else can dispose it until the job has run, so it is ok to not pass it along
    // This attribute is required, without it this NativeContainer cannot be passed to a job; since that would give the job access to a managed object
    [NativeSetClassTypeToNullOnSchedule]
    private DisposeSentinel m_DisposeSentinel;
#endif

    // Keep track of where the memory for this was allocated
    private readonly Allocator m_AllocatorLabel;

    public const int INTS_PER_CACHE_LINE = JobsUtility.CacheLineSize / sizeof(int);

    public NativeCounter(Allocator label) {
        // This check is redundant since we always use an int that is blittable.
        // It is here as an example of how to check for type correctness for generic types.
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        if (!UnsafeUtility.IsBlittable<int>()) {
            throw new ArgumentException(
                string.Format("{0} used in NativeCounter must be blittable", typeof(int)));
        }
#endif
        this.m_AllocatorLabel = label;

        // Allocate native memory: one cache line of integers for each possible job thread
        this.countIntegers = (int*) UnsafeUtility.Malloc(
            UnsafeUtility.SizeOf<int>() * INTS_PER_CACHE_LINE * JobsUtility.MaxJobThreadCount, 4, label);

        // Create a dispose sentinel to track memory leaks. This also creates the AtomicSafetyHandle
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        DisposeSentinel.Create(out this.m_Safety, out this.m_DisposeSentinel, 0, label);
#endif
        // Initialize the count to 0 to avoid uninitialized data
        this.Count = 0;
    }

    public void Increment() {
        // Verify that the caller has write permission on this data. 
        // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
        (*this.countIntegers)++;
    }

    public int Count {
        get {
            // Verify that the caller has read permission on this data. 
            // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckReadAndThrow(this.m_Safety);
#endif
            int count = 0;
            for (int i = 0; i < JobsUtility.MaxJobThreadCount; ++i) {
                count += this.countIntegers[INTS_PER_CACHE_LINE * i];
            }

            return count;
        }
        
        set {
            // Verify that the caller has write permission on this data. 
            // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
            // Clear all locally cached counts, 
            // set the first one to the required value
            for (int i = 1; i < JobsUtility.MaxJobThreadCount; ++i) {
                this.countIntegers[INTS_PER_CACHE_LINE * i] = 0;
            }

            *this.countIntegers = value;
        }
    }

    public bool IsCreated {
        get {
            return this.countIntegers != null;
        }
    }

    public void Dispose() {
        // Let the dispose sentinel know that the data has been freed so it does not report any memory leaks
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        DisposeSentinel.Dispose(ref this.m_Safety, ref this.m_DisposeSentinel);
#endif

        UnsafeUtility.Free(this.countIntegers, this.m_AllocatorLabel);
        this.countIntegers = null;
    }

    [NativeContainer]
    // This attribute is what makes it possible to use NativeCounter.ParallelWriter in a ParallelFor job
    [NativeContainerIsAtomicWriteOnly]
    public struct ParallelWriter {
        // Copy of the pointer from the full NativeCounter
        [NativeDisableUnsafePtrRestriction]
        private int* countIntegers;

        // Copy of the AtomicSafetyHandle from the full NativeCounter. The dispose sentinel is not copied since this inner struct does not own the memory and is not responsible for freeing it.
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        private AtomicSafetyHandle m_Safety;
#endif
        
        // The current worker thread index; it must use this exact name since it is injected
        [NativeSetThreadIndex]
        int m_ThreadIndex;

        // This is what makes it possible to assign to NativeCounter.ParallelWriter from NativeCounter
        public static implicit operator ParallelWriter(NativeCounter cnt) {
            ParallelWriter parallelWriter;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(cnt.m_Safety);
            parallelWriter.m_Safety = cnt.m_Safety;
            AtomicSafetyHandle.UseSecondaryVersion(ref parallelWriter.m_Safety);
#endif

            parallelWriter.countIntegers = cnt.countIntegers;
            parallelWriter.m_ThreadIndex = 0;

            return parallelWriter;
        }

        public void Increment() {
            // Increment still needs to check for write permissions
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
            
            // No need for atomics any more since we are just incrementing the local count
            ++this.countIntegers[INTS_PER_CACHE_LINE * this.m_ThreadIndex];
        }
    }
}

I won’t go into detail about the implementation, as the steps written in the linked documentation explain it way better. I’ll just describe the gist instead. Basically, we are creating our own native container, but we’re wrapping only integers, and we expose a method aptly called Increment() and a property named Count. To be able to write safely from multiple concurrent threads, each thread is assigned its own integer that it can mutate. These integers are spaced one cache line apart (that’s what INTS_PER_CACHE_LINE is for) so that threads don’t contend over the same cache line. The thread index comes from the NativeSetThreadIndex attribute: the job system injects the index of the running worker thread into the integer field that carries the attribute (ParallelWriter.m_ThreadIndex in this case). When ParallelWriter.Increment() is called, the running thread increments only the integer assigned to it. The Count property then merely iterates through all of these integers and returns the total.
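If it helps, the per-thread idea can be shown without any Unity types at all. The following is only a sketch in plain C#: the class name, the hard-coded 64-byte cache line, and the way each worker is handed its index are my assumptions for illustration (in Unity, the job system injects the index and JobsUtility.CacheLineSize provides the size).

```csharp
using System.Threading;

// Hypothetical stand-in for NativeCounter's layout, using plain .NET threads.
public static class PerThreadCounterSketch {
    // Assumed 64-byte cache line; Unity exposes the real value as JobsUtility.CacheLineSize
    private const int IntsPerCacheLine = 64 / sizeof(int);

    public static int CountInParallel(int workerCount, int incrementsPerWorker) {
        // One slot per worker, spaced a cache line apart so workers never share a line
        int[] slots = new int[IntsPerCacheLine * workerCount];

        Thread[] workers = new Thread[workerCount];
        for (int w = 0; w < workerCount; ++w) {
            int threadIndex = w; // plays the role of the injected m_ThreadIndex
            workers[w] = new Thread(() => {
                for (int i = 0; i < incrementsPerWorker; ++i) {
                    // Like ParallelWriter.Increment(): touch only our own slot, no atomics needed
                    ++slots[IntsPerCacheLine * threadIndex];
                }
            });
            workers[w].Start();
        }

        // Wait for all workers, just like completing the job before reading Count
        foreach (Thread worker in workers) {
            worker.Join();
        }

        // Like the Count getter: add up every per-thread slot
        int count = 0;
        for (int w = 0; w < workerCount; ++w) {
            count += slots[IntsPerCacheLine * w];
        }

        return count;
    }
}
```

Calling PerThreadCounterSketch.CountInParallel(4, 100000) returns 400000. No increment is ever lost because no two threads write to the same integer, and the total is only read after all of them have finished.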

Now we can rewrite CountBigHouseholdJob so that it can run in parallel:

[BurstCompile]
private struct CountBigHouseholdJob : IJobEntityBatch {
    [ReadOnly]
    public BufferTypeHandle<HouseholdMember> householdMemberType;

    public NativeCounter.ParallelWriter counter;

    public void Execute(ArchetypeChunk batchInChunk, int batchIndex) {
        BufferAccessor<HouseholdMember> householdMembersBuffers =
            batchInChunk.GetBufferAccessor(this.householdMemberType);

        for (int i = 0; i < batchInChunk.Count; ++i) {
            DynamicBuffer<HouseholdMember> householdMembers = householdMembersBuffers[i];
            if (householdMembers.Length > 5) {
                this.counter.Increment();
            }
        }
    }
}

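// Job usage (inside a system)
NativeCounter counter = new NativeCounter(Allocator.TempJob);

CountBigHouseholdJob countJob = new CountBigHouseholdJob() {
    householdMemberType = GetBufferTypeHandle<HouseholdMember>(),
    counter = counter // implicitly converted to NativeCounter.ParallelWriter
};
this.Dependency = countJob.ScheduleParallel(this.householdsQuery, 1, this.Dependency);

// The count can only be read after the job has finished.
// Completing right away here is just for illustration.
this.Dependency.Complete();
int bigHouseholdCount = counter.Count;

// Dispose the counter once you're done with it
counter.Dispose();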

NativeSum

A few city processes later, I needed a way to calculate the sum or total of things. Again, I wanted to run such jobs in parallel but couldn’t, because NativeReference has no parallel writer. With the code of NativeCounter in hand, I figured I could just copy the code and retrofit it to add arbitrary numbers instead. After all, an increment is just something like Add(1). Here’s NativeSum:

[StructLayout(LayoutKind.Sequential)]
[NativeContainer]
public unsafe struct NativeSum {
    // The actual pointer to the allocated sum needs to have restrictions relaxed so jobs can be scheduled with this utility
    [NativeDisableUnsafePtrRestriction]
    private int* sumIntegers;

#if ENABLE_UNITY_COLLECTIONS_CHECKS
    private AtomicSafetyHandle m_Safety;

    // The dispose sentinel tracks memory leaks. It is a managed type so it is cleared to null when scheduling a job
    // The job cannot dispose the container, and no one else can dispose it until the job has run, so it is ok to not pass it along
    // This attribute is required, without it this NativeContainer cannot be passed to a job; since that would give the job access to a managed object
    [NativeSetClassTypeToNullOnSchedule]
    private DisposeSentinel m_DisposeSentinel;
#endif

    // Keep track of where the memory for this was allocated
    private readonly Allocator m_AllocatorLabel;

    public const int INTS_PER_CACHE_LINE = JobsUtility.CacheLineSize / sizeof(int);

    public NativeSum(Allocator label) {
        // This check is redundant since we always use an int that is blittable.
        // It is here as an example of how to check for type correctness for generic types.
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        if (!UnsafeUtility.IsBlittable<int>()) {
            throw new ArgumentException(
                string.Format("{0} used in NativeSum must be blittable", typeof(int)));
        }
#endif
        this.m_AllocatorLabel = label;

        // Allocate native memory: one cache line of integers for each possible job thread
        this.sumIntegers = (int*) UnsafeUtility.Malloc(
            UnsafeUtility.SizeOf<int>() * INTS_PER_CACHE_LINE * JobsUtility.MaxJobThreadCount, 4, label);

        // Create a dispose sentinel to track memory leaks. This also creates the AtomicSafetyHandle
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        DisposeSentinel.Create(out this.m_Safety, out this.m_DisposeSentinel, 0, label);
#endif

        Clear();
    }

    public void Clear() {
        // Clear uninitialized data
        // Verify that the caller has write permission on this data. 
        // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
        
        for (int i = 0; i < JobsUtility.MaxJobThreadCount; ++i) {
            this.sumIntegers[INTS_PER_CACHE_LINE * i] = 0;
        }            
    }

    public void Add(int amount) {
        // Verify that the caller has write permission on this data. 
        // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
        (*this.sumIntegers) += amount;
    }

    public int Total {
        get {
            // Verify that the caller has read permission on this data. 
            // This is the race condition protection, without these checks the AtomicSafetyHandle is useless
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckReadAndThrow(this.m_Safety);
#endif
            int total = 0;
            for (int i = 0; i < JobsUtility.MaxJobThreadCount; ++i) {
                total += this.sumIntegers[INTS_PER_CACHE_LINE * i];
            }

            return total;
        }
    }

    public bool IsCreated {
        get {
            return this.sumIntegers != null;
        }
    }

    public void Dispose() {
        // Let the dispose sentinel know that the data has been freed so it does not report any memory leaks
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        DisposeSentinel.Dispose(ref this.m_Safety, ref this.m_DisposeSentinel);
#endif

        UnsafeUtility.Free(this.sumIntegers, this.m_AllocatorLabel);
        this.sumIntegers = null;
    }

    [NativeContainer]
    // This attribute is what makes it possible to use NativeSum.ParallelWriter in a ParallelFor job
    [NativeContainerIsAtomicWriteOnly]
    public struct ParallelWriter {
        // Copy of the pointer from the main NativeSum
        [NativeDisableUnsafePtrRestriction]
        private int* dataPointer;

        // Copy of the AtomicSafetyHandle from the full NativeSum. The dispose sentinel is not copied since this inner struct does not own the memory and is not responsible for freeing it.
#if ENABLE_UNITY_COLLECTIONS_CHECKS
        private AtomicSafetyHandle m_Safety;
#endif
        
        // The current worker thread index; it must use this exact name since it is injected
        [NativeSetThreadIndex]
        int m_ThreadIndex;

        // This is what makes it possible to assign to NativeSum.ParallelWriter from NativeSum
        public static implicit operator ParallelWriter(NativeSum sum) {
            ParallelWriter parallelWriter;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(sum.m_Safety);
            parallelWriter.m_Safety = sum.m_Safety;
            AtomicSafetyHandle.UseSecondaryVersion(ref parallelWriter.m_Safety);
#endif

            parallelWriter.dataPointer = sum.sumIntegers;
            parallelWriter.m_ThreadIndex = 0;

            return parallelWriter;
        }

        public void Add(int amount) {
            // Add still needs to check for write permissions
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            AtomicSafetyHandle.CheckWriteAndThrow(this.m_Safety);
#endif
            
            // No need for atomics any more since we are just adding to the local sum
            this.dataPointer[INTS_PER_CACHE_LINE * this.m_ThreadIndex] += amount;
        }
    }
}

It uses the same concept as NativeCounter: each thread is assigned an integer to work with. The only difference is that it provides a method named Add(int) and a property called Total. Here’s a sample job that uses NativeSum:

[BurstCompile]
private struct ComputePublicMarketsOperationCostJob : IJobEntityBatch {
    [ReadOnly]
    public ComponentTypeHandle<Business> businessType;
    
    public NativeSum.ParallelWriter totalCost;

    public int fundsPerIncomeLevel;

    public int maintenanceCost;
    
    public void Execute(ArchetypeChunk batchInChunk, int batchIndex) {
        NativeArray<Business> businesses = batchInChunk.GetNativeArray(this.businessType);

        for (int i = 0; i < batchInChunk.Count; ++i) {
            Business business = businesses[i];
            this.totalCost.Add(business.GetTotalSalary(this.fundsPerIncomeLevel));
            
            // Add maintenance cost per public market
            this.totalCost.Add(this.maintenanceCost);
        }
    }
}

This job simply runs through all public market entities (which have a Business component), adds the total salary of all jobs in each one, and adds a fixed maintenanceCost value per market. This can be run in parallel.

That’s all for now

There you have it. Two simple utilities that help in making jobs run in parallel. You can definitely use these in your projects right now.

If you like my posts, please subscribe to my mailing list and be among the first to know what I’m up to. I’ll send you a free game upon subscription. 🙂
