-
Notifications
You must be signed in to change notification settings - Fork 0
/
kettler.py
75 lines (57 loc) · 2.16 KB
/
kettler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import serial, time
def roundto(x, base=5):
    """Round *x* to the nearest multiple of *base* and return it as an int.

    Ties are rounded away from zero explicitly instead of being delegated to
    the built-in round(): that function breaks ties away from zero on
    Python 2 but towards the nearest even value on Python 3, so relying on it
    would make e.g. ``roundto(12.5)`` give 15 or 10 depending on the
    interpreter.  With this implementation it is 15 on both.

    x    -- any value accepted by float().
    base -- step size to round to (default 5).

    Raises ZeroDivisionError when *base* is 0, and whatever float() raises
    when *x* cannot be converted.
    """
    quotient = float(x) / base
    # int() truncates, so adding 0.5 to the magnitude yields round-half-up on
    # the absolute value, i.e. round-half-away-from-zero once the sign is
    # put back.
    steps = int(abs(quotient) + 0.5)
    if quotient < 0:
        steps = -steps
    return int(base * steps)
def add(x, y):
    """Return ``x + y`` (whatever ``+`` means for the operand types)."""
    result = x + y
    return result
class Kettler():
    """Driver for a Kettler bike attached to a serial port.

    The instance opens the port at 9600 baud, performs a two-step handshake
    ('RS' then 'CM', each of which must be answered with a line starting
    with 'ACK'), and afterwards offers status() to poll pulse/rpm and
    set_power() to request a power level.

    Public attributes:
      device     -- serial device name passed to the constructor
      serialport -- the serial.Serial object used for all I/O
      pulse, rpm -- moving averages (int) of the most recent samples
      pulses, rpms -- sample histories, newest first, at most WINDOW long
      power      -- last power level (watts) sent to the bike
      xpower     -- unrounded, unclamped power target that set_power() tracks

    All protocol strings are written as byte literals so the same code runs
    on Python 2 (where they are plain str) and on Python 3 (where the serial
    port reads and writes bytes).
    """

    # Number of recent samples kept for the pulse/rpm moving averages.
    WINDOW = 5
    # Power limits in watts enforced by set_power().
    MIN_WATTS = 25
    MAX_WATTS = 400

    def __init__(self, device=None):
        """Open *device* and run the handshake.

        Raises NameError when no device is given and IOError when the port
        cannot be opened or the bike does not acknowledge a handshake
        command.  The port is closed again if anything fails after it has
        been created, so a failed constructor does not leak it.
        """
        # Validate before touching the hardware: there is no point in
        # constructing a serial object for a missing device name.
        if not device:
            raise NameError('Serial port not specified')

        self.device = device
        self.pulse = 0
        self.pulses = []
        self.rpm = 0
        self.rpms = []
        self.power = 0
        self.xpower = float(self.power)

        self.serialport = serial.Serial(self.device, 9600, timeout=0.1)
        connected = False
        try:
            if not self.serialport.isOpen():
                raise IOError('Serial port could not be opened')
            time.sleep(1)
            # The first command is given 5 s before its reply is read, the
            # second 1 s.
            self._handshake(b'RS\r\n', 5)
            self._handshake(b'CM\r\n', 1)
            connected = True
        finally:
            if not connected:
                self.serialport.close()

    def _handshake(self, command, delay):
        """Send *command*, wait *delay* seconds and require an 'ACK' reply."""
        self.serialport.write(command)
        time.sleep(delay)
        if not self.serialport.readline().startswith(b'ACK'):
            raise IOError('Could not connect to bike')

    def _push_sample(self, history, raw):
        """Parse *raw* as a base-10 int, record it and return the new average.

        The value is parsed before *history* is modified, so an unparsable
        reply (ValueError) leaves the existing samples untouched.  The
        newest sample is stored at index 0 and the list is trimmed to WINDOW
        entries.  The average uses floor division, matching integer division
        of ints on Python 2.
        """
        value = int(raw, 10)
        history.insert(0, value)
        del history[self.WINDOW:]
        return sum(history) // len(history)

    def status(self):
        """Poll the bike with 'ST' and refresh pulse and rpm.

        The reply is split on tabs; the first field is taken as the pulse
        sample and the second as the rpm sample.  A missing or non-numeric
        field (for instance an empty line after a read timeout) is ignored:
        polling is best effort and the previous averages stay in place.  If
        only the second field is bad, the pulse is still updated.
        """
        self.serialport.write(b'ST\r\n')
        parts = self.serialport.readline().rstrip(b'\r\n').split(b'\t')
        try:
            self.pulse = self._push_sample(self.pulses, parts[0])
            self.rpm = self._push_sample(self.rpms, parts[1])
        except (ValueError, IndexError):
            # Short or garbled reply: keep the last known values.
            pass

    def set_power(self, absolute=0.0, relative=0.0):
        """Adjust the requested power and send it to the bike when it changes.

        relative -- if non-zero, added to the tracked target (xpower); takes
                    precedence over *absolute*.
        absolute -- if positive (and relative is zero), replaces the target.

        The target is rounded with roundto() and, when the rounded value
        differs from the current rounded power, clamped to
        MIN_WATTS..MAX_WATTS, stored in self.power and sent as 'PW <watts>'.
        """
        if relative != 0.0:
            self.xpower += relative
        elif absolute > 0.0:
            self.xpower = absolute

        target = roundto(self.xpower)
        # NOTE(review): the clamp and the send are treated as part of this
        # change check (nothing is sent while the rounded target is
        # unchanged) - confirm this matches the intended behaviour.
        if roundto(self.power) == target:
            return

        # NOTE(review): xpower itself is not clamped, so it can drift outside
        # the limits while power stays pinned - confirm that is intended.
        watts = min(max(target, self.MIN_WATTS), self.MAX_WATTS)
        self.power = watts
        self.serialport.write(('PW ' + str(watts) + '\r\n').encode('ascii'))