Examples

Code examples can be found in the examples directory on GitHub

Also take a look at the unit tests in the tests directory.

Get all calling users

Source: calling_users.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org
 5"""
 6
 7from dotenv import load_dotenv
 8
 9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19                 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))

Get all calling users (async variant)

Source: calling_users_async.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org using the (experimental) async API
 5"""
 6import asyncio
 7import time
 8
 9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
16async def get_calling_users():
17    """
18    Get details of all calling enabled users by:
19    1) getting all calling users
20    2) collecting all users that have a calling license
21    3) getting details for all users
22    """
23    async with AsWebexSimpleApi(concurrent_requests=40) as api:
24        print('Collecting calling licenses')
25        calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26                                  if lic.webex_calling)
27
28        # get users with a calling license
29        calling_users = [user async for user in api.people.list_gen()
30                         if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31        print(f'{len(calling_users)} users:')
32        print('\n'.join(user.display_name for user in calling_users))
33
34        # get details for all users
35        start = time.perf_counter()
36        details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37                                         for user in calling_users])
38        expired = time.perf_counter() - start
39        print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42asyncio.run(get_calling_users())

Default call forwarding settings for all users

This example starts with the list of all calling users and then calls wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure() for each user. To speed things up, a ThreadPoolExecutor is used and all update operations are scheduled for execution by the thread pool.

