Source code for logic.tracker

import math
from logger import get_tracker_logger

import numpy as np

import glm
from cluster.mean_shift import MeanShift, MeanShiftResult
from logic.abfilter import ABFilter
from logic.radar import Signal
from objects import Tracked

# Maximum distance between a new detection and the last trajectory point of an
# already tracked object for the detection to be assigned to that object
# (used in Tracker.process_signals).
EPSILON = 2
# Minimum `tracked_time` an object must have to be moved to the archive when
# it is dropped (used in Tracker.update_objects).
REQUIRED_MINIMUM_TIME = 3 * 24
# An object is dropped once `current_time - last_tracked_time` reaches this
# value (used in Tracker.update_objects).
STOP_TRACKING_TIME = 100

# Inputs of the signal-to-noise estimate in Tracker.get_signal_noise.
PULSE_POWER_OF_EMITTED_SIGNAL = 150000  # watts
# Both gains are applied as 10 ** (gain / 10), i.e. they are presumably given
# in decibels -- TODO confirm.
TRANSMITTING_ANTENNA_GAINS = 40
RECEIVING_ANTENNA_GAINS = 40
WAVELENGTH = 0.035  # meters
# NOTE(review): presumably the target's effective scattering area (radar
# cross-section) -- confirm the meaning of "EPR".
EPR_TARGET = 0.1  # meters ^ 2
# Product of three loss terms; what each term stands for is not stated here.
TOTAL_LOSS_FACTOR = 0.9 * 0.9 * 1
RECEIVER_NOISE_FACTOR = 5  # decibel, applied as 10 ** (factor / 10)
BOLTZMANN_CONSTANT = 1.38 * (10 ** -23)  # joules per kelvin
STANDARD_TEMPERATURE = 290  # presumably kelvin -- TODO confirm


