-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
366 lines (276 loc) · 12.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
import machine
import uos
import time
from bno055 import *
from bno055_base import BNO055_BASE
from queue import * # do Queue(max_size, threshold) to make object
from math import floor
import sdcard
# from sys import exit # only for testing
"""
SUBSCALE PAYLOAD CONTROL Version 1.0-alpha
Authors: Payload Sub-Team
Since: 11/09/2022
Created for University of Massachusetts Rocket Team (Payload Sub-Team)
This program collects and stores data from the BNO055 IMU, detects launch and landing, and orients to ground upon landing.
"""
# setup uart object (uart0 maps to pin 1 on the pico)
uart = machine.UART(0, 9600)
# initialize the serial connection with given parameters
uart.init(9600, parity=None, stop=1)
time.sleep(0.5)
print("code has been run.")
uart.write("Initial Transmission\n\n")
uart.write("\nRocket was connected to power\n")
uart.write("\nWaiting 3 seconds\n")
time.sleep(3)
# IF YOU ARE IMPLEMENTING DATA TRANSMISSIONS, READ THE BELOW TEXT:
# To add transmissions, use uart.write(string)
class ForceLandingException(Exception):
pass
init_time = time.ticks_ms()
def mean(arr, length):
sum = 0
for e in arr:
if e != None:
sum += e
return sum / length
# Input: Queue to find mean/variance
# Output: array: [mean, variance]
def mean_variance(queue):
size = queue.get_size()
arr = queue.get_array()
mean_val = mean(arr, size)
sum = 0
for e in arr:
if e != None:
sum += (e - mean_val) ** 2
return [mean_val, sum / size]
# calculate maximum size of queue needed by frequency (samples/s) * interval to keep track of (ms) / 1000 (ms/s) * 1.25 room for error
def calculate_max_size(sample_frequency, queue_interval):
return floor(sample_frequency * queue_interval / 1000 * 1.25)
# Inputs: Array of functions that should be run every x ms
# Array of intervals of time between each run of its corresponding function
# Outputs: None
# Each function in func_arr should raise an exception if the loop should stop being run
def do_every(func_arr, interval_arr):
prevTimes = [-999999] * len(func_arr)
while True:
for i, func in enumerate(func_arr):
if time.ticks_diff(time.ticks_ms(), prevTimes[i]) > interval_arr[i]:
try:
func()
prevTimes[i] = time.ticks_ms()
except StopIteration: # Leave the while loop
return
# Initialize IMU
uart.write("\nInitializing IMU\n")
i2c = machine.I2C(0, sda=machine.Pin(16), scl=machine.Pin(17))
imu = BNO055(i2c)
# Assign chip select (CS) pin (and start it high)
cs = machine.Pin(9, machine.Pin.OUT)
# Intialize SPI peripheral (start with 1 MHz)
spi = machine.SPI(
1,
baudrate=10000,
polarity=0,
phase=0,
bits=8,
firstbit=machine.SPI.MSB,
sck=machine.Pin(10),
mosi=machine.Pin(11),
miso=machine.Pin(8),
)
# Initialize SD card
uart.write("\nIMU initialized. Initializing SD Card\n")
sd = sdcard.SDCard(spi, cs)
# Mount filesystem
vfs = uos.VfsFat(sd)
uos.mount(vfs, "/sd")
# Find a file name so nothing is overridden
def get_valid_file_name(path, ext):
try:
# If uos.stat doesn't have error, then the file exists, use a different name
uos.stat(path + "." + ext)
# Try same name with a "1" appended to it
return get_valid_file_name(path + "1", ext)
except OSError:
# file does not exist, use this file name
return path + "." + ext
uart.write("\n SD Card initialized.\n")
# NOTE: Need to close this at the end of the program
imu_data = open(get_valid_file_name("/sd/imu_data", "csv"), "w")
imu_data.write(
"Time, Temperature, Mag X, Mag Y, Mag Z, Gyro X, Gyro Y, Gyro Z, Acc X, Acc Y, Acc Z, Lin Acc X, Lin Acc Y, Lin Acc Z, Gravity X, Gravity Y, Gravity Z, Euler X, Euler Y, Euler Z\n"
)
flight_log = open(get_valid_file_name("/sd/flight_log", "txt"), "w")
def write_to_imu_data():
imu_data.write(
str(time.ticks_diff(time.ticks_ms(), init_time) / 1000.0)
+ ","
+ str(imu.temperature())
+ ","
+ "{:5.3f},{:5.3f},{:5.3f},".format(*imu.mag())
+ "{:5.3f},{:5.3f},{:5.3f},".format(*imu.gyro())
+ "{:5.3f},{:5.3f},{:5.3f},".format(*imu.accel())
+ "{:5.3f},{:5.3f},{:5.3f},".format(*imu.lin_acc())
+ "{:5.3f},{:5.3f},{:5.3f},".format(*imu.gravity())
+ "{:4.3f},{:4.3f},{:4.3f}".format(*imu.euler())
+ "\n"
)
# if time.ticks_diff(time.ticks_ms(), init_time) > 10000:
# raise StopIteration ONLY USED FOR TESTING PURPOSES
imu_data_frequency = 100 # Hz
imu_data_interval = 1 / imu_data_frequency * 1000 # ms
# Declaring queues. They must be accessible via the global context
time_queue = None
accel_queue = None
# Inputs: Interval: How much acceleration data should be kept track of (eg last 2000 ms, interval = 2000)
# NOTE: This does not mean acceleration is updated every interval ms
# Output: A function that can be passed into do_every to update the time and acceleration queues
def make_data_updater(interval):
def updater():
cur_time = time.ticks_ms()
time_queue.enqueue(cur_time)
accel_data = imu.accel()
accel_queue.enqueue((accel_data[0] ** 2 + accel_data[1]** 2 + accel_data[2] ** 2) ** 0.5)
while time.ticks_diff(cur_time, time_queue.peek()) > interval:
time_queue.dequeue()
accel_queue.dequeue()
return updater
# -----CALIBRATION PHASE-----
def calibration_fn():
uart.write("Calibration required: sys {} gyro {} accel {} mag {}".format(*imu.cal_status()))
if imu.calibrated():
uart.write("\n\nCalibration Done!")
flight_log.write("\nCALIBRATED!")
# bytearray(b'\xfa\xff\x00\x00\xe9\xffF\x04\x13\x01|\xff\xff\xff\x00\x00\x00\x00\xe8\x03\xec\x01')
raise StopIteration
# Check for calibration every 1000 ms
do_every([calibration_fn], [1000])
# The IMU is calibrated at this point
# The reference gravity must be calculated in setup for the rest of the program to use
# For stopping the executition of program during testing
#offset_arr = bytearray(b"\xfa\xff\x00\x00\xe9\xffF\x04\x13\x01|\xff\xff\xff\x00\x00\x00\x00\xe8\x03\xec\x01")
#imu.set_offsets(offset_arr)
flight_log.write("\nIMU is calibrated")
flight_log.write("\nTime (ms): " + str(time.ticks_ms()))
uart.write("\n\nIMU is calibrated")
# -----SETUP PHASE-----
GRAVITY = 9.8
queue_frequency = 5 # Hz
check_interval = 5000 # time between variance checks
# calculate time between samples in ms based on desired frequency
accel_sample_interval = 1000 / queue_frequency
max_size = calculate_max_size(queue_frequency, check_interval)
# Queue(maximum length of queue, initial threshold)
time_queue = Queue(max_size, None)
accel_queue = Queue(max_size, GRAVITY)
data_updater = make_data_updater(check_interval)
def find_reference_gravity(): # CHECK HERE FOR SOMEWHAT ARBITRARY VALUES
global GRAVITY
if time.ticks_diff(time.ticks_ms(), time_queue.peek()) < check_interval * 0.5:
return
stats = mean_variance(accel_queue)
if (stats[1] < 0.1 and stats[0] > 9.5 and stats[0] < 10.1):
# ARBITRARY: if variance less than .1 m/s^2, also check that mean is reasonable (g+-.3)
# May also want to check that queue has big enough number of elements
# Set reference gravity
flight_log.write("\nCalculated Gravity: " + str(stats))
uart.write("\nCalculated Gravity: " + str(stats))
GRAVITY = stats[0] * 1.25
# ARBITRARY: Multiply by 1.1 to give some room for error
raise StopIteration
do_every([data_updater, find_reference_gravity],[accel_sample_interval, check_interval])
# GRAVITY is set at this point
flight_log.write("\nGravity has been set: " + str(GRAVITY))
flight_log.write("\nTime (ms): " + str(time_queue.peek()))
uart.write("\n\nGravity has been calculated. Ready for flight.")
uart.write("\nGravity has been set: " + str(GRAVITY))
uart.write("\nTime (ms): " + str(time_queue.peek()))
# -----BEFORE FLIGHT PHASE-----
# Reset time and acceleration queues
queue_frequency = 50 # Hz
burn_time = 2970 * 0.85 # Multiply by .85 for some room for error
accel_sample_interval = 1000 / queue_frequency
# calculate time between samples in ms based on desired frequency
max_size = calculate_max_size(queue_frequency, burn_time)
time_queue = Queue(max_size, None)
accel_queue = Queue(max_size, GRAVITY)
def check_launch():
if time.ticks_diff(time.ticks_ms(), time_queue.peek()) < 0.5 * burn_time:
return # Dont check for launch if there's not enough data in the queue yet
uart.write("\n Launch Threshold Proportion: " + str(accel_queue.get_proportion_above_threshold()))
if accel_queue.get_proportion_above_threshold() > 0.95:
# ARBITRARY: If 95% of the data is above the gravity threshold, then check variance
mean_variance_calc = mean_variance(accel_queue)
uart.write("\n Variance of Threshold:"+str(mean_variance_calc[1]))
if mean_variance_calc[1] > 5: # ARBITRARY: If the variance is above 0.5 m/s^2, then launch is detected
raise StopIteration
data_updater = make_data_updater(burn_time)
# Function to update accelerations, keeping only last burn_time ms in the queue
check_launch_interval = 1000
# ARBITRARY: check for launch every check_launch_interval ms
do_every([write_to_imu_data, data_updater, check_launch], [imu_data_interval, accel_sample_interval, check_launch_interval])
# The rocket has launched at this point
flight_log.write("\nThe rocket has launched")
flight_log.write("\nTime (ms): " + str(time_queue.peek()))
uart.write("\n\nLaunch Detected\n")
uart.write("\nTime (ms): " + str(time_queue.peek()))
# -----IN FLIGHT PHASE-----
# Reset time and acceleration queues
queue_frequency = 25 # Hz
check_interval = 10000 # Want to keep track of last 10 seconds of acceleration data
# calculate time between samples in ms based on desired frequency
accel_sample_interval = 1000 / queue_frequency
max_size = calculate_max_size(queue_frequency, check_interval)
time_queue = Queue(max_size, None)
accel_queue = Queue(max_size, GRAVITY)
def check_landing():
if time.ticks_diff(time.ticks_ms(), time_queue.peek()) < 0.5 * check_interval:
return # Dont check for landing if there's not enough data in the queue yet
uart.write("\n Landing Threshold Proportion: " + str(accel_queue.get_proportion_above_threshold()))
if accel_queue.get_proportion_above_threshold() > 0.95:
# ARBITRARY: If no more than 5% of the acceleration data is above the gravity threshold, then check variance
uart.write("\n Variance of Threshold:" + str(mean_variance(accel_queue)[1]))
if mean_variance(accel_queue)[1] < 0.25:
# ARBITRARY: If variance is below 0.25 m/s^2, then landing is detected
raise StopIteration
data_updater = make_data_updater(check_interval)
check_landing_interval = 3000 # ARBITRARY: Check for landing every 3 seconds
do_every([write_to_imu_data, data_updater, check_landing], [imu_data_interval, accel_sample_interval, check_landing_interval])
flight_log.write("\nThe rocket has landed")
flight_log.write("\nTime (ms): " + str(time_queue.peek()))
uart.write("\n\nLanding Detected")
uart.write("\nTime (ms): " + str(time_queue.peek()))
# -----LANDED PHASE-----
# DO STUFF AFTER LANDING HERE
# Do stuff to make sure these close no matter what
servo = machine.PWM(machine.Pin(18))
servo.freq(50) # Set PWM frequency
# define stop,CCW and CW timing in ns
# Our servo has a max ns of 2000000, min ns 1000000, and stops at ns 1500000
servoStop = 1_500_000
servoCCW = 1_450_000
servoCW = 1_550_000
def is_not_vertical(): # Checks if rocket is level since servo freaks out otherwise
return -10 < imu.euler()[1] < 10
init_adjust_time = time.ticks_ms()
def adjust_servo_angle():
if time.ticks_diff(time.ticks_ms(), init_adjust_time) > 10000:
raise StopIteration
if is_not_vertical(): # check that rocket is not moving and level
if imu.euler()[2] > 3: # Leveling code
servo.duty_ns(servoCCW)
elif imu.euler()[2] < -3:
servo.duty_ns(servoCW)
else:
servo.duty_ns(servoStop) # Tell servo to stop if the BNO is level
print("Program running...")
servo.duty_ns(servoStop) # Servo is turned off initially
do_every([write_to_imu_data, adjust_servo_angle], [imu_data_interval, 10])
print("Done!")
servo.duty_ns(servoStop) # Servo is turned off initially
imu_data.close()
flight_log.close()
uart.write("\n\nProgram Terminated")