Source: reset_call_forwarding.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Reset call forwarding to default for all users in the org
 5"""
 6
 7import logging
 8import time
 9from concurrent.futures import ThreadPoolExecutor
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.all_types import PersonForwardingSetting
15
16load_dotenv()
17
18logging.basicConfig(level=logging.INFO)
19
20# set to DEBUG to see the actual requests
21logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
22
23api = WebexSimpleApi()
24
25# get all calling users
26start = time.perf_counter_ns()
27calling_users = [user for user in api.people.list(calling_data=True)
28                 if user.location_id]
29print(f'Got {len(calling_users)} calling users in '
30      f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
31
32# set call forwarding to default for all users
33with ThreadPoolExecutor() as pool:
34    # default call forwarding settings
35    forwarding = PersonForwardingSetting.default()
36
37    # schedule update for each user and wait for completion
38    start = time.perf_counter_ns()
39    list(pool.map(
40        lambda user: api.person_settings.forwarding.configure(person_id=user.person_id,
41                                                              forwarding=forwarding),
42        calling_users))
43    print(f'Reset call forwarding to default for {len(calling_users)} users in '
44          f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')

Modify number of rings configuration for users read from CSV

Here we read a bunch of user email addresses from a CSV. The CSV has an ERROR column and we only want to consider users without errors.

For all these users a VM setting is updated and the results are written to a CSV for further processing.

Source: modify_voicemail.py

 1#!/usr/bin/env python
 2"""
 3Example Script
 4Modifying number of rings configuration in voicemail settings
 5Run -> python3 modify_voicemail.py modify_voicemail.csv
 6"""
 7import csv
 8import sys
 9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
def update_vm_settings():
    """
    actually update VM settings for all users present in input CSV

    The name of the input CSV is taken from the first command line argument. Records with a
    non-empty ERROR column are not processed and are reported as failed. For all other records
    the USERNAME column is matched against the first email address of the calling users. For
    each matching user the number of rings in the voicemail settings is set to
    VOICEMAIL_SETTINGS_NUMBER_OF_RINGS. Finally one row per user is written to output.csv.

    :raises SystemExit: if no CSV file name was passed on the command line
    """
    # fail early with a usage hint instead of an IndexError after all users have been listed
    if len(sys.argv) < 2:
        raise SystemExit(f'usage: {sys.argv[0]} <csv file>')

    api = WebexSimpleApi()
    final_report = []
    mail_ids = []
    # using wxc_sdk.people.PeopleApi.list to iterate over persons
    # Parameter calling_data needs to be set to true to get calling specific information
    # calling users have the attribute location_id set
    calling_users = [user for user in api.people.list(calling_data=True)
                     if user.location_id]
    print(f'{len(calling_users)} users:')
    print('\n'.join(user.display_name for user in calling_users))

    # get CSV file name from command line
    # newline='' is what the csv module asks for when a file object is passed to a reader or writer
    with open(sys.argv[1], 'r', newline='') as csv_file:
        # read all records from CSV. Only consider records w/o error
        for record in csv.DictReader(csv_file):
            if record['ERROR']:
                final_report.append((record['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
            else:
                # collect email address from USERNAME column for further processing
                mail_ids.append(record['USERNAME'])

    # work on calling users that have an email address that we read from the CSV
    # a set makes the membership test cheap; users w/o email address can never match
    known_mail_ids = set(mail_ids)
    filtered_users = [user for user in calling_users
                      if user.emails and user.emails[0] in known_mail_ids]

    print("\nCalling Users in CI  - Count ", len(calling_users))
    print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
    print("FilteredUsers Users -  Count", len(filtered_users))

    def set_number_of_rings(user: Person):
        """
        Update the number of rings in the VM config of a user

        :param user: user to update
        :return: row for the final report: display name of the user and 'SUCCESS' or 'FAILURE'
        """
        try:
            # shortcut
            vm = api.person_settings.voicemail

            # Read Current configuration
            vm_settings = vm.read(person_id=user.person_id)
            print(f'\n Existing Configuration: {vm_settings} ')

            # Modify number of rings value
            vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
            vm.configure(person_id=user.person_id, settings=vm_settings)
            # Read configuration after changes
            vm_settings = vm.read(person_id=user.person_id)
            print(f'\n New Configuration: {vm_settings} ')
        except Exception as e:
            # best effort: a failure for one user must not stop the updates of the other users
            print("type error: " + str(e))
            print(traceback.format_exc())
            return user.display_name, 'FAILURE'
        return user.display_name, 'SUCCESS'

    # Modify settings for the filtered users
    # the workers return their report rows instead of appending to a list shared between threads
    with ThreadPoolExecutor() as pool:
        final_report.extend(pool.map(set_number_of_rings, filtered_users))

    print(final_report)
    with open('output.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["USERNAME", "STATUS"])
        writer.writerows(final_report)


if __name__ == '__main__':
    update_vm_settings()

Holiday schedule w/ US national holidays for all US locations

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5"""
  6
  7import logging
  8from collections import defaultdict
  9from concurrent.futures import ThreadPoolExecutor
 10from datetime import date
 11from threading import Lock
 12from typing import List
 13
 14from dotenv import load_dotenv
 15
 16from calendarific import CalendarifiyApi, Holiday
 17from wxc_sdk import WebexSimpleApi
 18from wxc_sdk.locations import Location
 19from wxc_sdk.all_types import ScheduleType, Event, Schedule
 20
 21log = logging.getLogger(__name__)
 22
 23# a lock per location to protect observe_in_location()
 24location_locks: dict[str, Lock] = defaultdict(Lock)
 25
 26# Use parallel threads for provisioning?
 27USE_THREADING = True
 28
 29# True: delete holiday schedule instead of creating one
 30CLEAN_UP = False
 31
 32# first and last year for which to create public holiday events
 33FIRST_YEAR = 2022
 34LAST_YEAR = 2022
 35
 36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 37
 38
 39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
 40    """
 41    create/update a "National Holiday" schedule in one location
 42
 43    :param api: Webex api
 44    :type api: WebexSimpleApi
 45    :param location: location to work on
 46    :type location: Location
 47    :param holidays: list of holidays to observe
 48    :type holidays: List[Holiday]
 49    """
 50    # there should always only one thread messing with the holiday schedule of a location
 51    with location_locks[location.location_id]:
 52        year = holidays[0].date.year
 53        schedule_name = 'National Holidays'
 54
 55        # shortcut
 56        ats = api.telephony.schedules
 57
 58        # existing "National Holiday" schedule or None
 59        schedule = next((schedule
 60                         for schedule in ats.list(obj_id=location.location_id,
 61                                                  schedule_type=ScheduleType.holidays,
 62                                                  name=schedule_name)
 63                         if schedule.name == schedule_name),
 64                        None)
 65        if CLEAN_UP:
 66            if schedule:
 67                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 68                ats.delete_schedule(obj_id=location.location_id,
 69                                    schedule_type=ScheduleType.holidays,
 70                                    schedule_id=schedule.schedule_id)
 71            return
 72        if schedule:
 73            # we need the details: list response doesn't have events
 74            schedule = ats.details(obj_id=location.location_id,
 75                                   schedule_type=ScheduleType.holidays,
 76                                   schedule_id=schedule.schedule_id)
 77        # create list of desired schedule entries
 78        #   * one per holiday
 79        #   * only future holidays
 80        #   * not on a Sunday
 81        today = date.today()
 82        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 83                        start_date=holiday.date,
 84                        end_date=holiday.date,
 85                        all_day_enabled=True)
 86                  for holiday in holidays
 87                  if holiday.date >= today and holiday.date.weekday() != 6]
 88
 89        if not schedule:
 90            # create new schedule
 91            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 92            if not events:
 93                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 94                return
 95            schedule = Schedule(name=schedule_name,
 96                                schedule_type=ScheduleType.holidays,
 97                                events=events)
 98            log.debug(
 99                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100                f'events')