class Tracker:
    """
    The Tracker class

    Provides methods for tracking objects: it turns reflected signals into
    positions, clusters them into detections, and keeps a list of tracked
    objects that is updated with every batch of signals.
    """

    # A signal is used only when get_signal_noise() returns more than this.
    _MIN_SIGNAL_NOISE = 13
    # Signals whose direction.z is below this value are ignored.
    _MIN_DIRECTION_Z = 0.01

    def __init__(self):
        # Objects that are currently being tracked.
        self.objects: list[Tracked] = []
        # Dropped objects that were tracked for at least
        # REQUIRED_MINIMUM_TIME.
        self.archive_objects: list[Tracked] = []
        self.filter: ABFilter = ABFilter()
        self.mean_shift: MeanShift = MeanShift()
        self.logger = get_tracker_logger("Tracker")

    def calculate_coordinate(self, signals: list[Signal], time: int) -> list:
        """
        Calculate position where signals was reflected.

        A signal is skipped when its ``direction.z`` is below
        ``_MIN_DIRECTION_Z``, when its ``power`` or ``init_power`` is not
        positive (no finite, non-zero range can be derived from it), or when
        its signal noise ratio does not exceed ``_MIN_SIGNAL_NOISE``.

        :param signals: List of signals
        :type signals: list[Signal]
        :param time: Current time in the simulation
        :type time: int
        :return: list of positions of signals
        :rtype: list[glm.vec3]
        """
        coordinates: list = []
        for signal in signals:
            # Cheap filters first, so rejected signals never reach the
            # division below.
            if signal.direction.z < self._MIN_DIRECTION_Z:
                continue
            if signal.power <= 0 or signal.init_power <= 0:
                continue
            # Range estimated from the ratio of emitted to received power.
            r = (signal.init_power / signal.power) ** 0.5
            if self.get_signal_noise(r, time) > self._MIN_SIGNAL_NOISE:
                coordinates.append(-signal.speed * r)
        return coordinates

    def __calculate_real_coordinates(self, signals: list[Signal], time) \
            -> list:
        """
        Collect ``signal.direction`` of every sufficiently strong signal.

        The length of ``signal.direction`` is used as the range for the
        signal noise check. NOTE(review): presumably ``direction`` holds the
        true reflection point and this is a ground-truth helper -- confirm.

        :param signals: list of signals
        :param time: time in the simulation
        :return: list of ``signal.direction`` values that passed the check
        """
        coordinates: list = []
        for signal in signals:
            r = np.sqrt(np.sum(signal.direction ** 2))
            if self.get_signal_noise(r, time) > self._MIN_SIGNAL_NOISE:
                coordinates.append(signal.direction)
        return coordinates

    @staticmethod
    def calculate_cluster_centers(result: MeanShiftResult) -> list:
        """
        Calculate positions of aircraft based on mean_shift clustering

        Every cluster centre is the mean of the shifted points that share the
        same cluster id. Centres are returned in order of first appearance of
        their cluster id. The points stored in ``result`` are not modified.

        :param result: result of mean_shift clustering
        :type result: MeanShiftResult
        :return: list of aircraft positions
        """
        sums = {}
        counts = {}
        for cluster_id, point in zip(result.cluster_ids,
                                     result.shifted_points):
            if cluster_id in sums:
                sums[cluster_id] += point
                counts[cluster_id] += 1
            else:
                # Copy the first point: accumulating into it directly would
                # overwrite the caller's clustering result.
                sums[cluster_id] = np.array(point, dtype=float)
                counts[cluster_id] = 1
        return [total / counts[cluster_id]
                for cluster_id, total in sums.items()]

    @staticmethod
    def _distance(first, second) -> float:
        """Return the Euclidean distance between two points."""
        return sum((first - second) ** 2) ** 0.5

    def process_signals(self, signals: list[Signal], current_time: int):
        """
        Processes signals, finds new objects, or updates old ones

        Each detection is assigned to the first tracked object whose last
        trajectory point is closer than ``EPSILON``; a detection that matches
        no object starts a new tracked object. Stale objects are dropped at
        the end of the call.

        :param signals: list of signals
        :param current_time: time in the simulation
        :return: positions of tracked objects that were updated by this
            call (the last trajectory point of each matched object)
        """
        coordinates: list = self.calculate_coordinate(signals, current_time)
        centers = self.calculate_cluster_centers(
            self.mean_shift.cluster(coordinates, kernel_bandwidth=1))
        # Order the detections as whole points. Sorting the stacked array
        # in place would sort along the last axis and reorder x/y/z inside
        # every point.
        aircraft = sorted(centers, key=tuple)

        calculated = []
        for plane in aircraft:
            coordinate = glm.vec3(plane[0], plane[1], plane[2])
            for obj in self.objects:
                if self._distance(obj.trajectory[-1], coordinate) < EPSILON:
                    obj.mse += self._distance(coordinate,
                                              obj.extrapolatedValueAB)
                    self.logger.info(f"{obj.obj_name}({obj.mse})")
                    # NOTE(review): matched objects are filtered with
                    # current_time + 1, new ones with current_time --
                    # confirm the offset is intended.
                    self.filter.filterAB(obj, coordinate, current_time + 1)
                    calculated.append(obj.trajectory[-1])
                    obj.update_time(current_time)
                    break
            else:
                # No tracked object is close enough: start a new one.
                self.objects.append(Tracked(current_time))
                self.filter.filterAB(self.objects[-1], coordinate,
                                     current_time)
        self.update_objects(current_time)
        return calculated

    def update_objects(self, current_time: int):
        """
        Deletes objects that have not been updated for a long time

        An object is dropped once ``current_time - last_tracked_time``
        reaches ``STOP_TRACKING_TIME``. Dropped objects whose
        ``tracked_time`` is at least ``REQUIRED_MINIMUM_TIME`` are moved to
        ``archive_objects``; the others are discarded.

        :param current_time: time in the simulation
        """
        still_tracked = []
        for obj in self.objects:
            if current_time - obj.last_tracked_time < STOP_TRACKING_TIME:
                still_tracked.append(obj)
                continue
            if obj.tracked_time >= REQUIRED_MINIMUM_TIME:
                self.archive_objects.append(obj)
        # Replace the contents in one step instead of removing while
        # iterating (which skips elements); slice assignment keeps the same
        # list object for any code holding a reference to it.
        self.objects[:] = still_tracked

    @staticmethod
    def get_signal_noise(radius: float, time: int) -> float:
        """
        Calculate signal noise ratio

        The value is built from the module-level radar constants, grows
        linearly with ``time`` and falls with the fourth power of
        ``radius``. NOTE(review): the meaning of the leading factor ``2``
        and of the division by ``10`` is not documented -- confirm.

        :param radius: the distance that the signal traveled after it was
            reflected
        :param time: time in the simulation
        :return: signal noise ratio
        :raises ZeroDivisionError: if ``radius`` is a plain Python number
            equal to zero
        """
        numerator = (2 * PULSE_POWER_OF_EMITTED_SIGNAL * time
                     * 10 ** (TRANSMITTING_ANTENNA_GAINS / 10)
                     * 10 ** (RECEIVING_ANTENNA_GAINS / 10)
                     * (WAVELENGTH ** 2) * EPR_TARGET
                     * TOTAL_LOSS_FACTOR / 10)
        denominator = (((4 * math.pi) ** 3) * (radius) ** 4
                       * 10 ** (RECEIVER_NOISE_FACTOR / 10)
                       * BOLTZMANN_CONSTANT * STANDARD_TEMPERATURE)
        return numerator / denominator