How To Use the Raspberry Pi Python SDK
Where to get the code
Pull down the newest Python SDK from the GitHub repo!
You can follow our Getting Started with Raspberry Pi instructions for more details.
Asynchronous
API and Shell (Pinging)
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to use the echo command, which sends data to RVR and RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
API and Shell (Pinging) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import SpheroRvrTargets
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns
the same data. Echo can be used to check to see if RVR is connected and awake. In order to test it,
a node.js server must be running on the raspberry-pi connected to RVR. This code is meant to be
executed from a separate computer.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
echo_response = await rvr.echo(
data=[0, 1, 2],
target=SpheroRvrTargets.primary.value
)
print('Echo response 1: ', echo_response)
echo_response = await rvr.echo(
data=[4, 5, 6],
target=SpheroRvrTargets.secondary.value
)
print('Echo response 2: ', echo_response)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection)
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
async def main():
""" This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)
to report colors detected. To exit program, press <CTRL-C>
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Color Sensor (Color Detection) with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import RvrStreamingServices
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def color_detected_handler(color_detected_data):
print('Color detection data response: ', color_detected_data)
async def main():
""" This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.
To exit program, press <CTRL-C>
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.enable_color_detection(is_enabled=True)
await rvr.sensor_control.add_sensor_data_handler(
service=RvrStreamingServices.color_detection,
handler=color_detected_handler
)
await rvr.sensor_control.start(interval=250)
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.enable_color_detection(is_enabled=False),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Drive - Raw Motors
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.reverse.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.reverse.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=128, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=128 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.off.value,
left_speed=0, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.off.value,
right_speed=0 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Heading - Reverse Mode
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive around in different directions using the function drive_with_heading.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.drive_forward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.drive_backward_seconds(
speed=64,
heading=0, # Valid heading values are 0-359
time_to_drive=1
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.turn_left_degrees(
heading=0, # Valid heading values are 0-359
amount=90
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with Helper - Roll
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program has RVR drive with how to drive RVR using the drive control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.drive_control.reset_heading()
await rvr.drive_control.roll_start(
speed=64,
heading=90
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_control.roll_stop(heading=270)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Drive with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
from sphero_sdk import DriveFlagsBitmask
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def main():
""" This program has RVR drive using the Rest API. In order to test it, a node.js server must be running on the
raspberry-pi connected to RVR. This code is meant to be executed from a separate computer.
Note:
To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would
go on and execute all the statements and exit without the driving ever taking place.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.reset_yaw()
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.drive_reverse.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=90, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=128, # Valid speed values are 0-255
heading=270, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.drive_with_heading(
speed=0, # Valid heading values are 0-359
heading=0, # Valid heading values are 0-359
flags=DriveFlagsBitmask.none.value
)
# Delay to allow RVR to drive
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
Infrared - Broadcast IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.start_robot_to_robot_infrared_broadcasting(
far_code=InfraredCodes.one.value,
near_code=InfraredCodes.zero.value
)
for i in range(2):
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(2)
await rvr.stop_robot_to_robot_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Broadcast IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
from sphero_sdk import RawMotorModesEnum
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.infrared_control.start_infrared_broadcasting(
far_code=InfraredCodes.one,
near_code=InfraredCodes.zero
)
await rvr.raw_motors(
left_mode=RawMotorModesEnum.forward.value,
left_speed=64, # Valid speed values are 0-255
right_mode=RawMotorModesEnum.forward.value,
right_speed=64 # Valid speed values are 0-255
)
# Delay to allow RVR to drive
await asyncio.sleep(4)
await rvr.infrared_control.stop_infrared_broadcasting()
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64
while True:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import SerialAsyncDal
from sphero_sdk import InfraredCodes
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def infrared_message_received_handler(infrared_message):
print('Infrared message response: ', infrared_message)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)
codes = [
InfraredCodes.zero,
InfraredCodes.one,
InfraredCodes.two,
InfraredCodes.three
]
while True:
await rvr.infrared_control.send_infrared_messages(
messages=codes,
strength=64
)
print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
Infrared - Listen For and Send IR with REST API
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import RestfulAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=RestfulAsyncDal(
domain='10.211.2.21', # Add your raspberry-pi's IP address here
port=2010 # The port opened by the npm server is always 2010
)
)
async def infrared_message_received_handler(response):
print('Response data for IR message received:',response)
async def main():
""" This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.
To try this out, write a script for your other robot that a) broadcasts on the corresponding
channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which
RVR sends messages on [in this case channel 3]
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)
await rvr.enable_robot_infrared_message_notify(is_enabled=True)
infrared_code = 3
strength = 64
while True:
await rvr.send_infrared_message(
infrared_code=infrared_code,
front_strength=strength,
left_strength=strength,
right_strength=strength,
rear_strength=strength
)
print('Infrared message sent with code: {0}'.format(infrared_code))
await asyncio.sleep(2)
if __name__ == '__main__':
try:
asyncio.ensure_future(
main()
)
loop.run_forever()
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
asyncio.gather(
rvr.stop_robot_to_robot_infrared_broadcasting(),
rvr.close()
)
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import RvrLedGroups
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set the all the LEDs.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for _ in range(10) for color in Colors.off.value]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.set_all_leds(
led_group=RvrLedGroups.all_lights.value,
led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]
)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set All LEDs with Helper
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio
from sphero_sdk import SpheroRvrAsync
from sphero_sdk import Colors
from sphero_sdk import SerialAsyncDal
loop = asyncio.get_event_loop()
rvr = SpheroRvrAsync(
dal=SerialAsyncDal(
loop
)
)
async def main():
""" This program demonstrates how to set the all the LEDs of RVR using the LED control helper.
"""
await rvr.wake()
# Give RVR time to wake up
await asyncio.sleep(2)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_all_leds_color(color=Colors.yellow)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.turn_leds_off()
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)
# Delay to show LEDs change
await asyncio.sleep(1)
await rvr.close()
if __name__ == '__main__':
try:
loop.run_until_complete(
main()
)
except KeyboardInterrupt:
print('\nProgram terminated with keyboard interrupt.')
loop.run_until_complete(
rvr.close()
)
finally:
if loop.is_running():
loop.close()
LEDs - Set Multiple LEDs
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
import asyncio