101            schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103            return
104
105        # update existing schedule
106        with ThreadPoolExecutor() as pool:
107            # delete existing events in the past
108            to_delete = [event
109                         for event in schedule.events
110                         if event.start_date < today]
111            if to_delete:
112                log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113                if USE_THREADING:
114                    list(pool.map(
115                        lambda event: ats.event_delete(obj_id=location.location_id,
116                                                       schedule_type=ScheduleType.holidays,
117                                                       event_id=event.event_id),
118                        to_delete))
119                else:
120                    for event in to_delete:
121                        ats.event_delete(obj_id=location.location_id,
122                                         schedule_type=ScheduleType.holidays,
123                                         event_id=event.event_id)
124
125            # add events which don't exist yet
126            existing_dates = set(event.start_date
127                                 for event in schedule.events)
128            to_add = [event
129                      for event in events
130                      if event.start_date not in existing_dates]
131            if not to_add:
132                log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
133                return
134            log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
135            if USE_THREADING:
136                list(pool.map(
137                    lambda event: ats.event_create(
138                        obj_id=location.location_id,
139                        schedule_type=ScheduleType.holidays,
140                        schedule_id=schedule.schedule_id,
141                        event=event),
142                    to_add))
143            else:
144                for event in to_add:
145                    ats.event_create(
146                        obj_id=location.location_id,
147                        schedule_type=ScheduleType.holidays,
148                        schedule_id=schedule.schedule_id,
149                        event=event)
150        log.info(f'observe_in_location({location.name}, {year}): done.')
151    return
152
153
def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
                              year: int = None):
    """
    Observe the US national holidays of one year in the given locations

    :param api: Webex api
    :type api: WebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    # fall back to the current year if no year is given
    if not year:
        year = date.today().year

    # national holidays of that year as reported by the Calendarific API
    holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')

    def observe(location: Location):
        """
        update the holiday schedule of a single location
        """
        return observe_in_location(api=api, location=location, holidays=holidays)

    # update holiday schedule for each location
    if not USE_THREADING:
        # one location after the other
        for location in locations:
            observe(location)
        return

    with ThreadPoolExecutor() as pool:
        # list() consumes the results and thus waits until all locations are done
        list(pool.map(observe, locations))


if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarify
    # API key.
    load_dotenv()

    # enable logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    logging.getLogger('urllib3').setLevel(logging.INFO)
    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)

    # the actual action
    with WebexSimpleApi(concurrent_requests=5) as wx_api:
        # get all US locations
        log.info('Getting locations...')
        us_locations = [location
                        for location in wx_api.locations.list()
                        if location.address.country == 'US']

        # create national holiday schedule for given year(s) and locations
        # the years to cover are defined by the module level FIRST_YEAR and LAST_YEAR
        years = range(FIRST_YEAR, LAST_YEAR + 1)
        if USE_THREADING:
            with ThreadPoolExecutor() as pool:
                list(pool.map(
                    lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
                    years))
        else:
            for year in years:
                observe_national_holidays(api=wx_api, year=year, locations=us_locations)

Holiday schedule w/ US national holidays for all US locations (async variant)

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays_async.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5
  6Using the asyc SDK variant
  7"""
  8import asyncio
  9import functools
 10import logging
 11from collections import defaultdict
 12from datetime import date
 13from typing import List
 14
 15from dotenv import load_dotenv
 16
 17from calendarific import CalendarifiyApi, Holiday
 18from wxc_sdk.all_types import ScheduleType, Event, Schedule
 19from wxc_sdk.as_api import AsWebexSimpleApi
 20from wxc_sdk.locations import Location
 21
 22log = logging.getLogger(__name__)
 23
 24# a lock per location to protect observe_in_location()
 25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
 26
 27# Use parallel tasks for provisioning?
 28USE_TASKS = True
 29
 30# True: delete holiday schedule instead of creating one
 31CLEAN_UP = False
 32
 33# first and last year for which to create public holiday events
 34FIRST_YEAR = 2022
 35LAST_YEAR = 2023
 36
 37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 38
 39
 40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
 41    """
 42    create/update a "National Holiday" schedule in one location
 43
 44    :param api: Webex api
 45    :type api: WebexSimpleApi
 46    :param location: location to work on
 47    :type location: Location
 48    :param holidays: list of holidays to observe
 49    :type holidays: List[Holiday]
 50    """
 51    # there should always only one thread messing with the holiday schedule of a location
 52    async with location_locks[location.location_id]:
 53        year = holidays[0].date.year
 54        schedule_name = 'National Holidays'
 55
 56        # shortcut
 57        ats = api.telephony.schedules
 58
 59        # existing "National Holiday" schedule or None
 60        schedule = next((schedule
 61                         for schedule in await ats.list(obj_id=location.location_id,
 62                                                        schedule_type=ScheduleType.holidays,
 63                                                        name=schedule_name)
 64                         if schedule.name == schedule_name),
 65                        None)
 66        if CLEAN_UP:
 67            if schedule:
 68                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 69                await ats.delete_schedule(obj_id=location.location_id,
 70                                          schedule_type=ScheduleType.holidays,
 71                                          schedule_id=schedule.schedule_id)
 72            return
 73        if schedule:
 74            # we need the details: list response doesn't have events
 75            schedule = await ats.details(obj_id=location.location_id,
 76                                         schedule_type=ScheduleType.holidays,
 77                                         schedule_id=schedule.schedule_id)
 78        # create list of desired schedule entries
 79        #   * one per holiday
 80        #   * only future holidays
 81        #   * not on a Sunday
 82        today = date.today()
 83        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 84                        start_date=holiday.date,
 85                        end_date=holiday.date,
 86                        all_day_enabled=True)
 87                  for holiday in holidays
 88                  if holiday.date >= today and holiday.date.weekday() != 6]
 89
 90        if not schedule:
 91            # create new schedule
 92            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 93            if not events:
 94                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 95                return
 96            schedule = Schedule(name=schedule_name,
 97                                schedule_type=ScheduleType.holidays,
 98                                events=events)
 99            log.debug(
100                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101                f'events')
102            schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104            return
105
106        # update existing schedule
107        # delete existing events in the past
108        to_delete = [event
109                     for event in schedule.events
110                     if event.start_date < today]
111        if to_delete:
112            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113            if USE_TASKS:
114                await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115                                                        schedule_type=ScheduleType.holidays,
116                                                        event_id=event.event_id)
117                                       for event in to_delete])
118            else:
119                for event in to_delete:
120                    await ats.event_delete(obj_id=location.location_id,
121                                           schedule_type=ScheduleType.holidays,
122                                           event_id=event.event_id)
123
124        # add events which don't exist yet
125        existing_dates = set(event.start_date
126                             for event in schedule.events)
127        to_add = [event
128                  for event in events
129                  if event.start_date not in existing_dates]
130        if not to_add:
131            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
132            return
133        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
134        if USE_TASKS:
135            await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
136                                                    schedule_type=ScheduleType.holidays,
137                                                    schedule_id=schedule.schedule_id,
138                                                    event=event)
139                                   for event in to_add])
140        else:
141            for event in to_add:
142                await ats.event_create(obj_id=location.location_id,
143                                       schedule_type=ScheduleType.holidays,
144                                       schedule_id=schedule.schedule_id,
145                                       event=event)
146        log.info(f'observe_in_location({location.name}, {year}): done.')
147    return
148
149
async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
                                    year: int = None):
    """
    Observe the US national holidays of one year in the given locations

    :param api: Webex api
    :type api: AsWebexSimpleApi
    :param locations: list of locations in which US national holidays should be observed
    :type locations: List[Location]
    :param year: year for national holidays. Default: current year
    :type year: int
    """
    # fall back to the current year if no year is given
    if not year:
        year = date.today().year

    # get national holidays for specified year
    # CalendarifiyApi().holidays() is a synchronous call; to avoid calling it directly from the
    # coroutine it is handed to the default executor of the running loop
    get_holidays = functools.partial(CalendarifiyApi().holidays,
                                     country='US', year=year, holiday_type='national')
    holidays = await asyncio.get_running_loop().run_in_executor(None, get_holidays)

    # update holiday schedule for each location
    if not USE_TASKS:
        # one location after the other
        for location in locations:
            await observe_in_location(api=api, location=location, holidays=holidays)
        return

    # all locations at once
    await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
                           for location in locations])


