How To Use the Raspberry Pi Python SDK

Where to get the code

Pull down the newest Python SDK from the GitHub repo!

You can follow our Getting Started with Raspberry Pi instructions for more details.

Asynchronous

API and Shell (Pinging)

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import SpheroRvrTargets



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to use the echo command, which sends data to RVR and RVR returns

        the same data. Echo can be used to check to see if RVR is connected and awake.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    echo_response = await rvr.echo(

        data=[0, 1, 2],

        target=SpheroRvrTargets.primary.value

    )

    print('Echo response 1: ', echo_response)


    echo_response = await rvr.echo(

        data=[4, 5, 6],

        target=SpheroRvrTargets.secondary.value

    )

    print('Echo response 2: ', echo_response)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


API and Shell (Pinging) with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import SpheroRvrTargets



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program demonstrates how to use the echo command, which sends data to RVR and has RVR returns

        the same data. Echo can be used to check to see if RVR is connected and awake.  In order to test it,

        a node.js server must be running on the raspberry-pi connected to RVR.  This code is meant to be

        executed from a separate computer.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    echo_response = await rvr.echo(

        data=[0, 1, 2],

        target=SpheroRvrTargets.primary.value

    )

    print('Echo response 1: ', echo_response)


    echo_response = await rvr.echo(

        data=[4, 5, 6],

        target=SpheroRvrTargets.secondary.value

    )

    print('Echo response 2: ', echo_response)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Color Sensor (Color Detection)

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



async def main():

    """ This program demonstrates how to use the color sensor on RVR (located on the down side of RVR, facing the floor)

        to report colors detected. To exit program, press <CTRL-C>

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.enable_color_detection(is_enabled=True)

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.color_detection,

        handler=color_detected_handler

    )

    await rvr.sensor_control.start(interval=250)


    while True:

        await asyncio.sleep(1)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.enable_color_detection(is_enabled=False),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Color Sensor (Color Detection) with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import RvrStreamingServices



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def color_detected_handler(color_detected_data):

    print('Color detection data response: ', color_detected_data)



async def main():

    """ This program uses the color sensor on RVR (located on the down side of RVR, facing the floor) to report colors detected.

        To exit program, press <CTRL-C>

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.enable_color_detection(is_enabled=True)

    await rvr.sensor_control.add_sensor_data_handler(

        service=RvrStreamingServices.color_detection,

        handler=color_detected_handler

    )

    await rvr.sensor_control.start(interval=250)


    while True:

        await asyncio.sleep(1)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.enable_color_detection(is_enabled=False),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()

