Source code for cirq.google.programs

# Copyright 2018 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import (
    Any, cast, Dict, Iterable, Sequence, Tuple, TYPE_CHECKING, Union
)
import numpy as np
import sympy

from cirq import devices, ops, protocols
from cirq.schedules import Schedule, ScheduledOperation
from cirq.value import Timestamp

if TYPE_CHECKING:
    # pylint: disable=unused-import
    from typing import Optional
    from cirq.google import xmon_device


[docs]def gate_to_proto_dict(gate: ops.Gate, qubits: Tuple[ops.Qid, ...]) -> Dict: if isinstance(gate, ops.MeasurementGate): return _measure_to_proto_dict(gate, qubits) if isinstance(gate, ops.XPowGate): if len(qubits) != 1: # coverage: ignore raise ValueError('Wrong number of qubits.') return _x_to_proto_dict(gate, qubits[0]) if isinstance(gate, ops.YPowGate): if len(qubits) != 1: # coverage: ignore raise ValueError('Wrong number of qubits.') return _y_to_proto_dict(gate, qubits[0]) if isinstance(gate, ops.PhasedXPowGate): if len(qubits) != 1: # coverage: ignore raise ValueError('Wrong number of qubits.') return _phased_x_to_proto_dict(gate, qubits[0]) if isinstance(gate, ops.ZPowGate): if len(qubits) != 1: # coverage: ignore raise ValueError('Wrong number of qubits.') return _z_to_proto_dict(gate, qubits[0]) if isinstance(gate, ops.CZPowGate): if len(qubits) != 2: # coverage: ignore raise ValueError('Wrong number of qubits.') return _cz_to_proto_dict(gate, *qubits) raise ValueError("Don't know how to serialize this gate: {!r}".format(gate))
def _x_to_proto_dict(gate: ops.XPowGate, q: ops.Qid) -> Dict: exp_w = { 'target': cast(devices.GridQubit, q).to_proto_dict(), 'axis_half_turns': _parameterized_value_to_proto_dict(0), 'half_turns': _parameterized_value_to_proto_dict( gate.exponent) } return {'exp_w': exp_w} def _y_to_proto_dict(gate: ops.YPowGate, q: ops.Qid) -> Dict: exp_w = { 'target': cast(devices.GridQubit, q).to_proto_dict(), 'axis_half_turns': _parameterized_value_to_proto_dict(0.5), 'half_turns': _parameterized_value_to_proto_dict( gate.exponent) } return {'exp_w': exp_w} def _phased_x_to_proto_dict(gate: ops.PhasedXPowGate, q: ops.Qid) -> Dict: exp_w = { 'target': cast(devices.GridQubit, q).to_proto_dict(), 'axis_half_turns': _parameterized_value_to_proto_dict( gate.phase_exponent), 'half_turns': _parameterized_value_to_proto_dict( gate.exponent) } return {'exp_w': exp_w} def _z_to_proto_dict(gate: ops.ZPowGate, q: ops.Qid) -> Dict: exp_z = { 'target': cast(devices.GridQubit, q).to_proto_dict(), 'half_turns': _parameterized_value_to_proto_dict( gate.exponent), } return {'exp_z': exp_z} def _cz_to_proto_dict(gate: ops.CZPowGate, p: ops.Qid, q: ops.Qid) -> Dict: exp_11 = { 'target1': cast(devices.GridQubit, p).to_proto_dict(), 'target2': cast(devices.GridQubit, q).to_proto_dict(), 'half_turns': _parameterized_value_to_proto_dict( gate.exponent) } return {'exp_11': exp_11} def _measure_to_proto_dict(gate: ops.MeasurementGate, qubits: Sequence[ops.Qid]): if len(qubits) == 0: raise ValueError('Measurement gate on no qubits.') invert_mask = None if gate.invert_mask: invert_mask = gate.invert_mask + (False,) * ( gate.num_qubits() - len(gate.invert_mask)) if invert_mask and len(invert_mask) != len(qubits): raise ValueError('Measurement gate had invert mask of length ' 'different than number of qubits it acts on.') measurement = { 'targets': [cast(devices.GridQubit, q).to_proto_dict() for q in qubits], 'key': protocols.measurement_key(gate), } if invert_mask: measurement['invert_mask'] = [json.dumps(x) for x in invert_mask] return {'measurement': measurement}
[docs]def schedule_to_proto_dicts(schedule: Schedule) -> Iterable[Dict]: """Convert a schedule into an iterable of proto dictionaries. Args: schedule: The schedule to convert to a proto dict. Must contain only gates that can be cast to xmon gates. Yields: A proto dictionary corresponding to an Operation proto. """ last_time_picos = None # type: Optional[int] for so in schedule.scheduled_operations: op = gate_to_proto_dict( cast(ops.GateOperation, so.operation).gate, so.operation.qubits) time_picos = so.time.raw_picos() if last_time_picos is None: op['incremental_delay_picoseconds'] = time_picos else: op['incremental_delay_picoseconds'] = time_picos - last_time_picos last_time_picos = time_picos yield op
[docs]def schedule_from_proto_dicts( device: 'xmon_device.XmonDevice', ops: Iterable[Dict], ) -> Schedule: """Convert proto dictionaries into a Schedule for the given device.""" scheduled_ops = [] last_time_picos = 0 for op in ops: delay_picos = 0 if 'incremental_delay_picoseconds' in op: delay_picos = op['incremental_delay_picoseconds'] time_picos = last_time_picos + delay_picos last_time_picos = time_picos xmon_op = xmon_op_from_proto_dict(op) scheduled_ops.append(ScheduledOperation.op_at_on( operation=xmon_op, time=Timestamp(picos=time_picos), device=device, )) return Schedule(device, scheduled_ops)
[docs]def pack_results(measurements: Sequence[Tuple[str, np.ndarray]]) -> bytes: """Pack measurement results into a byte string. Args: measurements: A sequence of tuples, one for each measurement, consisting of a string key and an array of boolean data. The data should be a 2-D array indexed by (repetition, qubit_index). All data for all measurements must have the same number of repetitions. Returns: Packed bytes, as described in the unpack_results docstring below. Raises: ValueError if the measurement data do not have the compatible shapes. """ if not measurements: return b'' shapes = [(key, np.shape(data)) for key, data in measurements] if not all(len(shape) == 2 for _, shape in shapes): raise ValueError("Expected 2-D data: shapes={}".format(shapes)) reps = shapes[0][1][0] if not all(shape[0] == reps for _, shape in shapes): raise ValueError( "Expected same reps for all keys: shapes={}".format(shapes)) bits = np.hstack([np.asarray(data, dtype=bool) for _, data in measurements]) bits = bits.reshape(-1) # Pad length to multiple of 8 if needed. remainder = len(bits) % 8 if remainder: bits = np.pad(bits, (0, 8 - remainder), 'constant') # Pack in little-endian bit order. bits = bits.reshape((-1, 8))[:, ::-1] byte_arr = np.packbits(bits, axis=1).reshape(-1) return byte_arr.tobytes()
[docs]def unpack_results( data: bytes, repetitions: int, key_sizes: Sequence[Tuple[str, int]] ) -> Dict[str, np.ndarray]: """Unpack data from a bitstring into individual measurement results. Args: data: Packed measurement results, in the form <rep0><rep1>... where each repetition is <key0_0>..<key0_{size0-1}><key1_0>... with bits packed in little-endian order in each byte. repetitions: number of repetitions. key_sizes: Keys and sizes of the measurements in the data. Returns: Dict mapping measurement key to a 2D array of boolean results. Each array has shape (repetitions, size) with size for that measurement. """ bits_per_rep = sum(size for _, size in key_sizes) total_bits = repetitions * bits_per_rep byte_arr = np.frombuffer(data, dtype='uint8').reshape((len(data), 1)) bits = np.unpackbits(byte_arr, axis=1)[:, ::-1].reshape(-1).astype(bool) bits = bits[:total_bits].reshape((repetitions, bits_per_rep)) results = {} ofs = 0 for key, size in key_sizes: results[key] = bits[:, ofs:ofs + size] ofs += size return results
[docs]def is_native_xmon_op(op: ops.Operation) -> bool: """Check if the gate corresponding to an operation is a native xmon gate. Args: op: Input operation. Returns: True if the operation is native to the xmon, false otherwise. """ return (isinstance(op, ops.GateOperation) and is_native_xmon_gate(op.gate))
def is_native_xmon_gate(gate: ops.Gate) -> bool: """Check if a gate is a native xmon gate. Args: gate: Input gate. Returns: True if the gate is native to the xmon, false otherwise. """ return isinstance(gate, (ops.CZPowGate, ops.MeasurementGate, ops.PhasedXPowGate, ops.XPowGate, ops.YPowGate, ops.ZPowGate))
[docs]def xmon_op_from_proto_dict(proto_dict: Dict) -> ops.Operation: """Convert the proto dictionary to the corresponding operation. See protos in api/google/v1 for specification of the protos. Args: proto_dict: Dictionary representing the proto. Keys are always strings, but values may be types correspond to a raw proto type or another dictionary (for messages). Returns: The operation. Raises: ValueError if the dictionary does not contain required values corresponding to the proto. """ def raise_missing_fields(gate_name: str): raise ValueError( '{} missing required fields: {}'.format(gate_name, proto_dict)) param = _parameterized_value_from_proto_dict qubit = devices.GridQubit.from_proto_dict if 'exp_w' in proto_dict: exp_w = proto_dict['exp_w'] if ('half_turns' not in exp_w or 'axis_half_turns' not in exp_w or 'target' not in exp_w): raise_missing_fields('ExpW') return ops.PhasedXPowGate( exponent=param(exp_w['half_turns']), phase_exponent=param(exp_w['axis_half_turns']), ).on(qubit(exp_w['target'])) elif 'exp_z' in proto_dict: exp_z = proto_dict['exp_z'] if 'half_turns' not in exp_z or 'target' not in exp_z: raise_missing_fields('ExpZ') return ops.Z(qubit(exp_z['target']))**param(exp_z['half_turns']) elif 'exp_11' in proto_dict: exp_11 = proto_dict['exp_11'] if ('half_turns' not in exp_11 or 'target1' not in exp_11 or 'target2' not in exp_11): raise_missing_fields('Exp11') return ops.CZ(qubit(exp_11['target1']), qubit(exp_11['target2']))**param(exp_11['half_turns']) elif 'measurement' in proto_dict: meas = proto_dict['measurement'] invert_mask = cast(Tuple[Any, ...], ()) if 'invert_mask' in meas: invert_mask = tuple(json.loads(x) for x in meas['invert_mask']) if 'key' not in meas or 'targets' not in meas: raise_missing_fields('Measurement') return ops.MeasurementGate( num_qubits=len(meas['targets']), key=meas['key'], invert_mask=invert_mask ).on(*[qubit(q) for q in meas['targets']]) else: raise ValueError('invalid operation: {}'.format(proto_dict))
def _parameterized_value_from_proto_dict(message: Dict ) -> Union[sympy.Basic, float]: parameter_key = message.get('parameter_key', None) if parameter_key: return sympy.Symbol(parameter_key) if 'raw' in message: return message['raw'] raise ValueError('No value specified for parameterized float. ' 'Expected "raw" or "parameter_key" to be set. ' 'message: {!r}'.format(message)) def _parameterized_value_to_proto_dict(param: Union[sympy.Basic, float] ) -> Dict: out = {} # type: Dict if isinstance(param, sympy.Symbol): out['parameter_key'] = str(param.free_symbols.pop()) else: out['raw'] = float(param) return out