Source code for logic.tracker
import math
from logger import get_tracker_logger
import numpy as np
import glm
from cluster.mean_shift import MeanShift, MeanShiftResult
from logic.abfilter import ABFilter
from logic.radar import Signal
from objects import Tracked
EPSILON = 2
REQUIRED_MINIMUM_TIME = 3 * 24
STOP_TRACKING_TIME = 100
PULSE_POWER_OF_EMITTED_SIGNAL = 150000 # w
TRANSMITTING_ANTENNA_GAINS = 40
RECEIVING_ANTENNA_GAINS = 40
WAVELENGTH = 0.035 # meters
EPR_TARGET = 0.1 # meters ^ 2
TOTAL_LOSS_FACTOR = 0.9 * 0.9 * 1
RECEIVER_NOISE_FACTOR = 5 # decibel
BOLTZMANN_CONSTANT = 1.38 * (10 ** -23)
STANDARD_TEMPERATURE = 290
[docs]
class Tracker:
""" The Tracker class
Provides methods for tracking objects
"""
def __init__(self):
self.objects: list[Tracked] = []
self.archive_objects: list[Tracked] = []
self.filter: ABFilter = ABFilter()
self.mean_shift: MeanShift = MeanShift()
self.logger = get_tracker_logger("Tracker")
[docs]
def calculate_coordinate(self, signals: list[Signal], time: int) -> list:
"""
Calculate position where signals was reflected.
:param signals: List of signals
:type signals: list[Signal]
:param time: Current time in the simulation
:type time: int
:return: list of positions of signals
:rtype: list[glm.vec3]
"""
coordinates: list = []
for signal in signals:
r = (signal.init_power / signal.power) ** 0.5
if signal.direction.z < 0.01:
continue
noise = self.get_signal_noise(r, time)
print(noise)
if noise > 13:
coordinates.append(-signal.speed * r)
return coordinates
def __calculate_real_coordinates(self, signals: list[Signal], time) \
-> list:
coordinates: list = []
for signal in signals:
r = np.sqrt(np.sum(signal.direction ** 2))
if self.get_signal_noise(r, time) > 13:
coordinates.append(signal.direction)
return coordinates
[docs]
@staticmethod
def calculate_cluster_centers(result: MeanShiftResult) -> list:
"""
Calculate positions of aircraft based on mean_shift clustering
:param result: result of mean_shift clustering
:type result: MeanShiftResult
:return: list of aircraft positions
"""
clusters = {}
clusters_count = {}
for i in range(len(result.shifted_points)):
if result.cluster_ids[i] not in clusters:
clusters[result.cluster_ids[i]] = result.shifted_points[i]
clusters_count[result.cluster_ids[i]] = 1
else:
clusters[result.cluster_ids[i]] += result.shifted_points[i]
clusters_count[result.cluster_ids[i]] += 1
for cluster in clusters.keys():
clusters[cluster] /= clusters_count[cluster]
aircraft = []
for key in clusters.keys():
aircraft.append(clusters[key])
return aircraft
[docs]
def process_signals(self, signals: list[Signal], current_time: int):
"""
Processes signals, finds new objects, or updates old ones
:param signals: list of signals
:param current_time: time in the simulation
:return: positions of tracked objects
"""
coordinates: list = self.calculate_coordinate(signals, current_time)
real_coordinates: list = self.__calculate_real_coordinates(signals,
current_time)
aircraft = np.array(self.calculate_cluster_centers(
self.mean_shift.cluster(coordinates, kernel_bandwidth=1)))
aircraft.sort()
real_aircraft = np.array(self.calculate_cluster_centers(
self.mean_shift.cluster(real_coordinates, kernel_bandwidth=1)))
real_aircraft.sort()
calculated = []
for i, plane in enumerate(aircraft):
coordinate = glm.vec3(plane[0], plane[1], plane[2])
for obj in self.objects:
if sum((obj.trajectory[
-1] - coordinate) ** 2) ** 0.5 < EPSILON:
obj.mse += sum(
(coordinate - obj.extrapolatedValueAB) ** 2) ** 0.5
self.logger.info(f"{obj.obj_name}({obj.mse})")
self.filter.filterAB(obj, coordinate, current_time + 1)
calculated.append(obj.trajectory[-1])
obj.update_time(current_time)
break
else:
self.objects.append(Tracked(current_time))
self.filter.filterAB(self.objects[-1], coordinate,
current_time)
self.update_objects(current_time)
return calculated
[docs]
def update_objects(self, current_time: int):
"""
Deletes objects that have not been updated for a long time
:param current_time: time in the simulation
"""
for obj in self.objects:
if current_time - obj.last_tracked_time < STOP_TRACKING_TIME:
continue
if obj.tracked_time >= REQUIRED_MINIMUM_TIME:
self.archive_objects.append(obj)
self.objects.remove(obj)
[docs]
@staticmethod
def get_signal_noise(radius: int, time: int):
"""
Calculate signal noise ratio
:param radius: the distance that the signal
traveled after it was reflected
:param time: time in the simulation
:return: signal noise ratio
"""
return (2 * PULSE_POWER_OF_EMITTED_SIGNAL * time *
10 ** (TRANSMITTING_ANTENNA_GAINS / 10) *
10 ** (RECEIVING_ANTENNA_GAINS / 10) *
(WAVELENGTH ** 2) * EPR_TARGET * TOTAL_LOSS_FACTOR / 10 /
(((4 * math.pi) ** 3) * (radius) ** 4 *
10 ** (RECEIVER_NOISE_FACTOR / 10) * BOLTZMANN_CONSTANT *
STANDARD_TEMPERATURE))