Drive - Raw Motors

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.reverse.value,

        left_speed=64# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.reverse.value,

        right_speed=64  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.reverse.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=128# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=128  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.off.value,

        left_speed=0# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.off.value,

        right_speed=0  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Heading

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=270# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Heading - Reverse Mode

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive around in different directions using the function drive_with_heading.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.drive_control.reset_heading()


    await rvr.drive_control.drive_forward_seconds(

        speed=64,

        heading=0# Valid heading values are 0-359

        time_to_drive=1

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.drive_backward_seconds(

        speed=64,

        heading=0# Valid heading values are 0-359

        time_to_drive=1

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.turn_left_degrees(

        heading=0# Valid heading values are 0-359

        amount=90

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with Helper - Roll

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program has RVR drive with how to drive RVR using the drive control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.drive_control.reset_heading()


    await rvr.drive_control.roll_start(

        speed=64,

        heading=90

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_control.roll_stop(heading=270)


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


Drive with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal

from sphero_sdk import DriveFlagsBitmask



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def main():

    """ This program has RVR drive using the Rest API.  In order to test it, a node.js server must be running on the

        raspberry-pi connected to RVR.  This code is meant to be executed from a separate computer.


        Note:

            To give RVR time to drive, we call asyncio.sleep(...); if we did not have these calls, the program would

            go on and execute all the statements and exit without the driving ever taking place.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.reset_yaw()


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.drive_reverse.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=90# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=128# Valid speed values are 0-255

        heading=270# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.drive_with_heading(

        speed=0# Valid heading values are 0-359

        heading=0# Valid heading values are 0-359

        flags=DriveFlagsBitmask.none.value

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()

Infrared - Broadcast IR

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.start_robot_to_robot_infrared_broadcasting(

        far_code=InfraredCodes.one.value,

        near_code=InfraredCodes.zero.value

    )


    for i in range(2):

        await rvr.raw_motors(

            left_mode=RawMotorModesEnum.forward.value,

            left_speed=64# Valid speed values are 0-255

            right_mode=RawMotorModesEnum.forward.value,

            right_speed=64  # Valid speed values are 0-255

        )


        # Delay to allow RVR to drive

        await asyncio.sleep(2)


    await rvr.stop_robot_to_robot_infrared_broadcasting()


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Broadcast IR with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes

from sphero_sdk import RawMotorModesEnum



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.infrared_control.start_infrared_broadcasting(

        far_code=InfraredCodes.one,

        near_code=InfraredCodes.zero

    )


    await rvr.raw_motors(

        left_mode=RawMotorModesEnum.forward.value,

        left_speed=64# Valid speed values are 0-255

        right_mode=RawMotorModesEnum.forward.value,

        right_speed=64  # Valid speed values are 0-255

    )


    # Delay to allow RVR to drive

    await asyncio.sleep(4)


    await rvr.infrared_control.stop_infrared_broadcasting()


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)


    await rvr.enable_robot_infrared_message_notify(is_enabled=True)


    infrared_code = 3

    strength = 64


    while True:

        await rvr.send_infrared_message(

            infrared_code=infrared_code,

            front_strength=strength,

            left_strength=strength,

            right_strength=strength,

            rear_strength=strength

        )


        print('Infrared message sent with code: {0}'.format(infrared_code))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import SerialAsyncDal

from sphero_sdk import InfraredCodes



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def infrared_message_received_handler(infrared_message):

    print('Infrared message response: ', infrared_message)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.infrared_control.listen_for_infrared_message(handler=infrared_message_received_handler)


    codes = [

        InfraredCodes.zero,

        InfraredCodes.one,

        InfraredCodes.two,

        InfraredCodes.three

    ]


    while True:

        await rvr.infrared_control.send_infrared_messages(

            messages=codes,

            strength=64

        )


        print('Infrared message sent with codes: {0}'.format([code.value for code in codes]))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


Infrared - Listen For and Send IR with REST API

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import RestfulAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=RestfulAsyncDal(

        domain='10.211.2.21'# Add your raspberry-pi's IP address here

        port=2010  # The port opened by the npm server is always 2010

    )

)



async def infrared_message_received_handler(response):

    print('Response data for IR message received:',response)



async def main():

    """ This program sets up RVR to communicate with another robot, e.g. BOLT, capable of infrared communication.


        To try this out, write a script for your other robot that a) broadcasts on the corresponding

        channel that RVR is set to listen to [in this case channel 0] and b) listens on the channel which

        RVR sends messages on [in this case channel 3]

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.on_robot_to_robot_infrared_message_received_notify(handler=infrared_message_received_handler)


    await rvr.enable_robot_infrared_message_notify(is_enabled=True)


    infrared_code = 3

    strength = 64


    while True:

        await rvr.send_infrared_message(

            infrared_code=infrared_code,

            front_strength=strength,

            left_strength=strength,

            right_strength=strength,

            rear_strength=strength

        )


        print('Infrared message sent with code: {0}'.format(infrared_code))


        await asyncio.sleep(2)



if __name__ == '__main__':

    try:

        asyncio.ensure_future(

            main()

        )

        loop.run_forever()


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            asyncio.gather(

                rvr.stop_robot_to_robot_infrared_broadcasting(),

                rvr.close()

            )

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set All LEDs

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import RvrLedGroups

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set the all the LEDs.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for _ in range(10) for color in Colors.off.value]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.set_all_leds(

        led_group=RvrLedGroups.all_lights.value,

        led_brightness_values=[color for x in range(10) for color in [0, 255, 0]]

    )


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set All LEDs with Helper

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio

from sphero_sdk import SpheroRvrAsync

from sphero_sdk import Colors

from sphero_sdk import SerialAsyncDal



loop = asyncio.get_event_loop()


rvr = SpheroRvrAsync(

    dal=SerialAsyncDal(

        loop

    )

)



async def main():

    """ This program demonstrates how to set the all the LEDs of RVR using the LED control helper.

    """


    await rvr.wake()


    # Give RVR time to wake up

    await asyncio.sleep(2)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_all_leds_color(color=Colors.yellow)


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.turn_leds_off()


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.led_control.set_all_leds_rgb(red=255, green=144, blue=0)


    # Delay to show LEDs change

    await asyncio.sleep(1)


    await rvr.close()



if __name__ == '__main__':

    try:

        loop.run_until_complete(

            main()

        )


    except KeyboardInterrupt:

        print('\nProgram terminated with keyboard interrupt.')


        loop.run_until_complete(

            rvr.close()

        )


    finally:

        if loop.is_running():

            loop.close()


LEDs - Set Multiple LEDs

import os

import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))


import asyncio