if __name__ == '__main__':
    # read dotenv which has some environment variables like Webex API token and Calendarify
    # API key.
    load_dotenv()

    # enable logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
    logging.getLogger('urllib3').setLevel(logging.INFO)
    logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)

    # the actual action
    async def do_provision():
        """
        create the national holiday schedules for all US locations and all years to cover
        """
        years = range(FIRST_YEAR, LAST_YEAR + 1)

        async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
            # get all US locations
            log.info('Getting locations...')
            all_locations = await wx_api.locations.list()
            us_locations = [location
                            for location in all_locations
                            if location.address.country == 'US']

            # create national holiday schedule for given year(s) and locations
            if not USE_TASKS:
                # one year after the other
                for year in years:
                    await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
                return

            # all years at once
            await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
                                   for year in years])

    asyncio.run(do_provision())

Persist tokens and obtain new tokens interactively

A typical problem, specifically when creating CLI scripts, is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.webex.com.

This example code shows how an OAuth grant flow for an integration can be initiated from a script by using the Python webbrowser module and calling the open() method with the authorization URL of a given integration to open that URL in the system web browser. The user can then authenticate and grant access to the integration. In the last step of a successful authorization flow the web browser is redirected to the redirect_url of the integration.

The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect. This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.

The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID, TOKEN_INTEGRATION_CLIENT_SECRET, TOKEN_INTEGRATION_CLIENT_SCOPES). These variables can also be defined in get_tokens.env in the current directory:

# rename this to get_tokens.env and set values

# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=

# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=

# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"

The sample code persists the tokens in get_tokens.yml in the current directory. On startup the sample code tries to read tokens from that file. If needed a new access token is obtained using the refresh token.

An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml

Source: get_tokens.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
  5the local web browser
  6"""
  7import json
  8import logging
  9import os
 10from typing import Optional
 11
 12from dotenv import load_dotenv
 13from yaml import safe_load, safe_dump
 14
 15from wxc_sdk import WebexSimpleApi
 16from wxc_sdk.integration import Integration
 17from wxc_sdk.scopes import parse_scopes
 18from wxc_sdk.tokens import Tokens
 19
 20log = logging.getLogger(__name__)
 21
 22
 23def env_path() -> str:
 24    """
 25    determine path for .env to load environment variables from
 26
 27    :return: .env file path
 28    """
 29    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 30
 31
 32def yml_path() -> str:
 33    """
 34    determine path of YML file to persist tokens
 35
 36    :return: path to YML file
 37    :rtype: str
 38    """
 39    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 40
 41
 42def build_integration() -> Integration:
 43    """
 44    read integration parameters from environment variables and create an integration
 45
 46    :return: :class:`wxc_sdk.integration.Integration` instance
 47    """
 48    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
 49    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
 50    scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
 51    redirect_url = 'http://localhost:6001/redirect'
 52    if not all((client_id, client_secret, scopes)):
 53        raise ValueError('failed to get integration parameters from environment')
 54    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 55                       redirect_url=redirect_url)
 56
 57
 58def get_tokens() -> Optional[Tokens]:
 59    """
 60
 61    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 62
 63    :return: tokens
 64    :rtype: :class:`wxc_sdk.tokens.Tokens`
 65    """
 66
 67    def write_tokens(tokens_to_cache: Tokens):
 68        with open(yml_path(), mode='w') as f:
 69            safe_dump(json.loads(tokens_to_cache.json()), f)
 70        return
 71
 72    def read_tokens() -> Optional[Tokens]:
 73        try:
 74            with open(yml_path(), mode='r') as f:
 75                data = safe_load(f)
 76                tokens_read = Tokens.parse_obj(data)
 77        except Exception as e:
 78            log.info(f'failed to read tokens from file: {e}')
 79            tokens_read = None
 80        return tokens_read
 81
 82    integration = build_integration()
 83    tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
 84                                           write_to_cache=write_tokens)
 85    return tokens
 86
 87
 88logging.basicConfig(level=logging.DEBUG)
 89
 90# load environment variables from .env
 91path = env_path()
 92log.info(f'reading {path}')
 93load_dotenv(env_path())
 94
 95tokens = get_tokens()
 96
 97# use the tokens to get identity of authenticated user
 98api = WebexSimpleApi(tokens=tokens)
 99me = api.people.me()
100print(f'authenticated as {me.display_name} ({me.emails[0]}) ')

Read/update call intercept settings of a user

usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]

positional arguments:
user_email email address of user
{on,off} operation to apply

options:
-h, --help show this help message and exit
--token TOKEN admin access token to use

The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable, or obtains tokens via an OAuth flow. For the last option, the integration parameters are read from environment variables, which can be set in a .env file (call_intercept.env in the current directory).

Source: call_intercept.py

  1#!/usr/bin/env python
  2"""
  3Script to read/update call intercept settings of a calling user.
  4
  5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
  6obtains tokens via an OAuth flow.
  7
  8    usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
  9
 10    positional arguments:
 11      user_email     email address of user
 12      {on,off}       operation to apply
 13
 14    options:
 15      -h, --help     show this help message and exit
 16      --token TOKEN  admin access token to use
 17"""
 18import argparse
 19import logging
 20import os
 21import re
 22import sys
 23from json import loads
 24from typing import Optional
 25
 26from dotenv import load_dotenv
 27from wxc_sdk import WebexSimpleApi
 28from wxc_sdk.integration import Integration
 29from wxc_sdk.person_settings.call_intercept import InterceptSetting
 30from wxc_sdk.rest import RestError
 31from wxc_sdk.scopes import parse_scopes
 32from wxc_sdk.tokens import Tokens
 33from yaml import safe_dump, safe_load
 34
 35log = logging.getLogger(__name__)
 36
 37
 38def env_path() -> str:
 39    """
 40    determine path for .env to load environment variables from
 41
 42    :return: .env file path
 43    """
 44    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 45
 46
 47def yml_path() -> str:
 48    """
 49    determine path of YML file to persist tokens
 50
 51    :return: path to YML file
 52    :rtype: str
 53    """
 54    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 55
 56
 57def build_integration() -> Integration:
 58    """
 59    read integration parameters from environment variables and create an integration
 60
 61    :return: :class:`wxc_sdk.integration.Integration` instance
 62    """
 63    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
 64    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
 65    scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
 66    if scopes:
 67        scopes = parse_scopes(scopes)
 68    if not all((client_id, client_secret, scopes)):
 69        raise ValueError('failed to get integration parameters from environment')
 70    redirect_url = 'http://localhost:6001/redirect'
 71    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 72                       redirect_url=redirect_url)
 73
 74
 75def get_tokens() -> Optional[Tokens]:
 76    """
 77
 78    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 79
 80    :return: tokens
 81    :rtype: :class:`wxc_sdk.tokens.Tokens`
 82    """
 83
 84    def write_tokens(tokens_to_cache: Tokens):
 85        with open(yml_path(), mode='w') as f:
 86            safe_dump(loads(tokens_to_cache.json()), f)
 87        return
 88
 89    def read_tokens() -> Optional[Tokens]:
 90        try:
 91            with open(yml_path(), mode='r') as f:
 92                data = safe_load(f)
 93                tokens_read = Tokens.parse_obj(data)
 94        except Exception as e:
 95            log.info(f'failed to read tokens from file: {e}')
 96            tokens_read = None
 97        return tokens_read
 98
 99    integration = build_integration()
100    tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
101                                           write_to_cache=write_tokens)
102    return tokens
103
104
105RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
106
107
108def email_type(value):
109    if not RE_EMAIL.match(value):
110        raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
111    return value
112
113
114def main():
115    """
116    where the magic happens
117    """
118    # read .env file with the settings for the integration to be used to obtain tokens
119    load_dotenv(env_path())
120
121    parser = argparse.ArgumentParser()
122    parser.add_argument('user_email', type=email_type, help='email address of user')
123    parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
124    parser.add_argument('--token', type=str, required=False, help='admin access token to use')
125    args = parser.parse_args()
126
127    if args.token:
128        tokens = args.token
129    elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
130        tokens = get_tokens()
131
132    if not tokens:
133        print('Failed to get tokens', file=sys.stderr)
134        exit(1)
135
136    # set level to DEBUG to see debug of REST requests
137    logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
138
139    with WebexSimpleApi(tokens=tokens) as api:
140        # get user
141        email = args.user_email.lower()
142        user = next((user
143                     for user in api.people.list(email=email)
144                     if user.emails[0] == email), None)
145        if user is None:
146            print(f'User "{email}" not found', file=sys.stderr)
147            exit(1)
148
149        # display call intercept status
150        try:
151            intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
152        except RestError as e:
153            print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
154            exit(1)
155
156        print('on' if intercept.enabled else 'off')
157        if args.on_off:
158            # action: turn on/off
159            intercept = InterceptSetting.default()
160            intercept.enabled = args.on_off == 'on'
161            try:
162                api.person_settings.call_intercept.configure(person_id=user.person_id,
163                                                             intercept=intercept)
164            except RestError as e:
165                print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
166                exit(1)
167
168            # read call intercept again
169            try:
170                intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
171            except RestError as e:
172                print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
173                exit(1)
174
175            # display state after update
176            print(f"set to {'on' if intercept.enabled else 'off'}")
177
178    exit(0)
179
180
181if __name__ == '__main__':
182    main()